• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 3788717008

pending completion
3788717008

Pull #3711

github

GitHub
Merge 69297ea9c into 625865165
Pull Request #3711: Bump to Python 3.9 attempt 3.

2802 of 6558 branches covered (42.73%)

Branch coverage included in aggregate %.

25 of 41 new or added lines in 17 files covered. (60.98%)

33 existing lines in 9 files now uncovered.

12281 of 19915 relevant lines covered (61.67%)

1.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.55
/lbry/torrent/torrent_manager.py
1
import asyncio
3✔
2
import binascii
3✔
3
import logging
3✔
4
import os
3✔
5
import typing
3✔
6
from typing import Optional
3✔
7
from aiohttp.web import Request
3✔
8
from lbry.file.source_manager import SourceManager
3✔
9
from lbry.file.source import ManagedDownloadSource
3✔
10

11
if typing.TYPE_CHECKING:
3!
12
    from lbry.torrent.session import TorrentSession
×
13
    from lbry.conf import Config
×
14
    from lbry.wallet.transaction import Transaction
×
15
    from lbry.extras.daemon.analytics import AnalyticsManager
×
16
    from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim
×
17
    from lbry.extras.daemon.storage import StoredContentClaim
×
18

19
log = logging.getLogger(__name__)
3✔
20

21

22
def path_or_none(encoded_path) -> Optional[str]:
3✔
23
    if not encoded_path:
×
24
        return
×
25
    return binascii.unhexlify(encoded_path).decode()
×
26

27

28
class TorrentSource(ManagedDownloadSource):
3✔
29
    STATUS_STOPPED = "stopped"
3✔
30
    filter_fields = SourceManager.filter_fields
3✔
31
    filter_fields.update({
3✔
32
        'bt_infohash'
33
    })
34

35
    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str,
3✔
36
                 file_name: Optional[str] = None, download_directory: Optional[str] = None,
37
                 status: Optional[str] = STATUS_STOPPED, claim: Optional['StoredContentClaim'] = None,
38
                 download_id: Optional[str] = None, rowid: Optional[int] = None,
39
                 content_fee: Optional['Transaction'] = None,
40
                 analytics_manager: Optional['AnalyticsManager'] = None,
41
                 added_on: Optional[int] = None, torrent_session: Optional['TorrentSession'] = None):
42
        super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id,
×
43
                         rowid, content_fee, analytics_manager, added_on)
44
        self.torrent_session = torrent_session
×
45

46
    @property
3✔
47
    def full_path(self) -> Optional[str]:
3✔
48
        full_path = self.torrent_session.full_path(self.identifier)
×
49
        self.download_directory = os.path.dirname(full_path)
×
50
        return full_path
×
51

52
    async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
3✔
53
        await self.torrent_session.add_torrent(self.identifier, self.download_directory)
×
54

55
    async def stop(self, finished: bool = False):
3✔
56
        await self.torrent_session.remove_torrent(self.identifier)
×
57

58
    async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
3✔
59
        await self.torrent_session.save_file(self.identifier, download_directory)
×
60

61
    @property
3✔
62
    def torrent_length(self):
3✔
63
        return self.torrent_session.get_size(self.identifier)
×
64

65
    @property
3✔
66
    def written_bytes(self):
3✔
67
        return self.torrent_session.get_downloaded(self.identifier)
×
68

69
    @property
3✔
70
    def torrent_name(self):
3✔
71
        return self.torrent_session.get_name(self.identifier)
×
72

73
    @property
3✔
74
    def bt_infohash(self):
3✔
75
        return self.identifier
×
76

77
    async def stop_tasks(self):
3✔
78
        pass
×
79

80
    @property
3✔
81
    def completed(self):
3✔
82
        return self.torrent_session.is_completed(self.identifier)
×
83

84

85
class TorrentManager(SourceManager):
3✔
86
    _sources: typing.Dict[str, ManagedDownloadSource]
3✔
87

88
    filter_fields = set(SourceManager.filter_fields)
3✔
89
    filter_fields.update({
3✔
90
        'bt_infohash',
91
        'blobs_remaining',  # TODO: here they call them "parts", but its pretty much the same concept
92
        'blobs_in_stream'
93
    })
94

95
    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', torrent_session: 'TorrentSession',
3✔
96
                 storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None):
97
        super().__init__(loop, config, storage, analytics_manager)
×
98
        self.torrent_session: 'TorrentSession' = torrent_session
×
99

100
    async def recover_streams(self, file_infos: typing.List[typing.Dict]):
3✔
101
        raise NotImplementedError
×
102

103
    async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str],
3✔
104
                           download_directory: Optional[str], status: str,
105
                           claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
106
                           added_on: Optional[int]):
107
        stream = TorrentSource(
×
108
            self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name,
109
            download_directory=download_directory, status=status, claim=claim, rowid=rowid,
110
            content_fee=content_fee, analytics_manager=self.analytics_manager, added_on=added_on,
111
            torrent_session=self.torrent_session
112
        )
113
        self.add(stream)
×
114

115
    async def initialize_from_database(self):
3✔
116
        pass
×
117

118
    async def start(self):
3✔
119
        await super().start()
×
120

121
    async def stop(self):
3✔
NEW
122
        await super().stop()
×
UNCOV
123
        log.info("finished stopping the torrent manager")
×
124

125
    async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
3✔
126
        await super().delete(source, delete_file)
×
127
        self.torrent_session.remove_torrent(source.identifier, delete_file)
×
128

129
    async def create(self, file_path: str, key: Optional[bytes] = None,
3✔
130
                     iv_generator: Optional[typing.Generator[bytes, None, None]] = None):
131
        raise NotImplementedError
×
132

133
    async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
3✔
134
        raise NotImplementedError
×
135
        # blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
136
        # await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
137
        # await self.storage.delete_stream(source.descriptor)
138

139
    async def stream_partial_content(self, request: Request, sd_hash: str):
3✔
140
        raise NotImplementedError
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc