• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 3708794183

pending completion
3708794183

Pull #3657

github

GitHub
Merge 636b7ed47 into 625865165
Pull Request #3657: wip: add initial support for streaming torrent files

2754 of 6491 branches covered (42.43%)

Branch coverage included in aggregate %.

64 of 245 new or added lines in 12 files covered. (26.12%)

20 existing lines in 5 files now uncovered.

12055 of 19808 relevant lines covered (60.86%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.13
/lbry/file/file_manager.py
1
import asyncio
1✔
2
import logging
1✔
3
import typing
1✔
4
from typing import Optional
1✔
5
from aiohttp.web import Request
1✔
6
from lbry.error import ResolveError, DownloadMetadataTimeoutError, InsufficientFundsError
1✔
7
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
1✔
8
from lbry.error import InvalidStreamURLError
1✔
9
from lbry.stream.managed_stream import ManagedStream
1✔
10
from lbry.torrent.torrent_manager import TorrentSource
1✔
11
from lbry.utils import cache_concurrent
1✔
12
from lbry.schema.url import URL
1✔
13
from lbry.wallet.dewies import dewies_to_lbc
1✔
14
from lbry.file.source_manager import SourceManager
1✔
15
from lbry.file.source import ManagedDownloadSource
1✔
16
from lbry.extras.daemon.storage import StoredContentClaim
1✔
17
if typing.TYPE_CHECKING:
1!
18
    from lbry.conf import Config
×
19
    from lbry.extras.daemon.analytics import AnalyticsManager
×
20
    from lbry.extras.daemon.storage import SQLiteStorage
×
21
    from lbry.wallet import WalletManager
×
22
    from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
×
23

24
log = logging.getLogger(__name__)
1✔
25

26

27
class FileManager:
1✔
28
    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', wallet_manager: 'WalletManager',
1✔
29
                 storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None):
30
        self.loop = loop
1✔
31
        self.config = config
1✔
32
        self.wallet_manager = wallet_manager
1✔
33
        self.storage = storage
1✔
34
        self.analytics_manager = analytics_manager
1✔
35
        self.source_managers: typing.Dict[str, SourceManager] = {}
1✔
36
        self.started = asyncio.Event()
1✔
37

38
    @property
1✔
39
    def streams(self):
40
        return self.source_managers['stream']._sources
×
41

42
    async def create_stream(self, file_path: str, key: Optional[bytes] = None, **kwargs) -> ManagedDownloadSource:
1✔
43
        if 'stream' in self.source_managers:
×
44
            return await self.source_managers['stream'].create(file_path, key, **kwargs)
×
45
        raise NotImplementedError
×
46

47
    async def start(self):
1✔
48
        await asyncio.gather(*(source_manager.start() for source_manager in self.source_managers.values()))
×
49
        for manager in self.source_managers.values():
×
50
            await manager.started.wait()
×
51
        self.started.set()
×
52

53
    def stop(self):
1✔
54
        for manager in self.source_managers.values():
×
55
            # fixme: pop or not?
56
            manager.stop()
×
57
        self.started.clear()
×
58

59
    @cache_concurrent
1✔
60
    async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
1✔
61
                                timeout: Optional[float] = None, file_name: Optional[str] = None,
62
                                download_directory: Optional[str] = None,
63
                                save_file: Optional[bool] = None, resolve_timeout: float = 3.0,
64
                                wallet: Optional['Wallet'] = None) -> ManagedDownloadSource:
65

66
        wallet = wallet or self.wallet_manager.default_wallet
1✔
67
        timeout = timeout or self.config.download_timeout
1✔
68
        start_time = self.loop.time()
1✔
69
        resolved_time = None
1✔
70
        stream = None
1✔
71
        claim = None
1✔
72
        error = None
1✔
73
        outpoint = None
1✔
74
        if save_file is None:
1!
75
            save_file = self.config.save_files
1✔
76
        if file_name and not save_file:
1!
77
            save_file = True
×
78
        if save_file:
1!
79
            download_directory = download_directory or self.config.download_dir
1✔
80
        else:
81
            download_directory = None
×
82

83
        payment = None
1✔
84
        try:
1✔
85
            # resolve the claim
86
            try:
1✔
87
                if not URL.parse(uri).has_stream:
1!
88
                    raise InvalidStreamURLError(uri)
×
89
            except ValueError:
×
90
                raise InvalidStreamURLError(uri)
×
91
            try:
1✔
92
                resolved_result = await asyncio.wait_for(
1✔
93
                    self.wallet_manager.ledger.resolve(
94
                        wallet.accounts, [uri],
95
                        include_purchase_receipt=True,
96
                        include_is_my_output=True
97
                    ), resolve_timeout
98
                )
99
            except asyncio.TimeoutError:
×
100
                raise ResolveTimeoutError(uri)
×
101
            except Exception as err:
×
102
                if isinstance(err, asyncio.CancelledError):
×
103
                    raise
×
104
                log.exception("Unexpected error resolving stream:")
×
105
                raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
×
106
            if 'error' in resolved_result:
1!
107
                raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}")
×
108
            if not resolved_result or uri not in resolved_result:
1✔
109
                raise ResolveError(f"Failed to resolve stream at '{uri}'")
1✔
110
            txo = resolved_result[uri]
1✔
111
            if isinstance(txo, dict):
1!
112
                raise ResolveError(f"Failed to resolve stream at '{uri}': {txo}")
×
113
            claim = txo.claim
1✔
114
            outpoint = f"{txo.tx_ref.id}:{txo.position}"
1✔
115
            resolved_time = self.loop.time() - start_time
1✔
116
            await self.storage.save_claim_from_output(self.wallet_manager.ledger, txo)
1✔
117

118
            ####################
119
            # update or replace
120
            ####################
121

122
            if claim.stream.source.bt_infohash:
1!
123
                source_manager = self.source_managers['torrent']
×
124
                existing = source_manager.get_filtered(bt_infohash=claim.stream.source.bt_infohash)
×
125
            elif claim.stream.source.sd_hash:
1!
126
                source_manager = self.source_managers['stream']
1✔
127
                existing = source_manager.get_filtered(sd_hash=claim.stream.source.sd_hash)
1✔
128
            else:
129
                raise ResolveError(f"There is nothing to download at {uri} - Source is unknown or unset")
×
130

131
            # resume or update an existing stream, if the stream changed: download it and delete the old one after
132
            to_replace, updated_stream = None, None
1✔
133
            if existing and existing[0].claim_id != txo.claim_id:
1!
134
                raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {txo.claim_id}")
×
135
            if existing:
1!
136
                log.info("claim contains a metadata only update to a stream we have")
×
137
                if claim.stream.source.bt_infohash:
×
138
                    await self.storage.save_torrent_content_claim(
×
139
                        existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name
140
                    )
141
                    claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier)
×
NEW
142
                    existing[0].set_claim(claim_info.as_dict() if claim_info else None, claim)
×
143
                else:
144
                    await self.storage.save_content_claim(
×
145
                        existing[0].stream_hash, outpoint
146
                    )
147
                    await source_manager._update_content_claim(existing[0])
×
148
                updated_stream = existing[0]
×
149
            else:
150
                existing_for_claim_id = self.get_filtered(claim_id=txo.claim_id)
1✔
151
                if existing_for_claim_id:
1!
152
                    log.info("claim contains an update to a stream we have, downloading it")
×
153
                    if save_file and existing_for_claim_id[0].output_file_exists:
×
154
                        save_file = False
×
155
                    if not claim.stream.source.bt_infohash:
×
156
                        existing_for_claim_id[0].downloader.node = source_manager.node
×
157
                    await existing_for_claim_id[0].start(timeout=timeout, save_now=save_file)
×
158
                    if not existing_for_claim_id[0].output_file_exists and (
×
159
                            save_file or file_name or download_directory):
160
                        await existing_for_claim_id[0].save_file(
×
161
                            file_name=file_name, download_directory=download_directory
162
                        )
163
                    to_replace = existing_for_claim_id[0]
×
164

165
            # resume or update an existing stream, if the stream changed: download it and delete the old one after
166
            if updated_stream:
1!
167
                log.info("already have stream for %s", uri)
×
168
                if save_file and updated_stream.output_file_exists:
×
169
                    save_file = False
×
170
                if not claim.stream.source.bt_infohash:
×
171
                    updated_stream.downloader.node = source_manager.node
×
172
                await updated_stream.start(timeout=timeout, save_now=save_file)
×
173
                if not updated_stream.output_file_exists and (save_file or file_name or download_directory):
×
174
                    await updated_stream.save_file(
×
175
                        file_name=file_name, download_directory=download_directory
176
                    )
177
                return updated_stream
×
178

179
            ####################
180
            # pay fee
181
            ####################
182

183
            needs_purchasing = (
1✔
184
                not to_replace and
185
                not txo.is_my_output and
186
                txo.has_price and
187
                not txo.purchase_receipt
188
            )
189

190
            if needs_purchasing:
1✔
191
                payment = await self.wallet_manager.create_purchase_transaction(
1✔
192
                    wallet.accounts, txo, exchange_rate_manager
193
                )
194

195
            ####################
196
            # make downloader and wait for start
197
            ####################
198
            # temporary with fields we know so downloader can start. Missing fields are populated later.
199
            stored_claim = StoredContentClaim(outpoint=outpoint, claim_id=txo.claim_id, name=txo.claim_name,
1✔
200
                                              amount=txo.amount, height=txo.tx_ref.height,
201
                                              serialized=claim.to_bytes().hex())
202

203
            if not claim.stream.source.bt_infohash:
1!
204
                # fixme: this shouldnt be here
205
                stream = ManagedStream(
1✔
206
                    self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash,
207
                    download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
208
                    analytics_manager=self.analytics_manager, claim=stored_claim
209
                )
210
                stream.downloader.node = source_manager.node
1✔
211
            else:
212
                stream = TorrentSource(
×
213
                    self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash,
214
                    file_name=file_name, download_directory=download_directory or self.config.download_dir,
215
                    status=ManagedStream.STATUS_RUNNING, claim=stored_claim, analytics_manager=self.analytics_manager,
216
                    torrent_session=source_manager.torrent_session
217
                )
218
            log.info("starting download for %s", uri)
1✔
219

220
            before_download = self.loop.time()
1✔
221
            await stream.start(timeout, save_file)
1✔
222

223
            ####################
224
            # success case: delete to_replace if applicable, broadcast fee payment
225
            ####################
226

227
            if to_replace:  # delete old stream now that the replacement has started downloading
1!
228
                await source_manager.delete(to_replace)
×
229

230
            if payment is not None:
1!
231
                await self.wallet_manager.broadcast_or_release(payment)
×
232
                payment = None  # to avoid releasing in `finally` later
×
233
                log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
×
234
                await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
×
235

236
            source_manager.add(stream)
1✔
237

238
            if not claim.stream.source.bt_infohash:
1!
239
                await self.storage.save_content_claim(stream.stream_hash, outpoint)
1✔
240
            else:
241
                await self.storage.save_torrent_content_claim(
×
242
                    stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
243
                )
244
                claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
×
NEW
245
                stream.set_claim(claim_info.as_dict() if claim_info else None, claim)
×
246
            if save_file:
1!
247
                await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download))
1✔
248
            return stream
1✔
249
        except asyncio.TimeoutError:
1✔
250
            error = DownloadDataTimeoutError(stream.identifier)
1✔
251
            raise error
1✔
252
        except Exception as err:  # forgive data timeout, don't delete stream
1✔
253
            expected = (DownloadMetadataTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
1✔
254
                        KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
255
            if isinstance(err, expected):
1✔
256
                log.warning("Failed to download %s: %s", uri, str(err))
1✔
257
            elif isinstance(err, asyncio.CancelledError):
1!
258
                pass
×
259
            else:
260
                log.exception("Unexpected error downloading stream:")
1✔
261
            error = err
1✔
262
            raise
1✔
263
        finally:
264
            if payment is not None:
1!
265
                # payment is set to None after broadcasting, if we're here an exception probably happened
266
                await self.wallet_manager.ledger.release_tx(payment)
×
267
            if self.analytics_manager and claim and claim.stream.source.bt_infohash:
1!
268
                # TODO: analytics for torrents
269
                pass
×
270
            elif self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
1!
271
                                                                   stream.downloader.time_to_first_bytes))):
272
                server = self.wallet_manager.ledger.network.client.server
1✔
273
                self.loop.create_task(
1✔
274
                    self.analytics_manager.send_time_to_first_bytes(
275
                        resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
276
                        uri, outpoint,
277
                        None if not stream else len(stream.downloader.blob_downloader.active_connections),
278
                        None if not stream else len(stream.downloader.blob_downloader.scores),
279
                        None if not stream else len(stream.downloader.blob_downloader.connection_failures),
280
                        False if not stream else stream.downloader.added_fixed_peers,
281
                        self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay,
282
                        None if not stream else stream.sd_hash,
283
                        None if not stream else stream.downloader.time_to_descriptor,
284
                        None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
285
                        None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
286
                        None if not stream else stream.downloader.time_to_first_bytes,
287
                        None if not error else error.__class__.__name__,
288
                        None if not error else str(error),
289
                        None if not server else f"{server[0]}:{server[1]}"
290
                    )
291
                )
292

293
    async def stream_partial_content(self, request: Request, identifier: str):
1✔
NEW
294
        for source_manager in self.source_managers.values():
×
NEW
295
            if source_manager.get_filtered(identifier=identifier):
×
NEW
296
                return await source_manager.stream_partial_content(request, identifier)
×
297

298
    def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]:
1✔
299
        """
300
        Get a list of filtered and sorted ManagedDownloadSource objects from all available source managers
301
        """
302
        result = last_error = None
1✔
303
        for manager in self.source_managers.values():
1✔
304
            try:
1✔
305
                result = (result or []) + manager.get_filtered(*args, **kwargs)
1✔
NEW
306
            except ValueError as error:
×
NEW
307
                last_error = error
×
308
        if result is not None:
1!
309
            return result
1✔
NEW
310
        raise last_error
×
311

312
    async def delete(self, source: ManagedDownloadSource, delete_file=False):
1✔
313
        for manager in self.source_managers.values():
×
314
            await manager.delete(source, delete_file)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc