• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 3708794183

pending completion
3708794183

Pull #3657

github

GitHub
Merge 636b7ed47 into 625865165
Pull Request #3657: wip: add initial support for streaming torrent files

2754 of 6491 branches covered (42.43%)

Branch coverage included in aggregate %.

64 of 245 new or added lines in 12 files covered. (26.12%)

20 existing lines in 5 files now uncovered.

12055 of 19808 relevant lines covered (60.86%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.19
/lbry/file/source.py
1
import os
1✔
2
import asyncio
1✔
3
import time
1✔
4
import typing
1✔
5
import logging
1✔
6
import binascii
1✔
7
from typing import Optional
1✔
8
from lbry.utils import generate_id
1✔
9
from lbry.extras.daemon.storage import StoredContentClaim
1✔
10

11
if typing.TYPE_CHECKING:
1!
12
    from lbry.conf import Config
×
13
    from lbry.extras.daemon.analytics import AnalyticsManager
×
14
    from lbry.wallet.transaction import Transaction
×
15
    from lbry.extras.daemon.storage import SQLiteStorage
×
16

17
log = logging.getLogger(__name__)
1✔
18

19

20
class ManagedDownloadSource:
    """Shared state and read-only accessors for a managed download.

    This class stores the bookkeeping common to a download (identifier,
    target file name/directory, status, attached claim, timestamps) and
    exposes it through properties.  The lifecycle operations (``start``,
    ``stop``, ``save_file``, ``stop_tasks``) and the ``completed`` property
    raise ``NotImplementedError`` here and are expected to be overridden.
    """

    STATUS_RUNNING = "running"
    STATUS_STOPPED = "stopped"
    STATUS_FINISHED = "finished"

    SAVING_ID = 1
    STREAMING_ID = 2

    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str,
                 file_name: Optional[str] = None, download_directory: Optional[str] = None,
                 status: Optional[str] = STATUS_STOPPED, claim: Optional['StoredContentClaim'] = None,
                 download_id: Optional[str] = None, rowid: Optional[int] = None,
                 content_fee: Optional['Transaction'] = None,
                 analytics_manager: Optional['AnalyticsManager'] = None,
                 added_on: Optional[int] = None):
        """Store the download's configuration and initial state.

        :param loop: event loop kept on the instance as ``self.loop``.
        :param config: configuration object; ``stream_url`` reads its
            ``streaming_host`` and ``streaming_port`` attributes.
        :param storage: storage backend kept as ``self.storage``.
        :param identifier: identifier of the source; used in ``stream_url``.
        :param file_name: name of the output file, if any.
        :param download_directory: directory of the output file, if any.
        :param status: initial status, ``STATUS_STOPPED`` by default.
        :param claim: claim information attached to this download, if any.
        :param download_id: existing download id; a falsy value makes the
            constructor generate a fresh hex id from ``generate_id()``.
        :param rowid: row id kept as ``self.rowid``.
        :param content_fee: fee transaction kept as ``self.content_fee``.
        :param analytics_manager: kept as ``self.analytics_manager``.
        :param added_on: creation timestamp; a falsy value (``None`` or
            ``0``) is replaced by the current time in whole seconds.
        """
        self.loop = loop
        self.storage = storage
        self.config = config
        self.identifier = identifier
        self.download_directory = download_directory
        self._file_name = file_name
        self._status = status
        self.stream_claim_info = claim
        # Truthiness (not ``is None``) is intentional here: an empty id is
        # treated the same as a missing one.
        self.download_id = download_id or binascii.hexlify(generate_id()).decode()
        self.rowid = rowid
        self.content_fee = content_fee
        self.purchase_receipt = None
        self._added_on = added_on or int(time.time())
        self.analytics_manager = analytics_manager
        self.downloader = None

        # Events are only created here; nothing in this class sets or
        # awaits them.
        self.saving = asyncio.Event()
        self.finished_writing = asyncio.Event()
        self.started_writing = asyncio.Event()
        self.finished_write_attempt = asyncio.Event()

    # @classmethod
    # async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', file_path: str,
    #                  key: Optional[bytes] = None,
    #                  iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedDownloadSource':
    #     raise NotImplementedError()

    async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
        """Start the download.  Not implemented in this class."""
        raise NotImplementedError()

    async def stop(self, finished: bool = False):
        """Stop the download.  Not implemented in this class."""
        raise NotImplementedError()

    async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
        """Save the download to a file.  Not implemented in this class."""
        raise NotImplementedError()

    def stop_tasks(self) -> None:
        """Stop background tasks.  Not implemented in this class."""
        raise NotImplementedError()

    def set_claim(self, claim_info: typing.Dict, claim: 'Claim') -> None:
        """Attach claim information to this download.

        Builds a ``StoredContentClaim`` from *claim_info* and *claim* and
        stores it in ``self.stream_claim_info``.

        :param claim_info: mapping that must contain ``txid``, ``nout``,
            ``claim_id``, ``name``, ``amount``, ``height``, ``address`` and
            ``claim_sequence``; ``channel_name`` is optional.
        :param claim: object providing ``to_bytes()`` and
            ``signing_channel_id``.
        :raises KeyError: if a required key is missing from *claim_info*.
        """
        # NOTE(review): 'Claim' is not imported under TYPE_CHECKING in the
        # visible import block - presumably lbry.schema.claim.Claim; confirm.
        self.stream_claim_info = StoredContentClaim(
            f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
            claim_info['name'], claim_info['amount'], claim_info['height'],
            binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
            claim_info['claim_sequence'], claim_info.get('channel_name')
        )

    # async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
    #     if not claim_info:
    #         claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
    #     self.set_claim(claim_info, claim_info['value'])

    @property
    def file_name(self) -> Optional[str]:
        """Output file name as given to the constructor."""
        return self._file_name

    @property
    def added_on(self) -> Optional[int]:
        """Timestamp (seconds) recorded when the source was created."""
        return self._added_on

    @property
    def suggested_file_name(self) -> Optional[str]:
        """Suggested file name; in this class simply the stored file name."""
        return self._file_name

    @property
    def stream_name(self) -> Optional[str]:
        """Stream name; in this class an alias of ``suggested_file_name``."""
        return self.suggested_file_name

    @property
    def status(self) -> str:
        """Current status value (one of the ``STATUS_*`` constants)."""
        return self._status

    @property
    def completed(self) -> bool:
        """Whether the download is complete.  Not implemented in this class."""
        raise NotImplementedError()

    @property
    def stream_url(self) -> str:
        """HTTP URL built from the configured streaming host/port and the identifier."""
        return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.identifier}"

    @property
    def finished(self) -> bool:
        """True when the status equals ``STATUS_FINISHED``."""
        return self.status == self.STATUS_FINISHED

    @property
    def running(self) -> bool:
        """True when the status equals ``STATUS_RUNNING``."""
        return self.status == self.STATUS_RUNNING

    # The claim accessors below all return None while no claim is attached
    # (``stream_claim_info`` is falsy) and otherwise forward one attribute.

    @property
    def claim_id(self) -> Optional[str]:
        """``claim_id`` of the attached claim, or None."""
        info = self.stream_claim_info
        return info.claim_id if info else None

    @property
    def txid(self) -> Optional[str]:
        """``txid`` of the attached claim, or None."""
        info = self.stream_claim_info
        return info.txid if info else None

    @property
    def nout(self) -> Optional[int]:
        """``nout`` of the attached claim, or None."""
        info = self.stream_claim_info
        return info.nout if info else None

    @property
    def outpoint(self) -> Optional[str]:
        """``outpoint`` of the attached claim, or None."""
        info = self.stream_claim_info
        return info.outpoint if info else None

    @property
    def claim_height(self) -> Optional[int]:
        """``height`` of the attached claim, or None."""
        info = self.stream_claim_info
        return info.height if info else None

    @property
    def channel_claim_id(self) -> Optional[str]:
        """``channel_claim_id`` of the attached claim, or None."""
        info = self.stream_claim_info
        return info.channel_claim_id if info else None

    @property
    def channel_name(self) -> Optional[str]:
        """``channel_name`` of the attached claim, or None."""
        info = self.stream_claim_info
        return info.channel_name if info else None

    @property
    def claim_name(self) -> Optional[str]:
        """``claim_name`` of the attached claim, or None."""
        info = self.stream_claim_info
        return info.claim_name if info else None

    @property
    def metadata(self) -> Optional[typing.Dict]:
        """``claim.stream.to_dict()`` of the attached claim, or None."""
        info = self.stream_claim_info
        return info.claim.stream.to_dict() if info else None

    @property
    def metadata_protobuf(self) -> Optional[bytes]:
        """Hex-encoded (``binascii.hexlify``) serialized claim, or None.

        The result is ``bytes``, not ``str``; None is returned when no claim
        is attached.
        """
        info = self.stream_claim_info
        if not info:
            return None
        return binascii.hexlify(info.claim.to_bytes())

    @property
    def full_path(self) -> Optional[str]:
        """Path of the output file, or None if name or directory is unset.

        Only the basename of ``file_name`` is joined onto the download
        directory, so any directory components in the name are dropped.
        """
        if not (self.file_name and self.download_directory):
            return None
        return os.path.join(self.download_directory, os.path.basename(self.file_name))

    @property
    def output_file_exists(self) -> bool:
        """True when ``full_path`` is set and points at an existing regular file."""
        path = self.full_path  # compute once; the property rebuilds the path on each access
        return os.path.isfile(path) if path else False
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc