• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 4599645360

pending completion
4599645360

push

github

GitHub
Bump cryptography from 2.5 to 39.0.1

2807 of 6557 branches covered (42.81%)

Branch coverage included in aggregate %.

12289 of 19915 relevant lines covered (61.71%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.7
/lbry/extras/daemon/components.py
1
import math
1✔
2
import os
1✔
3
import asyncio
1✔
4
import logging
1✔
5
import binascii
1✔
6
import typing
1✔
7

8
import base58
1✔
9

10
from aioupnp import __version__ as aioupnp_version
1✔
11
from aioupnp.upnp import UPnP
1✔
12
from aioupnp.fault import UPnPError
1✔
13

14
from lbry import utils
1✔
15
from lbry.dht.node import Node
1✔
16
from lbry.dht.peer import is_valid_public_ipv4
1✔
17
from lbry.dht.blob_announcer import BlobAnnouncer
1✔
18
from lbry.blob.blob_manager import BlobManager
1✔
19
from lbry.blob.disk_space_manager import DiskSpaceManager
1✔
20
from lbry.blob_exchange.server import BlobServer
1✔
21
from lbry.stream.background_downloader import BackgroundDownloader
1✔
22
from lbry.stream.stream_manager import StreamManager
1✔
23
from lbry.file.file_manager import FileManager
1✔
24
from lbry.extras.daemon.component import Component
1✔
25
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
1✔
26
from lbry.extras.daemon.storage import SQLiteStorage
1✔
27
from lbry.torrent.torrent_manager import TorrentManager
1✔
28
from lbry.wallet import WalletManager
1✔
29
from lbry.wallet.usage_payment import WalletServerPayer
1✔
30
from lbry.torrent.tracker import TrackerClient
1✔
31
from lbry.torrent.session import TorrentSession
1✔
32

33
log = logging.getLogger(__name__)
1✔
34

35
# settings must be initialized before this file is imported
36

37
DATABASE_COMPONENT = "database"
1✔
38
BLOB_COMPONENT = "blob_manager"
1✔
39
WALLET_COMPONENT = "wallet"
1✔
40
WALLET_SERVER_PAYMENTS_COMPONENT = "wallet_server_payments"
1✔
41
DHT_COMPONENT = "dht"
1✔
42
HASH_ANNOUNCER_COMPONENT = "hash_announcer"
1✔
43
FILE_MANAGER_COMPONENT = "file_manager"
1✔
44
DISK_SPACE_COMPONENT = "disk_space"
1✔
45
BACKGROUND_DOWNLOADER_COMPONENT = "background_downloader"
1✔
46
PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
1✔
47
UPNP_COMPONENT = "upnp"
1✔
48
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
1✔
49
TRACKER_ANNOUNCER_COMPONENT = "tracker_announcer_component"
1✔
50
LIBTORRENT_COMPONENT = "libtorrent_component"
1✔
51

52

53
class DatabaseComponent(Component):
1✔
54
    component_name = DATABASE_COMPONENT
1✔
55

56
    def __init__(self, component_manager):
1✔
57
        super().__init__(component_manager)
1✔
58
        self.storage = None
1✔
59

60
    @property
1✔
61
    def component(self):
1✔
62
        return self.storage
×
63

64
    @staticmethod
1✔
65
    def get_current_db_revision():
1✔
66
        return 15
×
67

68
    @property
1✔
69
    def revision_filename(self):
1✔
70
        return os.path.join(self.conf.data_dir, 'db_revision')
×
71

72
    def _write_db_revision_file(self, version_num):
1✔
73
        with open(self.revision_filename, mode='w') as db_revision:
×
74
            db_revision.write(str(version_num))
×
75

76
    async def start(self):
1✔
77
        # check directories exist, create them if they don't
78
        log.info("Loading databases")
×
79

80
        if not os.path.exists(self.revision_filename):
×
81
            log.info("db_revision file not found. Creating it")
×
82
            self._write_db_revision_file(self.get_current_db_revision())
×
83

84
        # check the db migration and run any needed migrations
85
        with open(self.revision_filename, "r") as revision_read_handle:
×
86
            old_revision = int(revision_read_handle.read().strip())
×
87

88
        if old_revision > self.get_current_db_revision():
×
89
            raise Exception('This version of lbrynet is not compatible with the database\n'
×
90
                            'Your database is revision %i, expected %i' %
91
                            (old_revision, self.get_current_db_revision()))
92
        if old_revision < self.get_current_db_revision():
×
93
            from lbry.extras.daemon.migrator import dbmigrator  # pylint: disable=import-outside-toplevel
×
94
            log.info("Upgrading your databases (revision %i to %i)", old_revision, self.get_current_db_revision())
×
95
            await asyncio.get_event_loop().run_in_executor(
×
96
                None, dbmigrator.migrate_db, self.conf, old_revision, self.get_current_db_revision()
97
            )
98
            self._write_db_revision_file(self.get_current_db_revision())
×
99
            log.info("Finished upgrading the databases.")
×
100

101
        self.storage = SQLiteStorage(
×
102
            self.conf, os.path.join(self.conf.data_dir, "lbrynet.sqlite")
103
        )
104
        await self.storage.open()
×
105

106
    async def stop(self):
1✔
107
        await self.storage.close()
×
108
        self.storage = None
×
109

110

111
class WalletComponent(Component):
1✔
112
    component_name = WALLET_COMPONENT
1✔
113
    depends_on = [DATABASE_COMPONENT]
1✔
114

115
    def __init__(self, component_manager):
1✔
116
        super().__init__(component_manager)
1✔
117
        self.wallet_manager = None
1✔
118

119
    @property
1✔
120
    def component(self):
1✔
121
        return self.wallet_manager
×
122

123
    async def get_status(self):
1✔
124
        if self.wallet_manager is None:
×
125
            return
×
126
        is_connected = self.wallet_manager.ledger.network.is_connected
×
127
        sessions = []
×
128
        connected = None
×
129
        if is_connected:
×
130
            addr, port = self.wallet_manager.ledger.network.client.server
×
131
            connected = f"{addr}:{port}"
×
132
            sessions.append(self.wallet_manager.ledger.network.client)
×
133

134
        result = {
×
135
            'connected': connected,
136
            'connected_features': self.wallet_manager.ledger.network.server_features,
137
            'servers': [
138
                {
139
                    'host': session.server[0],
140
                    'port': session.server[1],
141
                    'latency': session.connection_latency,
142
                    'availability': session.available,
143
                } for session in sessions
144
            ],
145
            'known_servers': len(self.wallet_manager.ledger.network.known_hubs),
146
            'available_servers': 1 if is_connected else 0
147
        }
148

149
        if self.wallet_manager.ledger.network.remote_height:
×
150
            local_height = self.wallet_manager.ledger.local_height_including_downloaded_height
×
151
            disk_height = len(self.wallet_manager.ledger.headers)
×
152
            remote_height = self.wallet_manager.ledger.network.remote_height
×
153
            download_height, target_height = local_height - disk_height, remote_height - disk_height
×
154
            if target_height > 0:
×
155
                progress = min(max(math.ceil(float(download_height) / float(target_height) * 100), 0), 100)
×
156
            else:
157
                progress = 100
×
158
            best_hash = await self.wallet_manager.get_best_blockhash()
×
159
            result.update({
×
160
                'headers_synchronization_progress': progress,
161
                'blocks': max(local_height, 0),
162
                'blocks_behind': max(remote_height - local_height, 0),
163
                'best_blockhash': best_hash,
164
            })
165

166
        return result
×
167

168
    async def start(self):
1✔
169
        log.info("Starting wallet")
×
170
        self.wallet_manager = await WalletManager.from_lbrynet_config(self.conf)
×
171
        await self.wallet_manager.start()
×
172

173
    async def stop(self):
1✔
174
        await self.wallet_manager.stop()
×
175
        self.wallet_manager = None
×
176

177

178
class WalletServerPaymentsComponent(Component):
1✔
179
    component_name = WALLET_SERVER_PAYMENTS_COMPONENT
1✔
180
    depends_on = [WALLET_COMPONENT]
1✔
181

182
    def __init__(self, component_manager):
1✔
183
        super().__init__(component_manager)
1✔
184
        self.usage_payment_service = WalletServerPayer(
1✔
185
            max_fee=self.conf.max_wallet_server_fee, analytics_manager=self.component_manager.analytics_manager,
186
        )
187

188
    @property
1✔
189
    def component(self) -> typing.Optional[WalletServerPayer]:
1✔
190
        return self.usage_payment_service
×
191

192
    async def start(self):
1✔
193
        wallet_manager = self.component_manager.get_component(WALLET_COMPONENT)
1✔
194
        await self.usage_payment_service.start(wallet_manager.ledger, wallet_manager.default_wallet)
1✔
195

196
    async def stop(self):
1✔
197
        await self.usage_payment_service.stop()
1✔
198

199
    async def get_status(self):
1✔
200
        return {
×
201
            'max_fee': self.usage_payment_service.max_fee,
202
            'running': self.usage_payment_service.running
203
        }
204

205

206
class BlobComponent(Component):
1✔
207
    component_name = BLOB_COMPONENT
1✔
208
    depends_on = [DATABASE_COMPONENT]
1✔
209

210
    def __init__(self, component_manager):
1✔
211
        super().__init__(component_manager)
1✔
212
        self.blob_manager: typing.Optional[BlobManager] = None
1✔
213

214
    @property
1✔
215
    def component(self) -> typing.Optional[BlobManager]:
1✔
216
        return self.blob_manager
×
217

218
    async def start(self):
1✔
219
        storage = self.component_manager.get_component(DATABASE_COMPONENT)
×
220
        data_store = None
×
221
        if DHT_COMPONENT not in self.component_manager.skip_components:
×
222
            dht_node: Node = self.component_manager.get_component(DHT_COMPONENT)
×
223
            if dht_node:
×
224
                data_store = dht_node.protocol.data_store
×
225
        blob_dir = os.path.join(self.conf.data_dir, 'blobfiles')
×
226
        if not os.path.isdir(blob_dir):
×
227
            os.mkdir(blob_dir)
×
228
        self.blob_manager = BlobManager(self.component_manager.loop, blob_dir, storage, self.conf, data_store)
×
229
        return await self.blob_manager.setup()
×
230

231
    async def stop(self):
1✔
232
        self.blob_manager.stop()
×
233

234
    async def get_status(self):
1✔
235
        count = 0
×
236
        if self.blob_manager:
×
237
            count = len(self.blob_manager.completed_blob_hashes)
×
238
        return {
×
239
            'finished_blobs': count,
240
            'connections': {} if not self.blob_manager else self.blob_manager.connection_manager.status
241
        }
242

243

244
class DHTComponent(Component):
1✔
245
    component_name = DHT_COMPONENT
1✔
246
    depends_on = [UPNP_COMPONENT, DATABASE_COMPONENT]
1✔
247

248
    def __init__(self, component_manager):
1✔
249
        super().__init__(component_manager)
1✔
250
        self.dht_node: typing.Optional[Node] = None
1✔
251
        self.external_udp_port = None
1✔
252
        self.external_peer_port = None
1✔
253

254
    @property
1✔
255
    def component(self) -> typing.Optional[Node]:
1✔
256
        return self.dht_node
×
257

258
    async def get_status(self):
1✔
259
        return {
×
260
            'node_id': None if not self.dht_node else binascii.hexlify(self.dht_node.protocol.node_id),
261
            'peers_in_routing_table': 0 if not self.dht_node else len(self.dht_node.protocol.routing_table.get_peers())
262
        }
263

264
    def get_node_id(self):
1✔
265
        node_id_filename = os.path.join(self.conf.data_dir, "node_id")
×
266
        if os.path.isfile(node_id_filename):
×
267
            with open(node_id_filename, "r") as node_id_file:
×
268
                return base58.b58decode(str(node_id_file.read()).strip())
×
269
        node_id = utils.generate_id()
×
270
        with open(node_id_filename, "w") as node_id_file:
×
271
            node_id_file.write(base58.b58encode(node_id).decode())
×
272
        return node_id
×
273

274
    async def start(self):
1✔
275
        log.info("start the dht")
×
276
        upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
×
277
        self.external_peer_port = upnp_component.upnp_redirects.get("TCP", self.conf.tcp_port)
×
278
        self.external_udp_port = upnp_component.upnp_redirects.get("UDP", self.conf.udp_port)
×
279
        external_ip = upnp_component.external_ip
×
280
        storage = self.component_manager.get_component(DATABASE_COMPONENT)
×
281
        if not external_ip:
×
282
            external_ip, _ = await utils.get_external_ip(self.conf.lbryum_servers)
×
283
            if not external_ip:
×
284
                log.warning("failed to get external ip")
×
285

286
        self.dht_node = Node(
×
287
            self.component_manager.loop,
288
            self.component_manager.peer_manager,
289
            node_id=self.get_node_id(),
290
            internal_udp_port=self.conf.udp_port,
291
            udp_port=self.external_udp_port,
292
            external_ip=external_ip,
293
            peer_port=self.external_peer_port,
294
            rpc_timeout=self.conf.node_rpc_timeout,
295
            split_buckets_under_index=self.conf.split_buckets_under_index,
296
            is_bootstrap_node=self.conf.is_bootstrap_node,
297
            storage=storage
298
        )
299
        self.dht_node.start(self.conf.network_interface, self.conf.known_dht_nodes)
×
300
        log.info("Started the dht")
×
301

302
    async def stop(self):
1✔
303
        self.dht_node.stop()
×
304

305

306
class HashAnnouncerComponent(Component):
1✔
307
    component_name = HASH_ANNOUNCER_COMPONENT
1✔
308
    depends_on = [DHT_COMPONENT, DATABASE_COMPONENT]
1✔
309

310
    def __init__(self, component_manager):
1✔
311
        super().__init__(component_manager)
1✔
312
        self.hash_announcer: typing.Optional[BlobAnnouncer] = None
1✔
313

314
    @property
1✔
315
    def component(self) -> typing.Optional[BlobAnnouncer]:
1✔
316
        return self.hash_announcer
×
317

318
    async def start(self):
1✔
319
        storage = self.component_manager.get_component(DATABASE_COMPONENT)
×
320
        dht_node = self.component_manager.get_component(DHT_COMPONENT)
×
321
        self.hash_announcer = BlobAnnouncer(self.component_manager.loop, dht_node, storage)
×
322
        self.hash_announcer.start(self.conf.concurrent_blob_announcers)
×
323
        log.info("Started blob announcer")
×
324

325
    async def stop(self):
1✔
326
        self.hash_announcer.stop()
×
327
        log.info("Stopped blob announcer")
×
328

329
    async def get_status(self):
1✔
330
        return {
×
331
            'announce_queue_size': 0 if not self.hash_announcer else len(self.hash_announcer.announce_queue)
332
        }
333

334

335
class FileManagerComponent(Component):
1✔
336
    component_name = FILE_MANAGER_COMPONENT
1✔
337
    depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT]
1✔
338

339
    def __init__(self, component_manager):
1✔
340
        super().__init__(component_manager)
1✔
341
        self.file_manager: typing.Optional[FileManager] = None
1✔
342

343
    @property
1✔
344
    def component(self) -> typing.Optional[FileManager]:
1✔
345
        return self.file_manager
×
346

347
    async def get_status(self):
1✔
348
        if not self.file_manager:
×
349
            return
×
350
        return {
×
351
            'managed_files': len(self.file_manager.get_filtered()),
352
        }
353

354
    async def start(self):
1✔
355
        blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
×
356
        storage = self.component_manager.get_component(DATABASE_COMPONENT)
×
357
        wallet = self.component_manager.get_component(WALLET_COMPONENT)
×
358
        node = self.component_manager.get_component(DHT_COMPONENT) \
×
359
            if self.component_manager.has_component(DHT_COMPONENT) else None
360
        log.info('Starting the file manager')
×
361
        loop = asyncio.get_event_loop()
×
362
        self.file_manager = FileManager(
×
363
            loop, self.conf, wallet, storage, self.component_manager.analytics_manager
364
        )
365
        self.file_manager.source_managers['stream'] = StreamManager(
×
366
            loop, self.conf, blob_manager, wallet, storage, node,
367
        )
368
        if self.component_manager.has_component(LIBTORRENT_COMPONENT):
×
369
            torrent = self.component_manager.get_component(LIBTORRENT_COMPONENT)
×
370
            self.file_manager.source_managers['torrent'] = TorrentManager(
×
371
                loop, self.conf, torrent, storage, self.component_manager.analytics_manager
372
            )
373
        await self.file_manager.start()
×
374
        log.info('Done setting up file manager')
×
375

376
    async def stop(self):
1✔
377
        await self.file_manager.stop()
×
378

379

380
class BackgroundDownloaderComponent(Component):
1✔
381
    MIN_PREFIX_COLLIDING_BITS = 8
1✔
382
    component_name = BACKGROUND_DOWNLOADER_COMPONENT
1✔
383
    depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT]
1✔
384

385
    def __init__(self, component_manager):
1✔
386
        super().__init__(component_manager)
1✔
387
        self.background_task: typing.Optional[asyncio.Task] = None
1✔
388
        self.download_loop_delay_seconds = 60
1✔
389
        self.ongoing_download: typing.Optional[asyncio.Task] = None
1✔
390
        self.space_manager: typing.Optional[DiskSpaceManager] = None
1✔
391
        self.blob_manager: typing.Optional[BlobManager] = None
1✔
392
        self.background_downloader: typing.Optional[BackgroundDownloader] = None
1✔
393
        self.dht_node: typing.Optional[Node] = None
1✔
394
        self.space_available: typing.Optional[int] = None
1✔
395

396
    @property
1✔
397
    def is_busy(self):
1✔
398
        return bool(self.ongoing_download and not self.ongoing_download.done())
×
399

400
    @property
1✔
401
    def component(self) -> 'BackgroundDownloaderComponent':
1✔
402
        return self
×
403

404
    async def get_status(self):
1✔
405
        return {'running': self.background_task is not None and not self.background_task.done(),
×
406
                'available_free_space_mb': self.space_available,
407
                'ongoing_download': self.is_busy}
408

409
    async def download_blobs_in_background(self):
1✔
410
        while True:
411
            self.space_available = await self.space_manager.get_free_space_mb(True)
×
412
            if not self.is_busy and self.space_available > 10:
×
413
                self._download_next_close_blob_hash()
×
414
            await asyncio.sleep(self.download_loop_delay_seconds)
×
415

416
    def _download_next_close_blob_hash(self):
1✔
417
        node_id = self.dht_node.protocol.node_id
×
418
        for blob_hash in self.dht_node.stored_blob_hashes:
×
419
            if blob_hash.hex() in self.blob_manager.completed_blob_hashes:
×
420
                continue
×
421
            if utils.get_colliding_prefix_bits(node_id, blob_hash) >= self.MIN_PREFIX_COLLIDING_BITS:
×
422
                self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash.hex()))
×
423
                return
×
424

425
    async def start(self):
1✔
426
        self.space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT)
×
427
        if not self.component_manager.has_component(DHT_COMPONENT):
×
428
            return
×
429
        self.dht_node = self.component_manager.get_component(DHT_COMPONENT)
×
430
        self.blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
×
431
        storage = self.component_manager.get_component(DATABASE_COMPONENT)
×
432
        self.background_downloader = BackgroundDownloader(self.conf, storage, self.blob_manager, self.dht_node)
×
433
        self.background_task = asyncio.create_task(self.download_blobs_in_background())
×
434

435
    async def stop(self):
1✔
436
        if self.ongoing_download and not self.ongoing_download.done():
×
437
            self.ongoing_download.cancel()
×
438
        if self.background_task:
×
439
            self.background_task.cancel()
×
440

441

442
class DiskSpaceComponent(Component):
1✔
443
    component_name = DISK_SPACE_COMPONENT
1✔
444
    depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT]
1✔
445

446
    def __init__(self, component_manager):
1✔
447
        super().__init__(component_manager)
1✔
448
        self.disk_space_manager: typing.Optional[DiskSpaceManager] = None
1✔
449

450
    @property
1✔
451
    def component(self) -> typing.Optional[DiskSpaceManager]:
1✔
452
        return self.disk_space_manager
×
453

454
    async def get_status(self):
1✔
455
        if self.disk_space_manager:
×
456
            space_used = await self.disk_space_manager.get_space_used_mb(cached=True)
×
457
            return {
×
458
                'total_used_mb': space_used['total'],
459
                'published_blobs_storage_used_mb': space_used['private_storage'],
460
                'content_blobs_storage_used_mb': space_used['content_storage'],
461
                'seed_blobs_storage_used_mb': space_used['network_storage'],
462
                'running': self.disk_space_manager.running,
463
            }
464
        return {'space_used': '0', 'network_seeding_space_used': '0', 'running': False}
×
465

466
    async def start(self):
1✔
467
        db = self.component_manager.get_component(DATABASE_COMPONENT)
×
468
        blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
×
469
        self.disk_space_manager = DiskSpaceManager(
×
470
            self.conf, db, blob_manager,
471
            analytics=self.component_manager.analytics_manager
472
        )
473
        await self.disk_space_manager.start()
×
474

475
    async def stop(self):
1✔
476
        await self.disk_space_manager.stop()
×
477

478

479
class TorrentComponent(Component):
1✔
480
    component_name = LIBTORRENT_COMPONENT
1✔
481

482
    def __init__(self, component_manager):
1✔
483
        super().__init__(component_manager)
1✔
484
        self.torrent_session = None
1✔
485

486
    @property
1✔
487
    def component(self) -> typing.Optional[TorrentSession]:
1✔
488
        return self.torrent_session
×
489

490
    async def get_status(self):
1✔
491
        if not self.torrent_session:
×
492
            return
×
493
        return {
×
494
            'running': True,  # TODO: what to return here?
495
        }
496

497
    async def start(self):
1✔
498
        self.torrent_session = TorrentSession(asyncio.get_event_loop(), None)
1✔
499
        await self.torrent_session.bind()  # TODO: specify host/port
1✔
500

501
    async def stop(self):
1✔
502
        if self.torrent_session:
1!
503
            await self.torrent_session.pause()
1✔
504

505

506
class PeerProtocolServerComponent(Component):
1✔
507
    component_name = PEER_PROTOCOL_SERVER_COMPONENT
1✔
508
    depends_on = [UPNP_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT]
1✔
509

510
    def __init__(self, component_manager):
1✔
511
        super().__init__(component_manager)
1✔
512
        self.blob_server: typing.Optional[BlobServer] = None
1✔
513

514
    @property
1✔
515
    def component(self) -> typing.Optional[BlobServer]:
1✔
516
        return self.blob_server
×
517

518
    async def start(self):
1✔
519
        log.info("start blob server")
×
520
        blob_manager: BlobManager = self.component_manager.get_component(BLOB_COMPONENT)
×
521
        wallet: WalletManager = self.component_manager.get_component(WALLET_COMPONENT)
×
522
        peer_port = self.conf.tcp_port
×
523
        address = await wallet.get_unused_address()
×
524
        self.blob_server = BlobServer(asyncio.get_event_loop(), blob_manager, address)
×
525
        self.blob_server.start_server(peer_port, interface=self.conf.network_interface)
×
526
        await self.blob_server.started_listening.wait()
×
527

528
    async def stop(self):
1✔
529
        if self.blob_server:
×
530
            self.blob_server.stop_server()
×
531

532

533
class UPnPComponent(Component):
1✔
534
    component_name = UPNP_COMPONENT
1✔
535

536
    def __init__(self, component_manager):
1✔
537
        super().__init__(component_manager)
1✔
538
        self._int_peer_port = self.conf.tcp_port
1✔
539
        self._int_dht_node_port = self.conf.udp_port
1✔
540
        self.use_upnp = self.conf.use_upnp
1✔
541
        self.upnp: typing.Optional[UPnP] = None
1✔
542
        self.upnp_redirects = {}
1✔
543
        self.external_ip: typing.Optional[str] = None
1✔
544
        self._maintain_redirects_task = None
1✔
545

546
    @property
1✔
547
    def component(self) -> 'UPnPComponent':
1✔
548
        return self
×
549

550
    async def _repeatedly_maintain_redirects(self, now=True):
1✔
551
        while True:
552
            if now:
×
553
                await self._maintain_redirects()
×
554
            await asyncio.sleep(360)
×
555

556
    async def _maintain_redirects(self):
1✔
557
        # setup the gateway if necessary
558
        if not self.upnp:
×
559
            try:
×
560
                self.upnp = await UPnP.discover(loop=self.component_manager.loop)
×
561
                log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
×
562
            except Exception as err:
×
563
                log.warning("upnp discovery failed: %s", err)
×
564
                self.upnp = None
×
565

566
        # update the external ip
567
        external_ip = None
×
568
        if self.upnp:
×
569
            try:
×
570
                external_ip = await self.upnp.get_external_ip()
×
571
                if external_ip != "0.0.0.0" and not self.external_ip:
×
572
                    log.info("got external ip from UPnP: %s", external_ip)
×
573
            except (asyncio.TimeoutError, UPnPError, NotImplementedError):
×
574
                pass
×
575
        if external_ip and not is_valid_public_ipv4(external_ip):
×
576
            log.warning("UPnP returned a private/reserved ip - %s, checking lbry.com fallback", external_ip)
×
577
            external_ip, _ = await utils.get_external_ip(self.conf.lbryum_servers)
×
578
        if self.external_ip and self.external_ip != external_ip:
×
579
            log.info("external ip changed from %s to %s", self.external_ip, external_ip)
×
580
        if external_ip:
×
581
            self.external_ip = external_ip
×
582
            dht_component = self.component_manager.get_component(DHT_COMPONENT)
×
583
            if dht_component:
×
584
                dht_node = dht_component.component
×
585
                dht_node.protocol.external_ip = external_ip
×
586
        # assert self.external_ip is not None   # TODO: handle going/starting offline
587

588
        if not self.upnp_redirects and self.upnp:  # setup missing redirects
×
589
            log.info("add UPnP port mappings")
×
590
            upnp_redirects = {}
×
591
            if PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
×
592
                try:
×
593
                    upnp_redirects["TCP"] = await self.upnp.get_next_mapping(
×
594
                        self._int_peer_port, "TCP", "LBRY peer port", self._int_peer_port
595
                    )
596
                except (UPnPError, asyncio.TimeoutError, NotImplementedError):
×
597
                    pass
×
598
            if DHT_COMPONENT not in self.component_manager.skip_components:
×
599
                try:
×
600
                    upnp_redirects["UDP"] = await self.upnp.get_next_mapping(
×
601
                        self._int_dht_node_port, "UDP", "LBRY DHT port", self._int_dht_node_port
602
                    )
603
                except (UPnPError, asyncio.TimeoutError, NotImplementedError):
×
604
                    pass
×
605
            if upnp_redirects:
×
606
                log.info("set up redirects: %s", upnp_redirects)
×
607
                self.upnp_redirects.update(upnp_redirects)
×
608
        elif self.upnp:  # check existing redirects are still active
×
609
            found = set()
×
610
            mappings = await self.upnp.get_redirects()
×
611
            for mapping in mappings:
×
612
                proto = mapping.protocol
×
613
                if proto in self.upnp_redirects and mapping.external_port == self.upnp_redirects[proto]:
×
614
                    if mapping.lan_address == self.upnp.lan_address:
×
615
                        found.add(proto)
×
616
            if 'UDP' not in found and DHT_COMPONENT not in self.component_manager.skip_components:
×
617
                try:
×
618
                    udp_port = await self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")
×
619
                    self.upnp_redirects['UDP'] = udp_port
×
620
                    log.info("refreshed upnp redirect for dht port: %i", udp_port)
×
621
                except (asyncio.TimeoutError, UPnPError, NotImplementedError):
×
622
                    del self.upnp_redirects['UDP']
×
623
            if 'TCP' not in found and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
×
624
                try:
×
625
                    tcp_port = await self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")
×
626
                    self.upnp_redirects['TCP'] = tcp_port
×
627
                    log.info("refreshed upnp redirect for peer port: %i", tcp_port)
×
628
                except (asyncio.TimeoutError, UPnPError, NotImplementedError):
×
629
                    del self.upnp_redirects['TCP']
×
630
            if ('TCP' in self.upnp_redirects and
×
631
                    PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components) and \
632
                    ('UDP' in self.upnp_redirects and DHT_COMPONENT not in self.component_manager.skip_components):
633
                if self.upnp_redirects:
×
634
                    log.debug("upnp redirects are still active")
×
635

636
    async def start(self):
1✔
637
        log.info("detecting external ip")
×
638
        if not self.use_upnp:
×
639
            self.external_ip, _ = await utils.get_external_ip(self.conf.lbryum_servers)
×
640
            return
×
641
        success = False
×
642
        await self._maintain_redirects()
×
643
        if self.upnp:
×
644
            if not self.upnp_redirects and not all(
×
645
                    x in self.component_manager.skip_components
646
                    for x in (DHT_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT)
647
            ):
648
                log.error("failed to setup upnp")
×
649
            else:
650
                success = True
×
651
                if self.upnp_redirects:
×
652
                    log.debug("set up upnp port redirects for gateway: %s", self.upnp.gateway.manufacturer_string)
×
653
        else:
654
            log.error("failed to setup upnp")
×
655
        if not self.external_ip:
×
656
            self.external_ip, probed_url = await utils.get_external_ip(self.conf.lbryum_servers)
×
657
            if self.external_ip:
×
658
                log.info("detected external ip using %s fallback", probed_url)
×
659
        if self.component_manager.analytics_manager:
×
660
            self.component_manager.loop.create_task(
×
661
                self.component_manager.analytics_manager.send_upnp_setup_success_fail(
662
                    success, await self.get_status()
663
                )
664
            )
665
        self._maintain_redirects_task = self.component_manager.loop.create_task(
×
666
            self._repeatedly_maintain_redirects(now=False)
667
        )
668

669
    async def stop(self):
1✔
670
        if self.upnp_redirects:
×
671
            log.info("Removing upnp redirects: %s", self.upnp_redirects)
×
672
            await asyncio.wait([
×
673
                self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()
674
            ])
675
        if self._maintain_redirects_task and not self._maintain_redirects_task.done():
×
676
            self._maintain_redirects_task.cancel()
×
677

678
    async def get_status(self):
1✔
679
        return {
×
680
            'aioupnp_version': aioupnp_version,
681
            'redirects': self.upnp_redirects,
682
            'gateway': 'No gateway found' if not self.upnp else self.upnp.gateway.manufacturer_string,
683
            'dht_redirect_set': 'UDP' in self.upnp_redirects,
684
            'peer_redirect_set': 'TCP' in self.upnp_redirects,
685
            'external_ip': self.external_ip
686
        }
687

688

689
class ExchangeRateManagerComponent(Component):
1✔
690
    component_name = EXCHANGE_RATE_MANAGER_COMPONENT
1✔
691

692
    def __init__(self, component_manager):
1✔
693
        super().__init__(component_manager)
1✔
694
        self.exchange_rate_manager = ExchangeRateManager()
1✔
695

696
    @property
1✔
697
    def component(self) -> ExchangeRateManager:
1✔
698
        return self.exchange_rate_manager
×
699

700
    async def start(self):
1✔
701
        self.exchange_rate_manager.start()
×
702

703
    async def stop(self):
1✔
704
        self.exchange_rate_manager.stop()
×
705

706

707
class TrackerAnnouncerComponent(Component):
1✔
708
    component_name = TRACKER_ANNOUNCER_COMPONENT
1✔
709
    depends_on = [FILE_MANAGER_COMPONENT]
1✔
710

711
    def __init__(self, component_manager):
1✔
712
        super().__init__(component_manager)
1✔
713
        self.file_manager = None
1✔
714
        self.announce_task = None
1✔
715
        self.tracker_client: typing.Optional[TrackerClient] = None
1✔
716

717
    @property
1✔
718
    def component(self):
1✔
719
        return self.tracker_client
×
720

721
    @property
1✔
722
    def running(self):
1✔
723
        return self._running and self.announce_task and not self.announce_task.done()
1✔
724

725
    async def announce_forever(self):
1✔
726
        while True:
727
            sleep_seconds = 60.0
1✔
728
            announce_sd_hashes = []
1✔
729
            for file in self.file_manager.get_filtered():
1!
730
                if not file.downloader:
×
731
                    continue
×
732
                announce_sd_hashes.append(bytes.fromhex(file.sd_hash))
×
733
            await self.tracker_client.announce_many(*announce_sd_hashes)
1✔
734
            await asyncio.sleep(sleep_seconds)
×
735

736
    async def start(self):
1✔
737
        node = self.component_manager.get_component(DHT_COMPONENT) \
1✔
738
            if self.component_manager.has_component(DHT_COMPONENT) else None
739
        node_id = node.protocol.node_id if node else None
1✔
740
        self.tracker_client = TrackerClient(node_id, self.conf.tcp_port, lambda: self.conf.tracker_servers)
1✔
741
        await self.tracker_client.start()
1✔
742
        self.file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
1✔
743
        self.announce_task = asyncio.create_task(self.announce_forever())
1✔
744

745
    async def stop(self):
1✔
746
        self.file_manager = None
1✔
747
        if self.announce_task and not self.announce_task.done():
1!
748
            self.announce_task.cancel()
1✔
749
        self.announce_task = None
1✔
750
        self.tracker_client.stop()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc