• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 3708794183

pending completion
3708794183

Pull #3657

github

GitHub
Merge 636b7ed47 into 625865165
Pull Request #3657: wip: add initial support for streaming torrent files

2754 of 6491 branches covered (42.43%)

Branch coverage included in aggregate %.

64 of 245 new or added lines in 12 files covered. (26.12%)

20 existing lines in 5 files now uncovered.

12055 of 19808 relevant lines covered (60.86%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.18
/lbry/torrent/torrent_manager.py
1
import asyncio
1✔
2
import logging
1✔
3
import os
1✔
4
import typing
1✔
5
from pathlib import Path
1✔
6
from typing import Optional
1✔
7
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
1✔
8

9
from lbry.error import DownloadMetadataTimeoutError
1✔
10
from lbry.file.source_manager import SourceManager
1✔
11
from lbry.file.source import ManagedDownloadSource
1✔
12
from lbry.schema.mime_types import guess_media_type
1✔
13

14
if typing.TYPE_CHECKING:
1!
15
    from lbry.torrent.session import TorrentSession
×
16
    from lbry.conf import Config
×
17
    from lbry.wallet.transaction import Transaction
×
18
    from lbry.extras.daemon.analytics import AnalyticsManager
×
19
    from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim
×
20
    from lbry.extras.daemon.storage import StoredContentClaim
×
21

22
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
1✔
23

24

25
class TorrentSource(ManagedDownloadSource):
    """A managed download whose content is provided by a torrent session.

    ``identifier`` doubles as the torrent's info-hash (see ``bt_infohash``); size, path,
    progress and streaming queries are all delegated to ``torrent_session``.
    """

    STATUS_STOPPED = "stopped"
    # NOTE(review): this binds the same object as ``SourceManager.filter_fields``, so the
    # ``update`` below also adds 'bt_infohash' to the base class' set. Kept as-is because
    # other managers may depend on that shared state -- confirm before switching to a copy.
    filter_fields = SourceManager.filter_fields
    filter_fields.update({
        'bt_infohash'
    })

    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str,
                 file_name: Optional[str] = None, download_directory: Optional[str] = None,
                 status: Optional[str] = STATUS_STOPPED, claim: Optional['StoredContentClaim'] = None,
                 download_id: Optional[str] = None, rowid: Optional[int] = None,
                 content_fee: Optional['Transaction'] = None,
                 analytics_manager: Optional['AnalyticsManager'] = None,
                 added_on: Optional[int] = None, torrent_session: Optional['TorrentSession'] = None):
        """Forward the common source arguments to the base class and remember the session.

        :param torrent_session: session object every torrent operation is delegated to.
        """
        super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id,
                         rowid, content_fee, analytics_manager, added_on)
        self.torrent_session = torrent_session
        # Lazily filled caches, see ``suggested_file_name`` and ``full_path``.
        self._suggested_file_name = None
        self._full_path = None

    @property
    def full_path(self) -> Optional[str]:
        """Path of the file selected for this source, resolved on first access and cached.

        Every access also refreshes ``download_directory`` from the torrent session.
        """
        if not self._full_path:
            self._full_path = self.select_path()
            self._file_name = os.path.basename(self._full_path)
        self.download_directory = self.torrent_session.save_path(self.identifier)
        return self._full_path

    def select_path(self):
        """Pick which file of the torrent this source refers to.

        The file named by the claim's stream source is used when the session can map that
        name to an index; otherwise the largest file reported by the session is chosen
        (the first one on ties).

        :raises ValueError: if the fallback is needed and the session reports no files.
        """
        claim_info = self.stream_claim_info
        wanted_name = (claim_info and claim_info.claim.stream.source.name) or ''
        wanted_index = self.torrent_session.get_index_from_name(self.identifier, wanted_name)
        if wanted_index is not None:
            return self.torrent_session.full_path(self.identifier, wanted_index)
        # maybe warn?
        files = self.torrent_session.get_files(self.identifier)
        largest_path, _ = max(files.items(), key=lambda entry: entry[1])
        return largest_path

    @property
    def suggested_file_name(self):
        """Base name of the selected file, computed once and then cached."""
        self._suggested_file_name = self._suggested_file_name or os.path.basename(self.select_path())
        return self._suggested_file_name

    @property
    def mime_type(self) -> Optional[str]:
        """Media type guessed from the selected file's name."""
        return guess_media_type(os.path.basename(self.full_path))[0]

    async def setup(self, timeout: Optional[float] = None):
        """Add the torrent to the session and wait for it, then resolve directory and file name.

        :param timeout: seconds to wait for ``add_torrent`` to complete; ``None`` waits forever.
        :raises DownloadMetadataTimeoutError: if the wait times out (the torrent is removed first).
        """
        try:
            metadata_download = self.torrent_session.add_torrent(self.identifier, self.download_directory)
            # ``wait_for`` lost its ``loop`` argument in Python 3.10; the running loop is used.
            await asyncio.wait_for(metadata_download, timeout)
        except asyncio.TimeoutError as err:
            # NOTE(review): ``stop()`` awaits ``remove_torrent`` while this call does not --
            # confirm against TorrentSession whether it is a coroutine function.
            self.torrent_session.remove_torrent(btih=self.identifier)
            raise DownloadMetadataTimeoutError(self.identifier) from err
        self.download_directory = self.torrent_session.save_path(self.identifier)
        self._file_name = os.path.basename(self.full_path)

    async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
        """Run ``setup`` and, if this source has no ``rowid`` yet, record it in storage.

        ``save_now`` is accepted but not used by this implementation.
        """
        await self.setup(timeout)
        if not self.rowid:
            await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name)
            self.rowid = await self.storage.save_downloaded_file(
                self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on
            )

    async def stop(self, finished: bool = False):
        """Remove the torrent from the session; ``finished`` is not used here."""
        await self.torrent_session.remove_torrent(self.identifier)

    async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
        """Ask the session to save the torrent into ``download_directory``; ``file_name`` is unused."""
        await self.torrent_session.save_file(self.identifier, download_directory)

    @property
    def torrent_length(self):
        """Total size of the torrent as reported by the session."""
        return self.torrent_session.get_total_size(self.identifier)

    @property
    def stream_length(self):
        """Size of the selected file (``file_name``) as reported by the session."""
        return self.torrent_session.get_size(self.identifier, self.file_name)

    @property
    def written_bytes(self):
        """Downloaded amount as reported by the session."""
        return self.torrent_session.get_downloaded(self.identifier)

    @property
    def torrent_name(self):
        """Name of the torrent as reported by the session."""
        return self.torrent_session.get_name(self.identifier)

    @property
    def bt_infohash(self):
        """The torrent info-hash, which is this source's identifier."""
        return self.identifier

    def stop_tasks(self):
        """No background tasks are owned by a torrent source; nothing to stop."""
        pass

    @property
    def completed(self):
        """Whether the session reports this torrent as completed."""
        return self.torrent_session.is_completed(self.identifier)

    @property
    def status(self):
        """``STATUS_FINISHED`` once completed, ``STATUS_RUNNING`` otherwise."""
        return self.STATUS_FINISHED if self.completed else self.STATUS_RUNNING

    async def stream_file(self, request):
        """Serve the byte range asked for by ``request`` as a 206 partial response.

        The range is validated before the torrent is started. The file is then read from
        disk in the chunk sizes yielded by ``torrent_session.stream_file``.

        :raises HTTPRequestRangeNotSatisfiable: if the requested range is invalid.
        """
        log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id,
                 self.identifier[:6])
        headers, start, end = self._prepare_range_response_headers(
            request.headers.get('range', 'bytes=0-')
        )
        target = self.suggested_file_name
        await self.start()
        response = StreamResponse(
            status=206,
            headers=headers
        )
        await response.prepare(request)
        # Until the file shows up on disk, keep pulling a single item from the session's
        # stream generator and re-checking.
        while not os.path.exists(self.full_path):
            async for _ in self.torrent_session.stream_file(self.identifier, target, start, end):
                break
        with open(self.full_path, 'rb') as infile:
            infile.seek(start)
            async for read_size in self.torrent_session.stream_file(self.identifier, target, start, end):
                if infile.tell() + read_size < end:
                    await response.write(infile.read(read_size))
                else:
                    # ``end`` is inclusive, hence the + 1 for the final chunk.
                    await response.write_eof(infile.read(end - infile.tell() + 1))
        return response

    def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
        """Translate a ``Range`` header value into response headers and inclusive byte offsets.

        Accepts ``bytes=START-END``, ``bytes=START-`` (to the end of the file) and the
        suffix form ``bytes=-N`` (the last N bytes).

        :param get_range: raw header value, with or without the ``bytes=`` unit prefix.
        :return: ``(headers, start, end)`` where ``start`` and ``end`` are inclusive offsets.
        :raises HTTPRequestRangeNotSatisfiable: if the range cannot be parsed or lies
            outside the file.
        """
        if '=' in get_range:
            get_range = get_range.split('=')[1]
        size = self.stream_length

        try:
            first, last = get_range.split('-')
            if first:
                start = int(first)
                end = int(last) if last else size - 1
            else:
                # Suffix range: the last ``last`` bytes, clamped to the start of the file.
                start = max(size - int(last), 0)
                end = size - 1
        except ValueError:
            # Wrong number of parts or non-numeric bounds: report it as an unsatisfiable
            # range instead of letting the request fail with an internal error.
            raise HTTPRequestRangeNotSatisfiable() from None

        if end >= size or not 0 <= start < size:
            raise HTTPRequestRangeNotSatisfiable()

        final_size = end - start + 1
        headers = {
            'Accept-Ranges': 'bytes',
            'Content-Range': f'bytes {start}-{end}/{size}',
            'Content-Length': str(final_size),
            'Content-Type': self.mime_type
        }
        return headers, start, end
174

175

176
class TorrentManager(SourceManager):
    """Source manager whose sources are ``TorrentSource`` objects sharing one torrent session."""

    _sources: typing.Dict[str, ManagedDownloadSource]

    # Own copy of the base filter fields plus the torrent-specific ones.
    # TODO: here they call them "parts", but it's pretty much the same concept as blobs
    filter_fields = set(SourceManager.filter_fields) | {
        'bt_infohash',
        'blobs_remaining',
        'blobs_in_stream',
    }

    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', torrent_session: 'TorrentSession',
                 storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None):
        """Initialise the base manager and keep a reference to the shared torrent session."""
        super().__init__(loop, config, storage, analytics_manager)
        self.torrent_session: 'TorrentSession' = torrent_session

    async def recover_streams(self, file_infos: typing.List[typing.Dict]):
        """Not supported for torrents."""
        raise NotImplementedError

    async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str],
                           download_directory: Optional[str], status: str,
                           claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
                           added_on: Optional[int], **kwargs):
        """Build a ``TorrentSource`` from stored fields, register it, then run its ``setup``."""
        source_options = dict(
            identifier=bt_infohash,
            file_name=file_name,
            download_directory=download_directory,
            status=status,
            claim=claim,
            rowid=rowid,
            content_fee=content_fee,
            analytics_manager=self.analytics_manager,
            added_on=added_on,
            torrent_session=self.torrent_session,
        )
        source = TorrentSource(self.loop, self.config, self.storage, **source_options)
        self.add(source)
        await source.setup()

    async def initialize_from_database(self):
        """Load every torrent file row known to storage as a source."""

        def unhex(value):
            # Stored values are hex strings; a missing or empty value becomes None.
            return bytes.fromhex(value or '').decode() or None

        rows = await self.storage.get_all_torrent_files()
        for row in rows:
            claim = await self.storage.get_content_claim_for_torrent(row['bt_infohash'])
            row['download_directory'] = unhex(row['download_directory'])
            row['file_name'] = unhex(row['file_name'])
            await self._load_stream(claim=claim, **row)

    async def start(self):
        """Start the manager; defers entirely to the base class."""
        await super().start()

    def stop(self):
        """Stop the manager via the base class and log that it is done."""
        super().stop()
        log.info("finished stopping the torrent manager")

    async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
        """Delete ``source`` through the base class, then drop its torrent from the session."""
        await super().delete(source, delete_file)
        # NOTE(review): not awaited here, although TorrentSource.stop awaits the same
        # method -- confirm against TorrentSession.
        self.torrent_session.remove_torrent(source.identifier, delete_file)

    async def create(self, file_path: str, key: Optional[bytes] = None,
                     iv_generator: Optional[typing.Generator[bytes, None, None]] = None):
        """Not supported for torrents."""
        raise NotImplementedError

    async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
        """Not supported for torrents."""
        raise NotImplementedError

    async def stream_partial_content(self, request: Request, identifier: str):
        """Stream the source registered under ``identifier`` in response to ``request``."""
        source = self._sources[identifier]
        return await source.stream_file(request)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc