• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 3708794183

pending completion
3708794183

Pull #3657

github

GitHub
Merge 636b7ed47 into 625865165
Pull Request #3657: wip: add initial support for streaming torrent files

2754 of 6491 branches covered (42.43%)

Branch coverage included in aggregate %.

64 of 245 new or added lines in 12 files covered. (26.12%)

20 existing lines in 5 files now uncovered.

12055 of 19808 relevant lines covered (60.86%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.92
/lbry/stream/downloader.py
1
import asyncio
1✔
2
import typing
1✔
3
import logging
1✔
4
import binascii
1✔
5

6
from lbry.dht.node import get_kademlia_peers_from_hosts
1✔
7
from lbry.error import DownloadMetadataTimeoutError
1✔
8
from lbry.utils import lru_cache_concurrent
1✔
9
from lbry.stream.descriptor import StreamDescriptor
1✔
10
from lbry.blob_exchange.downloader import BlobDownloader
1✔
11
from lbry.torrent.tracker import enqueue_tracker_search
1✔
12

13
if typing.TYPE_CHECKING:
1!
14
    from lbry.conf import Config
×
15
    from lbry.dht.node import Node
×
16
    from lbry.blob.blob_manager import BlobManager
×
17
    from lbry.blob.blob_file import AbstractBlob
×
18
    from lbry.blob.blob_info import BlobInfo
×
19

20
log = logging.getLogger(__name__)
1✔
21

22

23
class StreamDownloader:
    """
    Downloads and decrypts the blobs of a single encrypted stream, identified
    by its sd (stream descriptor) hash.

    Peers are discovered from up to three sources: the DHT node (via
    ``accumulate_peers``), configured fixed peers, and tracker announcements.
    Newly found peers are fed through ``peer_queue`` into a ``BlobDownloader``.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', sd_hash: str,
                 descriptor: typing.Optional[StreamDescriptor] = None):
        self.loop = loop
        self.config = config
        self.blob_manager = blob_manager
        self.sd_hash = sd_hash
        self.search_queue = asyncio.Queue()     # blob hashes to feed into the iterative finder
        self.peer_queue = asyncio.Queue()       # new peers to try
        self.blob_downloader = BlobDownloader(self.loop, self.config, self.blob_manager, self.peer_queue)
        self.descriptor: typing.Optional[StreamDescriptor] = descriptor
        self.node: typing.Optional['Node'] = None
        self.accumulate_task: typing.Optional[asyncio.Task] = None
        self.fixed_peers_handle: typing.Optional[asyncio.Handle] = None
        self.fixed_peers_delay: typing.Optional[float] = None
        self.added_fixed_peers = False
        # timing metrics, set once per download
        self.time_to_descriptor: typing.Optional[float] = None
        self.time_to_first_bytes: typing.Optional[float] = None

        async def cached_read_blob(blob_info: 'BlobInfo') -> bytes:
            # connection_id=2 keeps cached reads on a separate connection slot
            return await self.read_blob(blob_info, 2)

        # wrap reads with the shared decrypted-blob LRU cache when one is configured
        if self.blob_manager.decrypted_blob_lru_cache is not None:
            cached_read_blob = lru_cache_concurrent(override_lru_cache=self.blob_manager.decrypted_blob_lru_cache)(
                cached_read_blob
            )

        self.cached_read_blob = cached_read_blob

    async def add_fixed_peers(self):
        """Queue the configured fixed peers, immediately if the DHT is unavailable,
        otherwise after ``fixed_peer_delay`` to give DHT-found peers a head start."""
        def _add_fixed_peers(fixed_peers):
            self.peer_queue.put_nowait(fixed_peers)
            self.added_fixed_peers = True

        if not self.config.fixed_peers:
            return
        if 'dht' in self.config.components_to_skip or not self.node or \
                not self.node.protocol.routing_table.get_peers():
            # no usable DHT: fall back to fixed peers right away
            self.fixed_peers_delay = 0.0
        else:
            self.fixed_peers_delay = self.config.fixed_peer_delay
        fixed_peers = await get_kademlia_peers_from_hosts(self.config.fixed_peers)
        self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _add_fixed_peers, fixed_peers)

    async def load_descriptor(self, connection_id: int = 0):
        """Download (if needed) and parse the sd blob into ``self.descriptor``.

        Raises:
            DownloadMetadataTimeoutError: if the sd blob cannot be fetched
                within ``blob_download_timeout`` seconds.
        """
        # download or get the sd blob
        sd_blob = self.blob_manager.get_blob(self.sd_hash)
        if not sd_blob.get_is_verified():
            try:
                now = self.loop.time()
                sd_blob = await asyncio.wait_for(
                    self.blob_downloader.download_blob(self.sd_hash, connection_id),
                    self.config.blob_download_timeout
                )
                log.info("downloaded sd blob %s", self.sd_hash)
                self.time_to_descriptor = self.loop.time() - now
            except asyncio.TimeoutError:
                raise DownloadMetadataTimeoutError(self.sd_hash)

        # parse the descriptor
        self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
            self.loop, self.blob_manager.blob_dir, sd_blob
        )
        log.info("loaded stream manifest %s", self.sd_hash)

    async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0, save_stream=True):
        """Begin peer discovery, load the descriptor if needed, and optionally
        persist the stream to storage."""
        # set up peer accumulation
        self.node = node or self.node  # fixme: this shouldn't be set here!
        if self.node:
            if self.accumulate_task and not self.accumulate_task.done():
                self.accumulate_task.cancel()
            _, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)
        await self.add_fixed_peers()
        enqueue_tracker_search(bytes.fromhex(self.sd_hash), self.peer_queue)
        # start searching for peers for the sd hash
        self.search_queue.put_nowait(self.sd_hash)
        log.info("searching for peers for stream %s", self.sd_hash)

        if not self.descriptor:
            await self.load_descriptor(connection_id)

        if not await self.blob_manager.storage.stream_exists(self.sd_hash) and save_stream:
            await self.blob_manager.storage.store_stream(
                self.blob_manager.get_blob(self.sd_hash, length=self.descriptor.length), self.descriptor
            )

    async def download_stream_blob(self, blob_info: 'BlobInfo', connection_id: int = 0) -> 'AbstractBlob':
        """Download one data blob belonging to this stream.

        Raises:
            ValueError: if ``blob_info`` is not one of this stream's data blobs.
        """
        # NOTE: the original check used `not filter(...)`, which is always False
        # because a filter object is truthy regardless of its contents — the
        # ValueError below could never fire.  `any(...)` performs the intended
        # membership test (the last descriptor entry is the stream terminator
        # and is excluded).
        if not any(b.blob_hash == blob_info.blob_hash for b in self.descriptor.blobs[:-1]):
            raise ValueError(f"blob {blob_info.blob_hash} is not part of stream with sd hash {self.sd_hash}")
        blob = await asyncio.wait_for(
            self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length, connection_id),
            self.config.blob_download_timeout * 10
        )
        return blob

    def decrypt_blob(self, blob_info: 'BlobInfo', blob: 'AbstractBlob') -> bytes:
        """Decrypt a downloaded blob with the stream key and the blob's IV."""
        return blob.decrypt(
            binascii.unhexlify(self.descriptor.key.encode()), binascii.unhexlify(blob_info.iv.encode())
        )

    async def read_blob(self, blob_info: 'BlobInfo', connection_id: int = 0) -> bytes:
        """Download and decrypt one blob, recording time-to-first-bytes on the
        first successful read."""
        start = None
        if self.time_to_first_bytes is None:
            start = self.loop.time()
        blob = await self.download_stream_blob(blob_info, connection_id)
        decrypted = self.decrypt_blob(blob_info, blob)
        # `start is not None` (not truthiness): loop.time() may legitimately be 0.0
        if start is not None:
            self.time_to_first_bytes = self.loop.time() - start
        return decrypted

    def stop(self):
        """Cancel peer accumulation and pending fixed-peer scheduling, and
        close the blob downloader's connections."""
        if self.accumulate_task:
            self.accumulate_task.cancel()
            self.accumulate_task = None
        if self.fixed_peers_handle:
            self.fixed_peers_handle.cancel()
            self.fixed_peers_handle = None
        self.blob_downloader.close()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc