• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 4599645360

pending completion
4599645360

push

github

GitHub
Bump cryptography from 2.5 to 39.0.1

2807 of 6557 branches covered (42.81%)

Branch coverage included in aggregate %.

12289 of 19915 relevant lines covered (61.71%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.26
/lbry/file/file_manager.py
1
import asyncio
1✔
2
import logging
1✔
3
import typing
1✔
4
from typing import Optional
1✔
5
from aiohttp.web import Request
1✔
6
from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError
1✔
7
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
1✔
8
from lbry.error import InvalidStreamURLError
1✔
9
from lbry.stream.managed_stream import ManagedStream
1✔
10
from lbry.torrent.torrent_manager import TorrentSource
1✔
11
from lbry.utils import cache_concurrent
1✔
12
from lbry.schema.url import URL
1✔
13
from lbry.wallet.dewies import dewies_to_lbc
1✔
14
from lbry.file.source_manager import SourceManager
1✔
15
from lbry.file.source import ManagedDownloadSource
1✔
16
from lbry.extras.daemon.storage import StoredContentClaim
1✔
17
if typing.TYPE_CHECKING:
1!
18
    from lbry.conf import Config
×
19
    from lbry.extras.daemon.analytics import AnalyticsManager
×
20
    from lbry.extras.daemon.storage import SQLiteStorage
×
21
    from lbry.wallet import WalletManager
×
22
    from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
×
23

24
log = logging.getLogger(__name__)
1✔
25

26

27
class FileManager:
1✔
28
    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', wallet_manager: 'WalletManager',
                 storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None):
        """
        Keep references to the collaborators and begin with no source managers registered.

        :param loop: event loop; used by this class for timing and for scheduling the analytics task
        :param config: configuration object; download defaults are read from it
        :param wallet_manager: gives access to the default wallet, the ledger and purchase transactions
        :param storage: storage used to persist claims, content claims and content fees
        :param analytics_manager: optional; when provided, download timing is reported through it
        """
        # Nothing in this class fills the mapping; the keys it reads are 'stream' and 'torrent'.
        # NOTE(review): presumably the owner registers the managers after construction - confirm.
        self.source_managers: typing.Dict[str, SourceManager] = {}
        # set by start() once every manager reports started, cleared again by stop()
        self.started = asyncio.Event()
        self.analytics_manager = analytics_manager
        self.storage = storage
        self.wallet_manager = wallet_manager
        self.config = config
        self.loop = loop
37

38
    @property
    def streams(self):
        """
        The private ``_sources`` attribute of the manager registered under 'stream'.

        :raises KeyError: if no manager is registered under 'stream'
        """
        stream_manager = self.source_managers['stream']
        return stream_manager._sources
41

42
    async def create_stream(self, file_path: str, key: Optional[bytes] = None, **kwargs) -> ManagedDownloadSource:
        """
        Create a source from a local file through the manager registered under 'stream'.

        :param file_path: forwarded to the stream manager's ``create``
        :param key: optional key, forwarded to the stream manager's ``create``
        :param kwargs: any further keyword arguments, forwarded unchanged
        :return: whatever the stream manager's ``create`` returns
        :raises NotImplementedError: if no manager is registered under 'stream'
        """
        if 'stream' not in self.source_managers:
            raise NotImplementedError
        stream_manager = self.source_managers['stream']
        return await stream_manager.create(file_path, key, **kwargs)
46

47
    async def start(self):
        """
        Start every registered source manager, then mark this manager as started.

        The managers' ``start()`` coroutines are awaited together; afterwards each manager's own
        ``started`` event is awaited before ``self.started`` is set.
        """
        startups = [source_manager.start() for source_manager in self.source_managers.values()]
        await asyncio.gather(*startups)
        for source_manager in self.source_managers.values():
            await source_manager.started.wait()
        self.started.set()
52

53
    async def stop(self):
        """
        Stop the registered source managers one at a time, then clear ``self.started``.

        The managers stay registered in ``self.source_managers`` after they are stopped.
        """
        for source_manager in self.source_managers.values():
            # fixme: pop or not?
            await source_manager.stop()
        self.started.clear()
58

59
    @cache_concurrent
    async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
                                timeout: Optional[float] = None, file_name: Optional[str] = None,
                                download_directory: Optional[str] = None,
                                save_file: Optional[bool] = None, resolve_timeout: float = 3.0,
                                wallet: Optional['Wallet'] = None) -> ManagedDownloadSource:
        """
        Resolve *uri* and resume, update or start the download it points at.

        Steps, in order:

        1. validate the url and resolve it through the ledger (bounded by *resolve_timeout*)
        2. if a source with the same sd_hash / bt_infohash is already managed, attach the new claim
           to it, make sure it is started and return it
        3. if a source for the same claim id (but different content) is managed, start it and remember
           it so it can be deleted once the replacement download has started
        4. create a purchase transaction when the claim has a price and no receipt exists
        5. create and start the new source, broadcast the payment, register the source and persist
           its content claim

        :param uri: url of the stream to download
        :param exchange_rate_manager: passed to the wallet manager when a purchase transaction is created
        :param timeout: download timeout; falsy values fall back to ``config.download_timeout``
        :param file_name: name for the saved file; giving one forces ``save_file`` on
        :param download_directory: target directory; only used when the file is saved, defaulting to
            ``config.download_dir``
        :param save_file: whether to write the file to disk; ``None`` means ``config.save_files``
        :param resolve_timeout: seconds allowed for resolving the claim
        :param wallet: wallet whose accounts are used; defaults to the wallet manager's default wallet
        :return: the existing source when the content is already managed, otherwise the new source
        :raises InvalidStreamURLError: the url cannot be parsed or does not name a stream
        :raises ResolveTimeoutError: resolving took longer than *resolve_timeout*
        :raises ResolveError: resolving failed, returned nothing usable, or collides with another download
        :raises DownloadDataTimeoutError: an ``asyncio.TimeoutError`` escaped while downloading or saving

        Any other exception raised by the collaborators is logged and re-raised unchanged.
        """
        wallet = wallet or self.wallet_manager.default_wallet
        timeout = timeout or self.config.download_timeout
        start_time = self.loop.time()
        resolved_time = None
        stream = None
        claim = None
        error = None
        outpoint = None
        if save_file is None:
            save_file = self.config.save_files
        if file_name and not save_file:
            save_file = True
        if save_file:
            download_directory = download_directory or self.config.download_dir
        else:
            download_directory = None

        payment = None
        try:
            # resolve the claim
            try:
                if not URL.parse(uri).has_stream:
                    raise InvalidStreamURLError(uri)
            except ValueError as err:
                raise InvalidStreamURLError(uri) from err
            try:
                resolved_result = await asyncio.wait_for(
                    self.wallet_manager.ledger.resolve(
                        wallet.accounts, [uri],
                        include_purchase_receipt=True,
                        include_is_my_output=True
                    ), resolve_timeout
                )
            except asyncio.TimeoutError as err:
                raise ResolveTimeoutError(uri) from err
            except Exception as err:
                log.exception("Unexpected error resolving stream:")
                raise ResolveError(f"Unexpected error resolving stream: {str(err)}") from err
            if 'error' in resolved_result:
                raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}")
            if not resolved_result or uri not in resolved_result:
                raise ResolveError(f"Failed to resolve stream at '{uri}'")
            txo = resolved_result[uri]
            if isinstance(txo, dict):
                # a dict in place of a transaction output is reported as a resolve failure
                raise ResolveError(f"Failed to resolve stream at '{uri}': {txo}")
            claim = txo.claim
            outpoint = f"{txo.tx_ref.id}:{txo.position}"
            resolved_time = self.loop.time() - start_time
            await self.storage.save_claim_from_output(self.wallet_manager.ledger, txo)

            ####################
            # update or replace
            ####################

            if claim.stream.source.bt_infohash:
                source_manager = self.source_managers['torrent']
                existing = source_manager.get_filtered(bt_infohash=claim.stream.source.bt_infohash)
            elif claim.stream.source.sd_hash:
                source_manager = self.source_managers['stream']
                existing = source_manager.get_filtered(sd_hash=claim.stream.source.sd_hash)
            else:
                raise ResolveError(f"There is nothing to download at {uri} - Source is unknown or unset")

            # resume or update an existing stream, if the stream changed: download it and delete the old one after
            to_replace, updated_stream = None, None
            if existing and existing[0].claim_id != txo.claim_id:
                # NOTE(review): the two claim ids look swapped relative to the wording (the existing
                # download is existing[0]); message left untouched in case callers match on it - confirm.
                raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {txo.claim_id}")
            if existing:
                log.info("claim contains a metadata only update to a stream we have")
                if claim.stream.source.bt_infohash:
                    await self.storage.save_torrent_content_claim(
                        existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name
                    )
                    claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier)
                    existing[0].set_claim(claim_info, claim)
                else:
                    await self.storage.save_content_claim(
                        existing[0].stream_hash, outpoint
                    )
                    await source_manager._update_content_claim(existing[0])
                updated_stream = existing[0]
            else:
                existing_for_claim_id = self.get_filtered(claim_id=txo.claim_id)
                if existing_for_claim_id:
                    log.info("claim contains an update to a stream we have, downloading it")
                    # save_file may be switched off here; the new value also governs the replacement below
                    save_file = await self._start_existing_source(
                        existing_for_claim_id[0], claim, source_manager, timeout, save_file,
                        file_name, download_directory
                    )
                    to_replace = existing_for_claim_id[0]

            # the content itself is already managed: make sure that source is started and hand it back
            if updated_stream:
                log.info("already have stream for %s", uri)
                await self._start_existing_source(
                    updated_stream, claim, source_manager, timeout, save_file, file_name, download_directory
                )
                return updated_stream

            ####################
            # pay fee
            ####################

            needs_purchasing = (
                not to_replace and
                not txo.is_my_output and
                txo.has_price and
                not txo.purchase_receipt
            )

            if needs_purchasing:
                payment = await self.wallet_manager.create_purchase_transaction(
                    wallet.accounts, txo, exchange_rate_manager
                )

            ####################
            # make downloader and wait for start
            ####################
            # temporary with fields we know so downloader can start. Missing fields are populated later.
            stored_claim = StoredContentClaim(outpoint=outpoint, claim_id=txo.claim_id, name=txo.claim_name,
                                              amount=txo.amount, height=txo.tx_ref.height,
                                              serialized=claim.to_bytes().hex())

            if not claim.stream.source.bt_infohash:
                # fixme: this shouldn't be here
                stream = ManagedStream(
                    self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash,
                    download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
                    analytics_manager=self.analytics_manager, claim=stored_claim
                )
                stream.downloader.node = source_manager.node
            else:
                stream = TorrentSource(
                    self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash,
                    file_name=file_name, download_directory=download_directory or self.config.download_dir,
                    status=ManagedStream.STATUS_RUNNING, claim=stored_claim, analytics_manager=self.analytics_manager,
                    torrent_session=source_manager.torrent_session
                )
            log.info("starting download for %s", uri)

            before_download = self.loop.time()
            await stream.start(timeout, save_file)

            ####################
            # success case: delete to_replace if applicable, broadcast fee payment
            ####################

            if to_replace:  # delete old stream now that the replacement has started downloading
                await source_manager.delete(to_replace)

            if payment is not None:
                await self.wallet_manager.broadcast_or_release(payment)
                payment = None  # to avoid releasing in `finally` later
                log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
                await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)

            source_manager.add(stream)

            if not claim.stream.source.bt_infohash:
                await self.storage.save_content_claim(stream.stream_hash, outpoint)
            else:
                await self.storage.save_torrent_content_claim(
                    stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
                )
                claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
                stream.set_claim(claim_info, claim)
            if save_file:
                # saving shares the download budget: only what is left of `timeout` may be spent here
                await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download))
            return stream
        except asyncio.TimeoutError:
            # `stream` is still None when the timeout comes from resuming/updating an existing source;
            # fall back to the resolved claim so the timeout is not replaced by an AttributeError.
            if stream is not None:
                timed_out_sd_hash = stream.sd_hash
            elif claim is not None:
                timed_out_sd_hash = claim.stream.source.sd_hash
            else:
                timed_out_sd_hash = None
            error = DownloadDataTimeoutError(timed_out_sd_hash)
            raise error
        except (Exception, asyncio.CancelledError) as err:  # forgive data timeout, don't delete stream
            expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
                        KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
            if isinstance(err, expected):
                log.warning("Failed to download %s: %s", uri, str(err))
            elif isinstance(err, asyncio.CancelledError):
                pass
            else:
                log.exception("Unexpected error downloading stream:")
            error = err
            raise
        finally:
            if payment is not None:
                # payment is set to None after broadcasting, if we're here an exception probably happened
                await self.wallet_manager.ledger.release_tx(payment)
            if self.analytics_manager and claim and claim.stream.source.bt_infohash:
                # TODO: analytics for torrents
                pass
            elif self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
                                                                   stream.downloader.time_to_first_bytes))):
                server = self.wallet_manager.ledger.network.client.server
                # fire and forget: the report is scheduled on the loop and not awaited here
                self.loop.create_task(
                    self.analytics_manager.send_time_to_first_bytes(
                        resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
                        uri, outpoint,
                        None if not stream else len(stream.downloader.blob_downloader.active_connections),
                        None if not stream else len(stream.downloader.blob_downloader.scores),
                        None if not stream else len(stream.downloader.blob_downloader.connection_failures),
                        False if not stream else stream.downloader.added_fixed_peers,
                        self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay,
                        None if not stream else stream.sd_hash,
                        None if not stream else stream.downloader.time_to_descriptor,
                        None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
                        None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
                        None if not stream else stream.downloader.time_to_first_bytes,
                        None if not error else error.__class__.__name__,
                        None if not error else str(error),
                        None if not server else f"{server[0]}:{server[1]}"
                    )
                )

    async def _start_existing_source(self, source, claim, source_manager, timeout, save_file,
                                     file_name, download_directory) -> bool:
        """
        Start an already managed source and write its file when one was requested but is missing.

        :param source: the managed source to start
        :param claim: resolved claim; decides whether the source is a torrent (``bt_infohash`` set)
        :param source_manager: manager whose ``node`` is handed to the downloader of non-torrent sources
        :param timeout: passed to ``source.start``
        :param save_file: requested save behaviour; turned off when the output file already exists
        :param file_name: passed to ``source.save_file``
        :param download_directory: passed to ``source.save_file``
        :return: the effective ``save_file`` value after the output-file check
        """
        if save_file and source.output_file_exists:
            save_file = False
        if not claim.stream.source.bt_infohash:
            source.downloader.node = source_manager.node
        await source.start(timeout=timeout, save_now=save_file)
        if not source.output_file_exists and (save_file or file_name or download_directory):
            await source.save_file(
                file_name=file_name, download_directory=download_directory
            )
        return save_file
290

291
    async def stream_partial_content(self, request: Request, sd_hash: str):
        """
        Hand a partial-content request over to the manager registered under 'stream'.

        :param request: the incoming aiohttp request, forwarded unchanged
        :param sd_hash: forwarded unchanged to the stream manager
        :return: whatever the stream manager's ``stream_partial_content`` returns
        :raises KeyError: if no manager is registered under 'stream'
        """
        stream_manager = self.source_managers['stream']
        response = await stream_manager.stream_partial_content(request, sd_hash)
        return response
293

294
    def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]:
        """
        Collect the filtered sources of every registered source manager into one list.

        All arguments are forwarded unchanged to each manager's ``get_filtered``. The per-manager
        results are concatenated in the iteration order of ``self.source_managers``; the combined
        list is not re-sorted across managers.

        :param sort_by: field to sort by
        :param reverse: reverse sorting
        :param comparison: comparison operator used for filtering
        :param search_by: fields and values to filter by
        :return: a new list; empty when no manager is registered or nothing matches
        """
        # One flat pass: sum(..., []) would copy the accumulated list again for every manager.
        return [
            source
            for manager in self.source_managers.values()
            for source in manager.get_filtered(*args, **kwargs)
        ]
304

305
    async def delete(self, source: ManagedDownloadSource, delete_file=False):
        """
        Ask every registered source manager, one after another, to delete *source*.

        :param source: the source to delete; passed to each manager's ``delete``
        :param delete_file: forwarded to each manager's ``delete``
        """
        registered = self.source_managers.values()
        for source_manager in registered:
            await source_manager.delete(source, delete_file)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc