• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 4599645360

pending completion
4599645360

push

github

GitHub
Bump cryptography from 2.5 to 39.0.1

2807 of 6557 branches covered (42.81%)

Branch coverage included in aggregate %.

12289 of 19915 relevant lines covered (61.71%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.23
/lbry/stream/managed_stream.py
1
import os
1✔
2
import asyncio
1✔
3
import time
1✔
4
import typing
1✔
5
import logging
1✔
6
from typing import Optional
1✔
7
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
1✔
8
from lbry.error import DownloadSDTimeoutError
1✔
9
from lbry.schema.mime_types import guess_media_type
1✔
10
from lbry.stream.downloader import StreamDownloader
1✔
11
from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name
1✔
12
from lbry.stream.reflector.client import StreamReflectorClient
1✔
13
from lbry.extras.daemon.storage import StoredContentClaim
1✔
14
from lbry.blob import MAX_BLOB_SIZE
1✔
15
from lbry.file.source import ManagedDownloadSource
1✔
16

17
if typing.TYPE_CHECKING:
1!
18
    from lbry.conf import Config
×
19
    from lbry.blob.blob_manager import BlobManager
×
20
    from lbry.blob.blob_info import BlobInfo
×
21
    from lbry.extras.daemon.analytics import AnalyticsManager
×
22
    from lbry.wallet.transaction import Transaction
×
23

24
log = logging.getLogger(__name__)
1✔
25

26

27
def _get_next_available_file_name(download_directory: str, file_name: str) -> str:
1✔
28
    base_name, ext = os.path.splitext(os.path.basename(file_name))
1✔
29
    i = 0
1✔
30
    while os.path.isfile(os.path.join(download_directory, file_name)):
1!
31
        i += 1
×
32
        file_name = "%s_%i%s" % (base_name, i, ext)
×
33

34
    return file_name
1✔
35

36

37
async def get_next_available_file_name(loop: asyncio.AbstractEventLoop, download_directory: str, file_name: str) -> str:
    """Async wrapper: run the blocking collision check in the default executor."""
    return await loop.run_in_executor(
        None, lambda: _get_next_available_file_name(download_directory, file_name)
    )
39

40

41
class ManagedStream(ManagedDownloadSource):
    """A managed download source backed by an LBRY stream (sd blob + data blobs)."""

    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
                 sd_hash: str, download_directory: Optional[str] = None, file_name: Optional[str] = None,
                 status: Optional[str] = ManagedDownloadSource.STATUS_STOPPED,
                 claim: Optional[StoredContentClaim] = None,
                 download_id: Optional[str] = None, rowid: Optional[int] = None,
                 descriptor: Optional[StreamDescriptor] = None,
                 content_fee: Optional['Transaction'] = None,
                 analytics_manager: Optional['AnalyticsManager'] = None,
                 added_on: Optional[int] = None):
        """
        :param sd_hash: hash of the stream descriptor blob; also used as this
            source's identifier (see the ``sd_hash`` property)
        :param descriptor: pre-parsed descriptor, if available; otherwise the
            downloader will fetch/parse it on start
        """
        super().__init__(loop, config, blob_manager.storage, sd_hash, file_name, download_directory, status, claim,
                         download_id, rowid, content_fee, analytics_manager, added_on)
        self.blob_manager = blob_manager
        self.purchase_receipt = None
        self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
        self.analytics_manager = analytics_manager

        # percentage (0-100) of needed blobs sent during a reflector upload
        self.reflector_progress = 0
        self.uploading_to_reflector = False
        # background task writing decrypted blobs to disk (see _save_file)
        self.file_output_task: typing.Optional[asyncio.Task] = None
        # inactivity watchdog task (see _delayed_stop)
        self.delayed_stop_task: typing.Optional[asyncio.Task] = None
        # open HTTP range-request responses currently being streamed to
        self.streaming_responses: typing.List[typing.Tuple[Request, StreamResponse]] = []
        self.fully_reflected = asyncio.Event()
        self.streaming = asyncio.Event()
        self._running = asyncio.Event()
66

67
    @property
    def sd_hash(self) -> str:
        """The stream descriptor hash; doubles as this source's identifier."""
        return self.identifier
70

71
    @property
    def is_fully_reflected(self) -> bool:
        """True once a reflector server has confirmed it holds the whole stream."""
        return self.fully_reflected.is_set()
74

75
    @property
    def descriptor(self) -> StreamDescriptor:
        """The stream descriptor held by the downloader (may be set after start)."""
        return self.downloader.descriptor
78

79
    @property
    def stream_hash(self) -> str:
        """Hash of the stream, as recorded in the descriptor."""
        return self.descriptor.stream_hash
82

83
    @property
    def file_name(self) -> Optional[str]:
        """Explicitly chosen file name, falling back to the suggested one."""
        return self._file_name or self.suggested_file_name
86

87
    @property
    def suggested_file_name(self) -> Optional[str]:
        """Sanitized file name from the descriptor, else from the claim's source name."""
        from_descriptor = ((self.descriptor and self.descriptor.suggested_file_name) or '').strip()
        if from_descriptor:
            return sanitize_file_name(from_descriptor)
        from_claim = (self.stream_claim_info and self.stream_claim_info.claim and
                      self.stream_claim_info.claim.stream.source.name)
        return sanitize_file_name(from_claim)
92

93
    @property
    def stream_name(self) -> Optional[str]:
        """Stream name from the descriptor, else the claim's source name (unsanitized)."""
        from_descriptor = ((self.descriptor and self.descriptor.stream_name) or '').strip()
        if from_descriptor:
            return from_descriptor
        return (self.stream_claim_info and self.stream_claim_info.claim and
                self.stream_claim_info.claim.stream.source.name)
98

99
    @property
    def written_bytes(self) -> int:
        """Bytes written to the output file so far (0 if the file doesn't exist)."""
        return 0 if not self.output_file_exists else os.stat(self.full_path).st_size
102

103
    @property
    def completed(self):
        # the exact decrypted length isn't known up front, so compare against
        # the descriptor's lower bound
        return self.written_bytes >= self.descriptor.lower_bound_decrypted_length()
106

107
    @property
    def stream_url(self):
        """Local HTTP URL at which this stream can be range-requested."""
        return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}"
110

111
    async def update_status(self, status: str):
        """Set the in-memory status and persist it to storage for this stream."""
        valid_statuses = (self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED)
        assert status in valid_statuses
        self._status = status
        await self.blob_manager.storage.change_file_status(self.stream_hash, status)
115

116
    @property
    def blobs_completed(self) -> int:
        """Number of data blobs (terminator excluded) already downloaded locally."""
        have = self.blob_manager.completed_blob_hashes
        return sum(1 for blob in self.descriptor.blobs[:-1] if blob.blob_hash in have)
120

121
    @property
    def blobs_in_stream(self) -> int:
        # the last entry in descriptor.blobs is the stream terminator, not data
        return len(self.descriptor.blobs) - 1
124

125
    @property
    def blobs_remaining(self) -> int:
        """Data blobs still to be downloaded."""
        return self.blobs_in_stream - self.blobs_completed
128

129
    @property
    def mime_type(self):
        # guess_media_type returns a tuple; the first element is the media type
        return guess_media_type(os.path.basename(self.suggested_file_name))[0]
132

133
    @property
    def download_path(self):
        # NOTE(review): joins with '/' rather than os.path.join — presumably fine
        # on the supported platforms, but verify on Windows
        return f"{self.download_directory}/{self._file_name}" if self.download_directory and self._file_name else None
136

137
    # @classmethod
138
    # async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config',
139
    #                  file_path: str, key: Optional[bytes] = None,
140
    #                  iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedDownloadSource':
141
    #     """
142
    #     Generate a stream from a file and save it to the db
143
    #     """
144
    #     descriptor = await StreamDescriptor.create_stream(
145
    #         loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
146
    #         blob_completed_callback=blob_manager.blob_completed
147
    #     )
148
    #     await blob_manager.storage.store_stream(
149
    #         blob_manager.get_blob(descriptor.sd_hash), descriptor
150
    #     )
151
    #     row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path),
152
    #                                                             os.path.dirname(file_path), 0)
153
    #     return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
154
    #                os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor)
155

156
    async def start(self, timeout: Optional[float] = None,
                    save_now: bool = False):
        """Start the downloader for this stream.

        :param timeout: seconds to wait for the downloader to start; defaults
            to ``config.download_timeout``
        :param save_now: when True and the file isn't yet tracked, reserve a
            collision-free file name in the download directory immediately
        :raises DownloadSDTimeoutError: if the downloader doesn't start in time
        """
        timeout = timeout or self.config.download_timeout
        if self._running.is_set():  # already started
            return
        log.info("start downloader for stream (sd hash: %s)", self.sd_hash)
        self._running.set()
        try:
            await asyncio.wait_for(self.downloader.start(), timeout)
        except asyncio.TimeoutError:
            self._running.clear()
            raise DownloadSDTimeoutError(self.sd_hash)

        # (re)arm the inactivity watchdog
        if self.delayed_stop_task and not self.delayed_stop_task.done():
            self.delayed_stop_task.cancel()
        self.delayed_stop_task = self.loop.create_task(self._delayed_stop())
        if not await self.blob_manager.storage.file_exists(self.sd_hash):
            if save_now:
                if not self._file_name:
                    # fix: the previous `self._file_name or sanitize_file_name(...)`
                    # was dead code — self._file_name is always falsy in this branch
                    self._file_name = await get_next_available_file_name(
                        self.loop, self.download_directory,
                        sanitize_file_name(self.suggested_file_name)
                    )
                file_name, download_dir = self._file_name, self.download_directory
            else:
                file_name, download_dir = None, None
            self._added_on = int(time.time())
            self.rowid = await self.blob_manager.storage.save_downloaded_file(
                self.stream_hash, file_name, download_dir, 0.0, added_on=self._added_on
            )
        if self.status != self.STATUS_RUNNING:
            await self.update_status(self.STATUS_RUNNING)
188

189
    async def stop(self, finished: bool = False):
        """Stop running save/stream tasks and the downloader, then persist the
        new status in the database."""
        await self.stop_tasks()
        should_update = (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING
        if should_update:
            new_status = self.STATUS_FINISHED if finished else self.STATUS_STOPPED
            await self.update_status(new_status)
197

198
    async def _aiter_read_stream(self, start_blob_num: Optional[int] = 0, connection_id: int = 0)\
            -> typing.AsyncIterator[typing.Tuple['BlobInfo', bytes]]:
        """Yield ``(blob_info, decrypted_bytes)`` for each data blob in order,
        starting at ``start_blob_num`` (the terminator blob is skipped).

        :param connection_id: STREAMING_ID uses the downloader's cached read path;
            anything else reads through the regular per-connection path
        :raises IndexError: if ``start_blob_num`` is past the last data blob
        """
        if start_blob_num >= len(self.descriptor.blobs[:-1]):
            raise IndexError(start_blob_num)
        for i, blob_info in enumerate(self.descriptor.blobs[start_blob_num:-1]):
            assert i + start_blob_num == blob_info.blob_num
            if connection_id == self.STREAMING_ID:
                decrypted = await self.downloader.cached_read_blob(blob_info)
            else:
                decrypted = await self.downloader.read_blob(blob_info, connection_id)
            yield (blob_info, decrypted)
209

210
    async def stream_file(self, request: Request) -> StreamResponse:
        """Serve the stream as an HTTP 206 partial-content response.

        Parses the Range header, starts the downloader if needed, then writes
        decrypted blobs into the response until the requested range is covered.

        :raises HTTPRequestRangeNotSatisfiable: (from the header parser) for
            ranges outside the stream
        """
        log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id,
                 self.sd_hash[:6])
        headers, size, skip_blobs, first_blob_start_offset = self._prepare_range_response_headers(
            request.headers.get('range', 'bytes=0-')
        )
        await self.start()
        response = StreamResponse(
            status=206,
            headers=headers
        )
        await response.prepare(request)
        self.streaming_responses.append((request, response))
        self.streaming.set()
        wrote = 0
        try:
            async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=self.STREAMING_ID):
                if not wrote:
                    # first blob: drop bytes before the requested range start
                    decrypted = decrypted[first_blob_start_offset:]
                if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
                    # last data blob (or range satisfied): pad out to the
                    # advertised size before closing the body
                    decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * (MAX_BLOB_SIZE - 1))))
                    log.debug("sending browser final blob (%i/%i)", blob_info.blob_num + 1,
                              len(self.descriptor.blobs) - 1)
                    await response.write_eof(decrypted)
                else:
                    log.debug("sending browser blob (%i/%i)", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
                    await response.write(decrypted)
                wrote += len(decrypted)
                # NOTE(review): relies on aiohttp's private `_eof_sent` attribute
                log.info("sent browser %sblob %i/%i", "(final) " if response._eof_sent else "",
                         blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
                if response._eof_sent:
                    break
            return response
        except ConnectionResetError:
            log.warning("connection was reset after sending browser %i blob bytes", wrote)
            raise asyncio.CancelledError("range request transport was reset")
        finally:
            response.force_close()
            if (request, response) in self.streaming_responses:
                self.streaming_responses.remove((request, response))
            if not self.streaming_responses:
                # no more open range requests: let the watchdog count us as idle
                self.streaming.clear()
252

253
    @staticmethod
    def _write_decrypted_blob(output_path: str, data: bytes):
        """Append decrypted blob bytes to the output file and flush to disk.

        Blocking; intended to be run in an executor.
        """
        with open(output_path, 'ab') as out_file:
            out_file.write(data)
            out_file.flush()
258

259
    async def _save_file(self, output_path: str):
        """Write every decrypted blob to ``output_path`` and update bookkeeping.

        On any failure the partial file is removed. A timeout additionally
        clears the saved-file state in storage and stops the downloader;
        other errors are re-raised after cleanup.
        """
        log.info("save file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id, self.sd_hash[:6],
                 output_path)
        self.saving.set()
        self.finished_write_attempt.clear()
        self.finished_writing.clear()
        self.started_writing.clear()
        try:
            # truncate/create the output file before streaming blobs into it
            open(output_path, 'wb').close()  # pylint: disable=consider-using-with
            async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID):
                log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
                await self.loop.run_in_executor(None, self._write_decrypted_blob, output_path, decrypted)
                if not self.started_writing.is_set():
                    self.started_writing.set()
            await self.update_status(ManagedStream.STATUS_FINISHED)
            if self.analytics_manager:
                self.loop.create_task(self.analytics_manager.send_download_finished(
                    self.download_id, self.claim_name, self.sd_hash
                ))
            self.finished_writing.set()
            log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id,
                     self.sd_hash[:6], self.full_path)
            await self.blob_manager.storage.set_saved_file(self.stream_hash)
        except (Exception, asyncio.CancelledError) as err:
            # CancelledError listed explicitly: it is not an Exception subclass
            # on Python >= 3.8
            if os.path.isfile(output_path):
                log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
                os.remove(output_path)
            if isinstance(err, asyncio.TimeoutError):
                # timeout: forget the target path so a later save starts fresh
                self.downloader.stop()
                await self.blob_manager.storage.change_file_download_dir_and_file_name(
                    self.stream_hash, None, None
                )
                self._file_name, self.download_directory = None, None
                await self.blob_manager.storage.clear_saved_file(self.stream_hash)
                await self.update_status(self.STATUS_STOPPED)
                return
            elif not isinstance(err, asyncio.CancelledError):
                log.exception("unexpected error encountered writing file for stream %s", self.sd_hash)
            raise err
        finally:
            self.saving.clear()
            self.finished_write_attempt.set()
301

302
    async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
        """Start (or restart) saving this stream to disk.

        Picks a collision-free file name, records the target in storage, then
        launches the background write task and waits (up to the configured
        download timeout) for the first bytes to be written.

        :param file_name: explicit file name to save as (optional)
        :param download_directory: directory override (optional)
        :raises ValueError: if no directory or file name can be determined
        """
        await self.start()
        if self.file_output_task and not self.file_output_task.done():  # cancel an already running save task
            self.file_output_task.cancel()
        self.download_directory = download_directory or self.download_directory or self.config.download_dir
        if not self.download_directory:
            raise ValueError("no directory to download to")
        if not (file_name or self._file_name or self.suggested_file_name):
            raise ValueError("no file name to download to")
        if not os.path.isdir(self.download_directory):
            log.warning("download directory '%s' does not exist, attempting to make it", self.download_directory)
            # fix: makedirs instead of mkdir — creates missing parent directories
            # and tolerates the directory appearing concurrently (exist_ok)
            os.makedirs(self.download_directory, exist_ok=True)
        self._file_name = await get_next_available_file_name(
            self.loop, self.download_directory,
            file_name or self._file_name or sanitize_file_name(self.suggested_file_name)
        )
        await self.blob_manager.storage.change_file_download_dir_and_file_name(
            self.stream_hash, self.download_directory, self.file_name
        )
        await self.update_status(ManagedStream.STATUS_RUNNING)
        self.file_output_task = self.loop.create_task(self._save_file(self.full_path))
        try:
            await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout)
        except asyncio.TimeoutError:
            log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id)
            await self.stop_tasks()
            await self.update_status(ManagedStream.STATUS_STOPPED)
329

330
    async def stop_tasks(self):
        """Cancel the save task, close open range-request responses, stop the
        downloader, and mark the stream as not running."""
        if self.file_output_task and not self.file_output_task.done():
            self.file_output_task.cancel()
            # wait for the cancellation to land; swallow the CancelledError
            await asyncio.gather(self.file_output_task, return_exceptions=True)
        self.file_output_task = None
        while self.streaming_responses:
            req, response = self.streaming_responses.pop()
            response.force_close()
            # fix: transport is None when the peer already disconnected;
            # calling .close() on it would raise AttributeError
            if req.transport is not None:
                req.transport.close()
        self.downloader.stop()
        self._running.clear()
341

342
    async def upload_to_reflector(self, host: str, port: int) -> typing.List[str]:
        """Send this stream's blobs to a reflector server.

        :return: list of blob hashes actually sent (possibly including the sd
            hash); on error the partial list sent so far is returned, except
            when the reflector already had the whole stream ([] is returned)
        """
        sent = []
        protocol = StreamReflectorClient(self.blob_manager, self.descriptor)
        try:
            self.uploading_to_reflector = True
            await self.loop.create_connection(lambda: protocol, host, port)
            await protocol.send_handshake()
            sent_sd, needed = await protocol.send_descriptor()
            if sent_sd:  # reflector needed the sd blob
                sent.append(self.sd_hash)
            if not sent_sd and not needed:  # reflector already has the stream
                if not self.fully_reflected.is_set():
                    self.fully_reflected.set()
                    await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
                    return []
            we_have = [
                blob_hash for blob_hash in needed if blob_hash in self.blob_manager.completed_blob_hashes
            ]
            log.info("we have %i/%i needed blobs needed by reflector for lbry://%s#%s", len(we_have), len(needed),
                     self.claim_name, self.claim_id)
            for i, blob_hash in enumerate(we_have):
                await protocol.send_blob(blob_hash)
                sent.append(blob_hash)
                self.reflector_progress = int((i + 1) / len(we_have) * 100)
        except (asyncio.TimeoutError, ValueError, ConnectionError):
            # fix: merged two duplicate except branches that both just returned
            # the partial result
            return sent
        except (Exception, asyncio.CancelledError) as err:
            # fix: dropped redundant OSError from this tuple (Exception already
            # covers it); CancelledError stays explicit because it is not an
            # Exception subclass on Python >= 3.8
            if isinstance(err, asyncio.CancelledError):
                log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id)
            elif isinstance(err, OSError):
                log.warning(
                    "stopped uploading %s#%s to reflector because blobs were deleted or moved", self.claim_name,
                    self.claim_id
                )
            else:
                log.exception("unexpected error reflecting %s#%s", self.claim_name, self.claim_id)
            return sent
        finally:
            if protocol.transport:
                protocol.transport.close()
            self.uploading_to_reflector = False

        return sent
387

388
    async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
        """Refresh this stream's claim; loads it from storage when not provided."""
        if not claim_info:
            claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
        self.set_claim(claim_info, claim_info['value'])
392

393
    async def _delayed_stop(self):
        """Inactivity watchdog: stop the download once neither saving nor
        streaming has been active for two consecutive 1-second checks."""
        idle_checks = 0
        while self._running.is_set():
            active = self.saving.is_set() or self.streaming.is_set()
            idle_checks = 0 if active else idle_checks + 1
            if idle_checks > 1:
                log.info("stopping inactive download for lbry://%s#%s (%s...)", self.claim_name, self.claim_id,
                         self.sd_hash[:6])
                await self.stop()
                return
            await asyncio.sleep(1)
406

407
    def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]:
        """Parse an HTTP Range header and compute response headers.

        :param get_range: raw Range header value, e.g. ``bytes=0-`` or ``0-100``
        :return: (headers dict, total stream size, number of whole blobs to
            skip, byte offset into the first streamed blob)
        :raises HTTPRequestRangeNotSatisfiable: if the range is outside the stream
        :raises ValueError: if the claim's declared size is implausible vs blobs
        """
        if '=' in get_range:
            get_range = get_range.split('=')[1]
        start, end = get_range.split('-')
        size = 0

        # estimate decrypted size from blob lengths (terminator excluded);
        # each blob contributes length - 1 — presumably accounting for padding,
        # TODO confirm against the blob format
        for blob in self.descriptor.blobs[:-1]:
            size += blob.length - 1
        if self.stream_claim_info and self.stream_claim_info.claim.stream.source.size:
            size_from_claim = int(self.stream_claim_info.claim.stream.source.size)
            # the claim's size must sit within one padding block (16 bytes)
            # below the blob-based estimate
            if not size_from_claim <= size <= size_from_claim + 16:
                raise ValueError("claim contains implausible stream size")
            log.debug("using stream size from claim")
            size = size_from_claim
        elif self.stream_claim_info:
            log.debug("estimating stream size")

        start = int(start)
        if not 0 <= start < size:
            raise HTTPRequestRangeNotSatisfiable()

        end = int(end) if end else size - 1

        if end >= size:
            raise HTTPRequestRangeNotSatisfiable()

        # NOTE(review): the -2 / -1 offsets below were unexplained in the
        # original ("dont remember"); they appear to account for blob padding
        # overhead — verify before changing
        skip_blobs = start // (MAX_BLOB_SIZE - 2)
        skip = skip_blobs * (MAX_BLOB_SIZE - 1)
        skip_first_blob = start - skip
        start = skip_first_blob + skip
        final_size = end - start + 1
        headers = {
            'Accept-Ranges': 'bytes',
            'Content-Range': f'bytes {start}-{end}/{size}',
            'Content-Length': str(final_size),
            'Content-Type': self.mime_type
        }
        return headers, size, skip_blobs, skip_first_blob
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc