• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 3708794183

pending completion
3708794183

Pull #3657

github

GitHub
Merge 636b7ed47 into 625865165
Pull Request #3657: wip: add initial support for streaming torrent files

2754 of 6491 branches covered (42.43%)

Branch coverage included in aggregate %.

64 of 245 new or added lines in 12 files covered. (26.12%)

20 existing lines in 5 files now uncovered.

12055 of 19808 relevant lines covered (60.86%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

25.59
/lbry/torrent/session.py
1
import asyncio
1✔
2
import binascii
1✔
3
import os
1✔
4
import logging
1✔
5
import random
1✔
6
from tempfile import mkdtemp
1✔
7
from typing import Optional, Tuple, Dict
1✔
8

9
import libtorrent
1✔
10

11

12
log = logging.getLogger(__name__)
1✔
13
# Flags passed to libtorrent for torrents added by info hash (see
# TorrentSession._add_torrent).
# fixme: somehow the logic here is inverted?
# NOTE(review): the fixme above is from the original author; the observed
# effect of these flags should be confirmed against the libtorrent docs.
DEFAULT_FLAGS = (
        libtorrent.add_torrent_params_flags_t.flag_auto_managed
        | libtorrent.add_torrent_params_flags_t.flag_update_subscribe
        | libtorrent.add_torrent_params_flags_t.flag_sequential_download
        | libtorrent.add_torrent_params_flags_t.flag_paused
)
19

20

21
class TorrentHandle:
    """Asyncio-friendly wrapper around one ``libtorrent.torrent_handle``.

    Tracks metadata/finish state through two events, caches the torrent's
    ``torrent_info`` once it is known, and offers helpers to map file byte
    ranges to pieces so a file can be streamed while it is still downloading.
    """

    def __init__(self, loop, executor, handle):
        """Wrap *handle*.

        :param loop: event loop used for ``run_in_executor`` calls.
        :param executor: executor that blocking libtorrent calls are sent to.
        :param handle: the ``libtorrent.torrent_handle`` being wrapped.
        """
        self._loop = loop
        self._executor = executor
        self._handle: 'libtorrent.torrent_handle' = handle
        self.finished = self._new_event(loop)
        self.metadata_completed = self._new_event(loop)
        self.size = handle.status().total_wanted
        self.total_wanted_done = 0
        self.name = ''
        self.tasks = []
        # May be None until metadata has been fetched (torrents added by info
        # hash only); refreshed by _show_status once metadata is available.
        self._torrent_info: 'Optional[libtorrent.torrent_info]' = handle.torrent_file()
        self._base_path = None

    @staticmethod
    def _new_event(loop) -> asyncio.Event:
        """Create an ``asyncio.Event``, binding it to *loop* where supported.

        Python 3.10 removed the ``loop`` argument (passing it raises
        ``TypeError``); older versions still accept it.
        """
        try:
            return asyncio.Event(loop=loop)
        except TypeError:
            return asyncio.Event()

    @property
    def torrent_file(self) -> 'Optional[libtorrent.file_storage]':
        """File storage of the torrent, or ``None`` while metadata is unknown."""
        if self._torrent_info is None:
            return None
        return self._torrent_info.files()

    def full_path_at(self, file_num) -> Optional[str]:
        """Return the on-disk path of file *file_num*, or ``None`` without metadata."""
        files = self.torrent_file
        if files is None:
            return None
        return os.path.join(self.save_path, files.file_path(file_num))

    def size_at(self, file_num) -> Optional[int]:
        """Return the size in bytes of file *file_num*, or ``None`` without metadata."""
        files = self.torrent_file
        if files is None:
            return None
        return files.file_size(file_num)

    @property
    def save_path(self) -> Optional[str]:
        """Directory the torrent is saved to (cached after the first lookup)."""
        if not self._base_path:
            self._base_path = self._handle.status().save_path
        return self._base_path

    def index_from_name(self, file_name):
        """Return the index of the first non-padding file whose base name is
        *file_name*, or ``None`` if there is no such file or no metadata yet."""
        files = self.torrent_file
        if files is None:
            return None
        for file_num in range(files.num_files()):
            path = files.file_path(file_num)
            if '.pad' in path:
                continue  # ignore padding files
            if file_name == os.path.basename(path):
                return file_num
        return None

    def stop_tasks(self):
        """Request resume data to be saved and cancel all tasks of this handle."""
        self._handle.save_resume_data()
        while self.tasks:
            self.tasks.pop().cancel()

    def byte_range_to_piece_range(
            self, file_index, start_offset, end_offset
    ) -> 'Tuple[libtorrent.peer_request, libtorrent.peer_request]':
        """Map two byte offsets inside file *file_index* to piece requests.

        :return: ``(start, end)`` results of ``torrent_info.map_file`` for
                 *start_offset* and *end_offset* respectively.
        """
        start_piece = self._torrent_info.map_file(file_index, start_offset, 0)
        end_piece = self._torrent_info.map_file(file_index, end_offset, 0)
        return start_piece, end_piece

    async def stream_range_as_completed(self, file_name, start, end):
        """Yield readable byte counts as the pieces covering a range complete.

        For every piece from the one containing *start* through the one
        containing *end* (inclusive), waits until the piece is available and
        then yields how many bytes of that piece lie at or after the current
        position: the remainder of the first piece, then a full piece length
        for each following piece.

        :raises ValueError: if *file_name* is not a file of this torrent.
        """
        file_index = self.index_from_name(file_name)
        if file_index is None:
            raise ValueError(f"Attempt to stream from invalid file. Expected name: {file_name}")
        first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end)
        start_piece_offset = first_piece.start
        piece_size = self._torrent_info.piece_length()
        log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s",
                 first_piece.piece, final_piece.piece, start, end, piece_size, self.name)
        self.prioritize(file_index, start, end)
        for piece_index in range(first_piece.piece, final_piece.piece + 1):
            while not self._handle.have_piece(piece_index):
                log.info("Waiting for piece %d: %s", piece_index, self.name)
                self._handle.set_piece_deadline(piece_index, 0)
                await asyncio.sleep(0.2)
            log.info("Streaming piece offset %d / %d for torrent %s", piece_index, final_piece.piece, self.name)
            yield piece_size - start_piece_offset
            # Only the first piece is entered part-way through; every later
            # piece is consumed from its beginning.
            start_piece_offset = 0

    def _show_status(self):
        """Refresh cached state from libtorrent and set the progress events."""
        # fixme: cleanup
        if not self._handle.is_valid():
            return
        status = self._handle.status()
        self._base_path = status.save_path
        if status.has_metadata:
            self.size = status.total_wanted
            self.total_wanted_done = status.total_wanted_done
            self.name = status.name
            if not self.metadata_completed.is_set():
                self.metadata_completed.set()
                self._torrent_info = self._handle.torrent_file()
                log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
        log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s',
                  status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
                  status.num_peers, status.num_seeds, status.state, status.save_path)
        if (status.is_finished or status.is_seeding) and not self.finished.is_set():
            self.finished.set()
            log.info("Torrent finished: %s", self.name)

    def prioritize(self, file_index, start, end, cleanup=False):
        """Prioritise the pieces covering bytes *start*..*end* of a file.

        All pieces are reset to priority 1 (or 0 when *cleanup* is true) and
        existing deadlines are cleared. The pieces of the requested range,
        including the one containing *end*, then get descending priorities
        7..1 and ascending deadlines in range order.
        """
        first_piece, last_piece = self.byte_range_to_piece_range(file_index, start, end)
        base_priority = 0 if cleanup else 1
        priorities = [base_priority] * len(self._handle.get_piece_priorities())
        self._handle.clear_piece_deadlines()
        # Inclusive of the last piece (streaming waits on it too), but never
        # past the end of the priority list.
        stop = min(last_piece.piece + 1, len(priorities))
        for idx, piece_number in enumerate(range(first_piece.piece, stop)):
            priorities[piece_number] = 7 - idx if idx <= 6 else 1
            self._handle.set_piece_deadline(piece_number, idx)
        log.debug("Prioritizing pieces for %s: %s", self.name, priorities)
        self._handle.prioritize_pieces(priorities)

    async def status_loop(self):
        """Poll the torrent status every 0.1s until the torrent has finished."""
        while True:
            self._show_status()
            if self.finished.is_set():
                break
            await asyncio.sleep(0.1)

    async def pause(self):
        """Pause the torrent (runs in the executor)."""
        await self._loop.run_in_executor(
            self._executor, self._handle.pause
        )

    async def resume(self):
        """Resume the torrent (runs in the executor)."""
        await self._loop.run_in_executor(
            self._executor, lambda: self._handle.resume()  # pylint: disable=unnecessary-lambda
        )
138

139

140
class TorrentSession:
    """Owns a ``libtorrent.session`` and the ``TorrentHandle`` of every torrent
    added to it, keyed by hex info hash (btih)."""

    def __init__(self, loop, executor):
        """
        :param loop: event loop used for tasks and ``run_in_executor`` calls.
        :param executor: executor that blocking libtorrent calls are sent to.
        """
        self._loop = loop
        self._executor = executor
        # Created by bind(); None before that and after stop().
        self._session: Optional[libtorrent.session] = None
        self._handles: Dict[str, TorrentHandle] = {}
        self.tasks = []

    def add_peer(self, btih, addr, port):
        """Ask the torrent *btih* to connect to the peer at ``(addr, port)``."""
        self._handles[btih]._handle.connect_peer((addr, port))

    async def add_fake_torrent(self, file_count=3):
        """Create a torrent of random files in a new temp dir, add it in seed
        mode and return its info hash as a string."""
        tmpdir = mkdtemp()
        info = _create_fake_torrent(tmpdir, file_count=file_count)
        flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode
        handle = self._session.add_torrent({
            'ti': info, 'save_path': tmpdir, 'flags': flags
        })
        btih = str(info.info_hash())
        self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
        return btih

    async def bind(self, interface: str = '0.0.0.0', port: int = 10889):
        """Create the libtorrent session listening on *interface*:*port*
        (NAT-PMP and UPnP disabled) and start the alert-processing task."""
        settings = {
            'listen_interfaces': f"{interface}:{port}",
            'enable_natpmp': False,
            'enable_upnp': False
        }
        self._session = await self._loop.run_in_executor(
            self._executor, libtorrent.session, settings  # pylint: disable=c-extension-no-member
        )
        self.tasks.append(self._loop.create_task(self.process_alerts()))

    def stop(self):
        """Stop all handles' tasks, cancel session tasks, then save state,
        pause and drop the libtorrent session if one exists."""
        while self._handles:
            self._handles.popitem()[1].stop_tasks()
        while self.tasks:
            self.tasks.pop().cancel()
        if self._session:
            self._session.save_state()
            self._session.pause()
            self._session = None

    def _pop_alerts(self):
        """Drain pending libtorrent alerts, logging each one."""
        for alert in self._session.pop_alerts():
            log.info("torrent alert: %s", alert)

    async def process_alerts(self):
        """Drain alerts in the executor once per second, forever."""
        while True:
            await self._loop.run_in_executor(
                self._executor, self._pop_alerts
            )
            await asyncio.sleep(1)

    async def pause(self):
        """Save the session state, then pause the session (both in the executor)."""
        await self._loop.run_in_executor(
            self._executor, lambda: self._session.save_state()  # pylint: disable=unnecessary-lambda
        )
        await self._loop.run_in_executor(
            self._executor, lambda: self._session.pause()  # pylint: disable=unnecessary-lambda
        )

    async def resume(self):
        """Resume the session (runs in the executor)."""
        await self._loop.run_in_executor(
            self._executor, self._session.resume
        )

    def _add_torrent(self, btih: str, download_directory: Optional[str]):
        """Blocking part of add_torrent: add *btih* to the session with
        DEFAULT_FLAGS, force a DHT announce and register its handle."""
        params = {'info_hash': binascii.unhexlify(btih.encode()), 'flags': DEFAULT_FLAGS}
        if download_directory:
            params['save_path'] = download_directory
        handle = self._session.add_torrent(params)
        handle.force_dht_announce()
        self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)

    def full_path(self, btih, file_num) -> Optional[str]:
        """Return the on-disk path of file *file_num* of torrent *btih*."""
        return self._handles[btih].full_path_at(file_num)

    def save_path(self, btih):
        """Return the save directory of torrent *btih*."""
        return self._handles[btih].save_path

    def has_torrent(self, btih):
        """Return whether torrent *btih* has been added to this session."""
        return btih in self._handles

    async def add_torrent(self, btih, download_path):
        """Add torrent *btih* (if not already added), start its status loop
        and wait until its metadata is available."""
        if btih in self._handles:
            return await self._handles[btih].metadata_completed.wait()
        await self._loop.run_in_executor(
            self._executor, self._add_torrent, btih, download_path
        )
        handle = self._handles[btih]
        handle.tasks.append(self._loop.create_task(handle.status_loop()))
        await handle.metadata_completed.wait()

    def remove_torrent(self, btih, remove_files=False):
        """Stop and remove torrent *btih* if present, optionally deleting its files."""
        if btih in self._handles:
            handle = self._handles[btih]
            handle.stop_tasks()
            self._session.remove_torrent(handle._handle, 1 if remove_files else 0)
            self._handles.pop(btih)

    async def save_file(self, btih, download_directory):
        """Resume torrent *btih*. *download_directory* is currently unused."""
        handle = self._handles[btih]
        await handle.resume()

    def get_total_size(self, btih):
        """Return the handle's cached total wanted size for torrent *btih*."""
        return self._handles[btih].size

    def get_index_from_name(self, btih, file_name):
        """Return the file index of *file_name* in torrent *btih*."""
        return self._handles[btih].index_from_name(file_name)

    def get_size(self, btih, file_name) -> Optional[int]:
        """Return the size of the file whose base name is *file_name*, or
        ``None`` if torrent *btih* has no such (non-padding) file."""
        for (path, size) in self.get_files(btih).items():
            if os.path.basename(path) == file_name:
                return size
        return None

    def get_name(self, btih):
        """Return the handle's cached name for torrent *btih*."""
        return self._handles[btih].name

    def get_downloaded(self, btih):
        """Return the handle's cached count of wanted bytes downloaded so far."""
        return self._handles[btih].total_wanted_done

    def is_completed(self, btih):
        """Return whether torrent *btih* has been marked finished."""
        return self._handles[btih].finished.is_set()

    def stream_file(self, btih, file_name, start, end):
        """Return the async generator streaming bytes *start*..*end* of
        *file_name* from torrent *btih* as pieces complete."""
        handle = self._handles[btih]
        return handle.stream_range_as_completed(file_name, start, end)

    def get_files(self, btih) -> Dict:
        """Map full path -> size for every non-padding file of torrent *btih*."""
        handle = self._handles[btih]
        return {
            self.full_path(btih, file_num): handle.torrent_file.file_size(file_num)
            for file_num in range(handle.torrent_file.num_files())
            if '.pad' not in handle.torrent_file.file_path(file_num)
        }
274

275

276
def get_magnet_uri(btih: str) -> str:
    """Return the magnet URI for the hex info hash *btih*."""
    return f"magnet:?xt=urn:btih:{btih}"
×
278

279

280
def _create_fake_torrent(tmpdir, file_count=3, largest_index=1):
    """Write *file_count* random files under ``tmpdir/subdir`` and return a
    ``libtorrent.torrent_info`` describing them.

    :param tmpdir: existing directory; ``subdir`` is created inside it.
    :param file_count: number of files (``tmp0`` .. ``tmp{file_count-1}``).
    :param largest_index: which file index {0 ... file_count-1} is the largest
                          file; sizes shrink by 1 KiB per index of distance.
    """
    # layout: subdir/tmp{0..file_count-1} files. v1+v2. automatic piece size.
    file_storage = libtorrent.file_storage()
    subfolder = os.path.join(tmpdir, "subdir")
    os.mkdir(subfolder)
    for file_number in range(file_count):
        file_name = f"tmp{file_number}"
        # 10 KiB for the largest file, one KiB less per step away from it;
        # clamped so files 10+ indices away are not written empty.
        unique_bytes = max(1, 10 - abs(file_number - largest_index))
        payload = bytes(random.randint(0, 255) for _ in range(unique_bytes)) * 1024
        with open(os.path.join(subfolder, file_name), 'wb') as myfile:
            size = myfile.write(payload)
        file_storage.add_file(os.path.join("subdir", file_name), size)
    t = libtorrent.create_torrent(file_storage, 0, 0)
    libtorrent.set_piece_hashes(t, tmpdir)
    return libtorrent.torrent_info(t.generate())
×
295

296

297
async def main():
    """Manual smoke test: download a hard-coded torrent into ``~/Downloads``.

    Removes stale copies of the target files, binds a session, adds the
    torrent and then polls its first file path once per second until the
    task is cancelled; the session is paused on the way out.
    """
    # "~" is not expanded by os.path.exists/os.remove, so resolve it once.
    downloads = os.path.expanduser("~/Downloads")
    for stale_name in ("ubuntu-18.04.3-live-server-amd64.torrent",
                       "ubuntu-18.04.3-live-server-amd64.iso"):
        stale_path = os.path.join(downloads, stale_name)
        if os.path.exists(stale_path):
            os.remove(stale_path)

    btih = "dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c"

    executor = None  # None selects the loop's default executor
    session = TorrentSession(asyncio.get_running_loop(), executor)
    await session.bind()
    try:
        await session.add_torrent(btih, downloads)
        while True:
            session.full_path(btih, 0)
            await asyncio.sleep(1)
    finally:
        # The polling loop only ends by cancellation/error, so teardown has
        # to live here to be reachable.
        await session.pause()
        if executor is not None:
            executor.shutdown()
×
314

315

316
if __name__ == "__main__":
    # Script entry point: verbose logging, then run the manual smoke test.
    logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
    log = logging.getLogger(__name__)
    asyncio.run(main())
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc