• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 3708794183

pending completion
3708794183

Pull #3657

github

GitHub
Merge 636b7ed47 into 625865165
Pull Request #3657: wip: add initial support for streaming torrent files

2754 of 6491 branches covered (42.43%)

Branch coverage included in aggregate %.

64 of 245 new or added lines in 12 files covered. (26.12%)

20 existing lines in 5 files now uncovered.

12055 of 19808 relevant lines covered (60.86%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.62
/lbry/extras/daemon/storage.py
1
import os
1✔
2
import logging
1✔
3
import sqlite3
1✔
4
import typing
1✔
5
import asyncio
1✔
6
import binascii
1✔
7
import time
1✔
8
from operator import itemgetter
1✔
9
from typing import Optional
1✔
10
from lbry.wallet import SQLiteMixin
1✔
11
from lbry.conf import Config
1✔
12
from lbry.wallet.dewies import dewies_to_lbc, lbc_to_dewies
1✔
13
from lbry.wallet.transaction import Transaction, Output
1✔
14
from lbry.schema.claim import Claim
1✔
15
from lbry.dht.constants import DATA_EXPIRATION
1✔
16
from lbry.blob.blob_info import BlobInfo
1✔
17

18
if typing.TYPE_CHECKING:
1!
19
    from lbry.blob.blob_file import BlobFile
×
20
    from lbry.stream.descriptor import StreamDescriptor
×
21

22
log = logging.getLogger(__name__)
1✔
23

24

25
def calculate_effective_amount(amount: str, supports: typing.Optional[typing.List[typing.Dict]] = None) -> str:
    """
    Return ``amount`` plus the amounts of all ``supports``, converted back with ``dewies_to_lbc``.

    :param amount: the claim amount, converted with ``lbc_to_dewies``
    :param supports: optional list of support dicts, each carrying an ``'amount'`` key; ``None`` (the
                     default) means "no supports" instead of raising ``TypeError`` on iteration
    :return: the combined amount as returned by ``dewies_to_lbc``
    """
    total = lbc_to_dewies(amount)
    for support in supports or ():
        total += lbc_to_dewies(support['amount'])
    return dewies_to_lbc(total)
29

30

31
class StoredContentClaim:
    """
    In-memory view of one ``claim`` table row, optionally carrying the name of the signing channel.

    Positional constructor arguments follow the column order of the ``claim`` table, so a row can be
    splatted straight into the constructor.
    """

    def __init__(self, outpoint: Optional[str] = None, claim_id: Optional[str] = None, name: Optional[str] = None,
                 amount: Optional[int] = None, height: Optional[int] = None, serialized: Optional[str] = None,
                 channel_claim_id: Optional[str] = None, address: Optional[str] = None,
                 claim_sequence: Optional[int] = None, channel_name: Optional[str] = None):
        self.claim_id = claim_id
        self.outpoint = outpoint
        self.claim_name = name
        self.amount = amount
        self.height = height
        # `serialized` is hex text; it is only parsed with Claim.from_bytes when present
        self.claim: typing.Optional[Claim] = None
        if serialized:
            self.claim = Claim.from_bytes(binascii.unhexlify(serialized))
        self.claim_address = address
        self.claim_sequence = claim_sequence
        self.channel_claim_id = channel_claim_id
        self.channel_name = channel_name

    @property
    def txid(self) -> typing.Optional[str]:
        """Part of ``outpoint`` before the first ``:``, or None when there is no outpoint."""
        if self.outpoint:
            return self.outpoint.split(":")[0]
        return None

    @property
    def nout(self) -> typing.Optional[int]:
        """Second ``:``-separated part of ``outpoint`` as an int, or None when there is no outpoint."""
        if self.outpoint:
            return int(self.outpoint.split(":")[1])
        return None

    def as_dict(self) -> typing.Dict:
        """Return the claim as a plain dict; ``amount`` is passed through ``dewies_to_lbc``."""
        result = {}
        result["name"] = self.claim_name
        result["claim_id"] = self.claim_id
        result["address"] = self.claim_address
        result["claim_sequence"] = self.claim_sequence
        result["value"] = self.claim
        result["height"] = self.height
        result["amount"] = dewies_to_lbc(self.amount)
        result["nout"] = self.nout
        result["txid"] = self.txid
        result["channel_claim_id"] = self.channel_claim_id
        result["channel_name"] = self.channel_name
        return result
71

72

73
def _get_content_claims(transaction: sqlite3.Connection, query: str,
                        source_hashes: typing.List[str]) -> typing.Dict[str, StoredContentClaim]:
    """
    Run ``query`` (batched over ``source_hashes``) and map each row's first column to a claim.

    The remaining columns of every row are passed positionally to ``StoredContentClaim``. When several rows
    share the same key, the last one produced wins.
    """
    return {
        row[0]: StoredContentClaim(*row[1:])
        for row in _batched_select(transaction, query, source_hashes)
    }
1✔
79

80

81
def get_claims_from_stream_hashes(transaction: sqlite3.Connection,
1✔
82
                                  stream_hashes: typing.List[str]) -> typing.Dict[str, StoredContentClaim]:
83
    query = (
1✔
84
        "select content_claim.stream_hash, c.*, case when c.channel_claim_id is not null then "
85
        "   (select claim_name from claim where claim_id==c.channel_claim_id) "
86
        "   else null end as channel_name "
87
        " from content_claim "
88
        " inner join claim c on c.claim_outpoint=content_claim.claim_outpoint and content_claim.stream_hash in {}"
89
        " order by c.rowid desc"
90
    )
91
    return _get_content_claims(transaction, query, stream_hashes)
1✔
92

93

94
def get_claims_from_torrent_info_hashes(transaction: sqlite3.Connection,
1✔
95
                                        info_hashes: typing.List[str]) -> typing.Dict[str, StoredContentClaim]:
96
    query = (
×
97
        "select content_claim.bt_infohash, c.*, case when c.channel_claim_id is not null then "
98
        "   (select claim_name from claim where claim_id==c.channel_claim_id) "
99
        "   else null end as channel_name "
100
        " from content_claim "
101
        " inner join claim c on c.claim_outpoint=content_claim.claim_outpoint and content_claim.bt_infohash in {}"
102
        " order by c.rowid desc"
103
    )
104
    return _get_content_claims(transaction, query, info_hashes)
×
105

106

107
def _batched_select(transaction, query, parameters, batch_size=900):
1✔
108
    for start_index in range(0, len(parameters), batch_size):
1✔
109
        current_batch = parameters[start_index:start_index+batch_size]
1✔
110
        bind = "({})".format(','.join(['?'] * len(current_batch)))
1✔
111
        yield from transaction.execute(query.format(bind), current_batch)
1✔
112

113

114
def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download_dir, data_rate, status,
1✔
115
                               sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file,
116
                               raw_content_fee, fully_reflected):
117
    return {
1✔
118
        "rowid": rowid,
119
        "added_on": added_on,
120
        "stream_hash": stream_hash,
121
        "file_name": file_name,                      # hex
122
        "download_directory": download_dir,          # hex
123
        "blob_data_rate": data_rate,
124
        "status": status,
125
        "sd_hash": sd_hash,
126
        "key": stream_key,
127
        "stream_name": stream_name,                  # hex
128
        "suggested_file_name": suggested_file_name,  # hex
129
        "claim": claim,
130
        "saved_file": bool(saved_file),
131
        "content_fee": None if not raw_content_fee else Transaction(
132
            binascii.unhexlify(raw_content_fee)
133
        ),
134
        "fully_reflected": fully_reflected
135
    }
136

137

138
def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
    """
    Load every stream-backed ``file`` row that has a stored claim.

    Files are inner-joined with ``stream``, ``content_claim`` and ``claim``, so files without a claim are
    not returned. Results are ordered by claim rowid, newest first. Each entry is the dict produced by
    ``_get_lbry_file_stream_dict``. ``fully_reflected`` is 1 when the stream's sd hash appears in
    ``reflected_stream``. Claims signed by a channel get ``channel_name`` filled in afterwards.
    """
    rows = transaction.execute(
        "select file.rowid, file.*, stream.*, c.*, "
        "  case when (SELECT 1 FROM reflected_stream r WHERE r.sd_hash=stream.sd_hash) "
        "      is null then 0 else 1 end as fully_reflected "
        "from file inner join stream on file.stream_hash=stream.stream_hash "
        "inner join content_claim cc on file.stream_hash=cc.stream_hash "
        "inner join claim c on cc.claim_outpoint=c.claim_outpoint "
        "order by c.rowid desc"
    ).fetchall()

    files = []
    claims_by_channel = {}
    for row in rows:
        # file.rowid, file.* (the first `_` is file.bt_infohash), stream.* (the second `_` repeats
        # stream_hash), then claim.* columns followed by the computed fully_reflected flag
        (rowid, stream_hash, _, file_name, download_dir, data_rate, status, saved_file, raw_content_fee,
         added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) = row
        *claim_columns, fully_reflected = claim_args
        claim = StoredContentClaim(*claim_columns)
        if claim.channel_claim_id:
            claims_by_channel.setdefault(claim.channel_claim_id, []).append(claim)
        files.append(
            _get_lbry_file_stream_dict(
                rowid, added_on, stream_hash, file_name, download_dir, data_rate, status,
                sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file,
                raw_content_fee, fully_reflected
            )
        )

    # second pass: resolve channel names for signed claims in batches
    channel_lookup = _batched_select(
        transaction, "select c.claim_name, c.claim_id from claim c where c.claim_id in {}",
        tuple(claims_by_channel.keys())
    )
    for channel_name, channel_id in channel_lookup:
        for claim in claims_by_channel[channel_id]:
            claim.channel_name = channel_name
    return files
1✔
169

170

171
def store_stream(transaction: sqlite3.Connection, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'):
    """
    Persist a stream descriptor and its blobs.

    Inserts (ignoring duplicates) the blob rows as 'pending', the ``stream`` row, and one ``stream_blob``
    row per descriptor blob. It then forces ``should_announce=1`` on the sd blob.
    """
    def _hex(text):
        return binascii.hexlify(text.encode()).decode()

    all_blobs = descriptor.blobs
    # every descriptor blob except the last (empty) one, unless it is the only blob, plus the sd blob
    stored_blobs = (all_blobs[:-1] if len(all_blobs) > 1 else all_blobs) + [sd_blob]
    transaction.executemany(
        "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        ((b.blob_hash, b.length, 0, 0, "pending", 0, 0, b.added_on, b.is_mine) for b in stored_blobs)
    ).fetchall()

    # add the stream itself
    stream_row = (
        descriptor.stream_hash,
        sd_blob.blob_hash,
        descriptor.key,
        _hex(descriptor.stream_name),
        _hex(descriptor.suggested_file_name),
    )
    transaction.execute("insert or ignore into stream values (?, ?, ?, ?, ?)", stream_row).fetchall()

    # associate every descriptor blob (including the last one) with the stream
    transaction.executemany(
        "insert or ignore into stream_blob values (?, ?, ?, ?)",
        ((descriptor.stream_hash, b.blob_hash, b.blob_num, b.iv) for b in all_blobs)
    ).fetchall()

    # the blob insert above is "or ignore", so set should_announce explicitly for a pre-existing sd blob too
    transaction.execute(
        "update blob set should_announce=1 where blob_hash in (?)",
        (sd_blob.blob_hash,)
    ).fetchall()
194

195

196
def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor'):
    """
    Remove a stream and everything that references it.

    Deletes the ``content_claim``, ``file``, ``stream_blob`` and ``stream`` rows for the stream hash. Then
    deletes the ``blob`` rows of every descriptor blob except the last one, plus the sd blob.
    """
    doomed_blobs = [(blob.blob_hash, ) for blob in descriptor.blobs[:-1]] + [(descriptor.sd_hash, )]
    stream_hash = descriptor.stream_hash
    for statement in (
        "delete from content_claim where stream_hash=? ",
        "delete from file where stream_hash=? ",
        "delete from stream_blob where stream_hash=?",
        "delete from stream where stream_hash=? ",
    ):
        transaction.execute(statement, (stream_hash,)).fetchall()
    transaction.executemany("delete from blob where blob_hash=?", doomed_blobs).fetchall()
1✔
204

205

206
def delete_torrent(transaction: sqlite3.Connection, bt_infohash: str):
    """
    Remove every row keyed by ``bt_infohash``.

    Dependent tables are cleared first and the ``torrent`` row last, since the other tables reference
    ``torrent`` in the schema.
    """
    dependent_first = (
        "content_claim", "torrent_tracker", "torrent_node", "torrent_http_seed", "file", "torrent",
    )
    for table in dependent_first:
        transaction.execute(f"delete from {table} where bt_infohash=?", (bt_infohash,)).fetchall()
×
213

214

215
def store_file(transaction: sqlite3.Connection, identifier_value: str, file_name: typing.Optional[str],
               download_directory: typing.Optional[str], data_payment_rate: float, status: str,
               content_fee: 'typing.Optional[Transaction]', added_on: typing.Optional[int] = None) -> int:
    """
    Insert a ``file`` row for a stream or a torrent and return the new row's rowid.

    :param identifier_value: treated as a torrent info hash (stored in ``bt_infohash``) when it is
        40 characters long, otherwise as a stream hash (stored in ``stream_hash``)
    :param file_name: file name on disk, stored hex-encoded; may be None
    :param download_directory: directory containing the file, stored hex-encoded; may be None
    :param data_payment_rate: value for ``blob_data_rate``
    :param status: value for ``status``
    :param content_fee: optional transaction; its raw bytes are stored hex-encoded
    :param added_on: timestamp to store; defaults to the current time
    :return: rowid of the inserted row
    """
    if not file_name and not download_directory:
        encoded_file_name, encoded_download_dir = None, None
    else:
        # encode each part independently so that a single missing part is stored as NULL
        # instead of raising AttributeError on None.encode()
        encoded_file_name = None if file_name is None else binascii.hexlify(file_name.encode()).decode()
        encoded_download_dir = None if download_directory is None else \
            binascii.hexlify(download_directory.encode()).decode()
    is_torrent = len(identifier_value) == 40
    time_added = added_on or int(time.time())
    saved_file = 1 if (file_name and download_directory and
                       os.path.isfile(os.path.join(download_directory, file_name))) else 0
    encoded_fee = None if not content_fee else binascii.hexlify(content_fee.raw).decode()
    # the identifier goes into bt_infohash for torrents and stream_hash otherwise; the other column is NULL
    id_columns = 'NULL, ?' if is_torrent else '?, NULL'
    cursor = transaction.execute(
        f"insert or replace into file values ({id_columns}, ?, ?, ?, ?, ?, ?, ?)",
        (identifier_value, encoded_file_name, encoded_download_dir, data_payment_rate, status,
         saved_file, encoded_fee, time_added)
    )
    # `file` has no unique key in the visible schema, so re-selecting by hash could return an older
    # duplicate row; lastrowid is the row this statement just wrote
    return cursor.lastrowid
235

236

237
class SQLiteStorage(SQLiteMixin):
1✔
238
    # Schema DDL for the daemon's local database (presumably executed by SQLiteMixin when the database is
    # opened — TODO confirm in lbry.wallet). `file` and `content_claim` rows point at either a stream
    # (stream_hash) or a torrent (bt_infohash). NOTE(review): bt_infohash is declared char(20), but
    # store_file treats 40-character identifiers as torrents; SQLite does not enforce char(n) lengths.
    CREATE_TABLES_QUERY = """
            pragma foreign_keys=on;
            pragma journal_mode=WAL;

            create table if not exists blob (
                blob_hash char(96) primary key not null,
                blob_length integer not null,
                next_announce_time integer not null,
                should_announce integer not null default 0,
                status text not null,
                last_announced_time integer,
                single_announce integer,
                added_on integer not null,
                is_mine integer not null default 0
            );

            create table if not exists stream (
                stream_hash char(96) not null primary key,
                sd_hash char(96) not null references blob,
                stream_key text not null,
                stream_name text not null,
                suggested_filename text not null
            );

            create table if not exists stream_blob (
                stream_hash char(96) not null references stream,
                blob_hash char(96) references blob,
                position integer not null,
                iv char(32) not null,
                primary key (stream_hash, blob_hash)
            );

            create table if not exists claim (
                claim_outpoint text not null primary key,
                claim_id char(40) not null,
                claim_name text not null,
                amount integer not null,
                height integer not null,
                serialized_metadata blob not null,
                channel_claim_id text,
                address text not null,
                claim_sequence integer not null
            );

            create table if not exists torrent (
                bt_infohash char(20) not null primary key,
                tracker text,
                length integer not null,
                name text not null
            );

            create table if not exists torrent_node ( -- BEP-0005
                bt_infohash char(20) not null references torrent,
                host text not null,
                port integer not null
            );

            create table if not exists torrent_tracker ( -- BEP-0012
                bt_infohash char(20) not null references torrent,
                tracker text not null
            );

            create table if not exists torrent_http_seed ( -- BEP-0017
                bt_infohash char(20) not null references torrent,
                http_seed text not null
            );

            create table if not exists file (
                stream_hash char(96) references stream,
                bt_infohash char(20) references torrent,
                file_name text,
                download_directory text,
                blob_data_rate real not null,
                status text not null,
                saved_file integer not null,
                content_fee text,
                added_on integer not null
            );

            create table if not exists content_claim (
                stream_hash char(96) references stream,
                bt_infohash char(20) references torrent,
                claim_outpoint text unique not null references claim
            );

            create table if not exists support (
                support_outpoint text not null primary key,
                claim_id text not null,
                amount integer not null,
                address text not null
            );

            create table if not exists reflected_stream (
                sd_hash text not null,
                reflector_address text not null,
                timestamp integer,
                primary key (sd_hash, reflector_address)
            );

            create table if not exists peer (
                node_id char(96) not null primary key,
                address text not null,
                udp_port integer not null,
                tcp_port integer,
                unique (address, udp_port)
            );
            create index if not exists blob_data on blob(blob_hash, blob_length, is_mine);
    """
346

347
    def __init__(self, conf: Config, path, loop=None, time_getter: typing.Optional[typing.Callable[[], float]] = None):
        """
        :param conf: daemon configuration (read e.g. by get_blobs_to_announce)
        :param path: database path, forwarded to SQLiteMixin
        :param loop: event loop used for executor work; ``asyncio.get_event_loop()`` when falsy
        :param time_getter: clock used for announce bookkeeping; ``time.time`` when falsy
        """
        super().__init__(path)
        self.conf = conf
        self.content_claim_callbacks = {}
        self.loop = loop if loop else asyncio.get_event_loop()
        self.time_getter = time_getter if time_getter else time.time
1✔
353

354
    async def run_and_return_one_or_none(self, query, *args):
        """
        Execute ``query`` and return only its first row.

        A single-column row is unwrapped to its value; a wider row is returned as-is. Returns None when the
        query produces no rows.
        """
        rows = await self.db.execute_fetchall(query, args)
        for first in rows:
            return first[0] if len(first) == 1 else first
        return None
×
359

360
    async def run_and_return_list(self, query, *args):
        """Execute ``query`` and return the first column of every row as a list (empty if no rows)."""
        result = await self.db.execute_fetchall(query, args)
        return [row[0] for row in result]
1✔
363

364
    # # # # # # # # # blob functions # # # # # # # # #
365

366
    async def add_blobs(self, *blob_hashes_and_lengths: typing.Tuple[str, int, int, int], finished=False):
        """
        Insert blob rows, ignoring hashes that already exist.

        Each positional argument is ``(blob_hash, length, added_on, is_mine)``. New rows get status
        'finished' when ``finished`` is true and 'pending' otherwise. With ``finished``, rows that already
        existed for these hashes are also switched to 'finished'.
        """
        initial_status = "finished" if finished else "pending"

        def _add_blobs(transaction: sqlite3.Connection):
            new_rows = (
                (blob_hash, length, 0, 0, initial_status, 0, 0, added_on, is_mine)
                for blob_hash, length, added_on, is_mine in blob_hashes_and_lengths
            )
            transaction.executemany(
                "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?, ?, ?)", new_rows
            ).fetchall()
            if not finished:
                return
            transaction.executemany(
                "update blob set status='finished' where blob.blob_hash=?",
                ((blob_hash, ) for blob_hash, _, _, _ in blob_hashes_and_lengths)
            ).fetchall()

        return await self.db.run(_add_blobs)
1✔
382

383
    def get_blob_status(self, blob_hash: str):
        """Return an awaitable resolving to the stored ``status`` of ``blob_hash``, or None if it is unknown."""
        status_query = "select status from blob where blob_hash=?"
        return self.run_and_return_one_or_none(status_query, blob_hash)
387

388
    def set_announce(self, *blob_hashes):
        """
        Return an awaitable that sets ``should_announce=1`` on every blob in ``blob_hashes``.

        One ``?`` placeholder is generated per hash, so any number of hashes can be passed. With no
        hashes the statement becomes ``in ()``, which SQLite accepts as an empty list and which updates
        nothing.
        """
        placeholders = ", ".join("?" for _ in blob_hashes)
        return self.db.execute_fetchall(
            f"update blob set should_announce=1 where blob_hash in ({placeholders})", blob_hashes
        )
392

393
    def update_last_announced_blobs(self, blob_hashes: typing.List[str]):
        """
        Record that ``blob_hashes`` were just announced.

        Sets ``last_announced_time`` to now (from ``time_getter``) and ``next_announce_time`` to
        now + DATA_EXPIRATION / 2, both truncated to int, and clears ``single_announce``.
        """
        def _update_last_announced_blobs(transaction: sqlite3.Connection):
            now = self.time_getter()
            next_due = int(now + (DATA_EXPIRATION / 2))
            announced_at = int(now)
            params = [(next_due, announced_at, blob_hash) for blob_hash in blob_hashes]
            return transaction.executemany(
                "update blob set next_announce_time=?, last_announced_time=?, single_announce=0 "
                "where blob_hash=?",
                params
            ).fetchall()
        return self.db.run(_update_last_announced_blobs)
1✔
403

404
    def should_single_announce_blobs(self, blob_hashes, immediate=False):
        """
        Flag finished blobs in ``blob_hashes`` for a one-off announcement (``single_announce=1``).

        With ``immediate``, ``next_announce_time`` is also set to the current time so the blob is due now.
        Blobs whose status is not 'finished' are left untouched.
        """
        def set_single_announce(transaction):
            now = int(self.time_getter())
            for blob_hash in blob_hashes:
                if not immediate:
                    transaction.execute(
                        "update blob set single_announce=1 where blob_hash=? and status='finished'", (blob_hash,)
                    ).fetchall()
                    continue
                transaction.execute(
                    "update blob set single_announce=1, next_announce_time=? "
                    "where blob_hash=? and status='finished'", (now, blob_hash)
                ).fetchall()
        return self.db.run(set_single_announce)
×
418

419
    def get_blobs_to_announce(self):
        """
        Return an awaitable resolving to the hashes of finished blobs whose next announce time has passed.

        Results are ordered by ``next_announce_time`` and capped at ``concurrent_blob_announcers * 10``.
        When ``conf.announce_head_and_sd_only`` is set, only blobs flagged ``should_announce`` or
        ``single_announce`` qualify. No rows are modified despite the inner function's name.
        """
        def get_and_update(transaction):
            timestamp = int(self.time_getter())
            flag_filter = (
                "(should_announce=1 or single_announce=1) and " if self.conf.announce_head_and_sd_only else ""
            )
            query = (
                "select blob_hash from blob where blob_hash is not null and "
                + flag_filter
                + "next_announce_time<? and status='finished' order by next_announce_time asc limit ?"
            )
            limit = int(self.conf.concurrent_blob_announcers * 10)
            rows = transaction.execute(query, (timestamp, limit)).fetchall()
            return [row[0] for row in rows]
        return self.db.run(get_and_update)
1✔
439

440
    def delete_blobs_from_db(self, blob_hashes):
        """Delete the ``blob`` rows for ``blob_hashes``, run through ``db.run_with_foreign_keys_disabled``."""
        def delete_blobs(transaction):
            params = [(blob_hash,) for blob_hash in blob_hashes]
            transaction.executemany("delete from blob where blob_hash=?;", params).fetchall()
        return self.db.run_with_foreign_keys_disabled(delete_blobs)
1✔
446

447
    def get_all_blob_hashes(self):
        """Return an awaitable resolving to a list of every hash in the ``blob`` table."""
        hashes_query = "select blob_hash from blob"
        return self.run_and_return_list(hashes_query)
1✔
449

450
    async def get_stored_blobs(self, is_mine: bool, is_network_blob=False):
        """
        List stored blobs as ``(blob_hash, blob_length, added_on)`` rows.

        :param is_mine: only blobs whose ``is_mine`` flag matches are returned
        :param is_network_blob: if true, return finished blobs not referenced by any ``stream_blob`` row,
            largest first (oldest first among equal sizes). Otherwise return finished content blobs of
            streams that have a ``file`` row (oldest first), followed by those streams' sd blobs (oldest first).
        """
        is_mine = 1 if is_mine else 0
        if is_network_blob:
            return await self.db.execute_fetchall(
                "select blob.blob_hash, blob.blob_length, blob.added_on "
                "from blob left join stream_blob using (blob_hash) "
                "where stream_blob.stream_hash is null and blob.is_mine=? and blob.status='finished' "
                "order by blob.blob_length desc, blob.added_on asc",
                (is_mine,)
            )

        # NOTE(review): unlike the two content queries, sd blobs are not filtered on status='finished' —
        # confirm this is intended
        sd_blobs = await self.db.execute_fetchall(
            "select blob.blob_hash, blob.blob_length, blob.added_on "
            "from blob join stream on blob.blob_hash=stream.sd_hash join file using (stream_hash) "
            "where blob.is_mine=? order by blob.added_on asc",
            (is_mine,)
        )
        # each SQL fragment ends with a space so adjacent literals never fuse into one token
        content_blobs = await self.db.execute_fetchall(
            "select blob.blob_hash, blob.blob_length, blob.added_on "
            "from blob join stream_blob using (blob_hash) cross join stream using (stream_hash) "
            "cross join file using (stream_hash) "
            "where blob.is_mine=? and blob.status='finished' order by blob.added_on asc, blob.blob_length asc",
            (is_mine,)
        )
        return content_blobs + sd_blobs
×
475

476
    async def get_stored_blob_disk_usage(self):
        """
        Sum ``blob_length`` over finished blobs that are not sd blobs, split by category.

        :return: dict with ``network_storage`` (blobs not referenced by any ``stream_blob`` row),
            ``content_storage`` (stream blobs with ``is_mine=0``), ``private_storage`` (blobs with
            ``is_mine=1``) and ``total``. Each value is 0 when nothing matches.
        """
        # 'finished' must be a single-quoted SQL string literal. A double-quoted "finished" is an identifier
        # that SQLite only falls back to treating as a string via its legacy DQS quirk.
        total, network_size, content_size, private_size = await self.db.execute_fetchone("""
        select coalesce(sum(blob_length), 0) as total,
               coalesce(sum(case when
                   stream_blob.stream_hash is null
               then blob_length else 0 end), 0) as network_storage,
               coalesce(sum(case when
                   stream_blob.blob_hash is not null and is_mine=0
               then blob_length else 0 end), 0) as content_storage,
               coalesce(sum(case when
                   is_mine=1
               then blob_length else 0 end), 0) as private_storage
        from blob left join stream_blob using (blob_hash)
        where blob_hash not in (select sd_hash from stream) and blob.status='finished'
        """)
        return {
            'network_storage': network_size,
            'content_storage': content_size,
            'private_storage': private_size,
            'total': total
        }
497

498
    async def update_blob_ownership(self, sd_hash, is_mine: bool):
        """Set ``is_mine`` on the sd blob ``sd_hash`` and on every blob of the stream it describes."""
        ownership_flag = int(bool(is_mine))
        params = (ownership_flag, sd_hash, sd_hash)
        await self.db.execute_fetchall(
            "update blob set is_mine = ? where blob_hash in ("
            "   select blob_hash from blob natural join stream_blob natural join stream where sd_hash = ?"
            ") OR blob_hash = ?",
            params
        )
505

506
    def sync_missing_blobs(self, blob_files: typing.Set[str]) -> typing.Awaitable[typing.Set[str]]:
        """
        Reconcile finished blobs in the database with the blob files present on disk.

        Blobs marked 'finished' whose hash is absent from ``blob_files`` are reset to 'pending'.

        :return: awaitable resolving to the hashes in ``blob_files`` that the database lists as finished
        """
        def _sync_blobs(transaction: sqlite3.Connection) -> typing.Set[str]:
            rows = transaction.execute("select blob_hash from blob where status='finished'").fetchall()
            finished_in_db = {blob_hash for (blob_hash, ) in rows}
            missing_on_disk = finished_in_db.difference(blob_files)
            transaction.executemany(
                "update blob set status='pending' where blob_hash=?",
                [(blob_hash, ) for blob_hash in missing_on_disk]
            ).fetchall()
            return blob_files.intersection(finished_in_db)
        return self.db.run(_sync_blobs)
1✔
521

522
    # # # # # # # # # stream functions # # # # # # # # #
523

524
    async def stream_exists(self, sd_hash: str) -> bool:
        """Return True when a ``stream`` row exists for ``sd_hash``."""
        stream_hash = await self.run_and_return_one_or_none(
            "select stream_hash from stream where sd_hash=?", sd_hash
        )
        return stream_hash is not None
1✔
527

528
    async def file_exists(self, sd_hash: str) -> bool:
        """Return True when a ``file`` row exists for the stream described by ``sd_hash``."""
        query = (
            "select f.stream_hash from file f "
            "inner join stream s on "
            "s.stream_hash=f.stream_hash and s.sd_hash=?"
        )
        found = await self.run_and_return_one_or_none(query, sd_hash)
        return found is not None
1✔
533

534
    def store_stream(self, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'):
        """Return an awaitable that runs the module-level ``store_stream`` via ``self.db.run``."""
        job = store_stream
        return self.db.run(job, sd_blob, descriptor)
1✔
536

537
    def get_blobs_for_stream(self, stream_hash, only_completed=False) -> typing.Awaitable[typing.List[BlobInfo]]:
        """
        Return an awaitable resolving to the stream's ``BlobInfo`` list, ordered by position.

        Lengths come from the ``blob`` table; a blob with no known length (or, with ``only_completed``, one
        that is not 'finished') gets length 0. A missing ``added_on`` falls back to the current time.
        Iteration stops after the first entry whose blob hash is empty.
        """
        def _get_blobs_for_stream(transaction):
            ordered_entries = transaction.execute(
                "select s.blob_hash, s.position, s.iv, b.added_on "
                "from stream_blob s left outer join blob b on b.blob_hash=s.blob_hash where stream_hash=? "
                "order by position asc", (stream_hash, )
            ).fetchall()
            status_clause = "b.status='finished' and " if only_completed else ""
            length_rows = transaction.execute(
                "select b.blob_hash, b.blob_length from blob b "
                "inner join stream_blob s ON b.blob_hash=s.blob_hash and " + status_clause + "s.stream_hash=?",
                (stream_hash, )
            ).fetchall()
            lengths = dict(length_rows)

            fallback_added_on = time.time()
            infos = []
            for blob_hash, position, iv, added_on in ordered_entries:
                infos.append(
                    BlobInfo(position, lengths.get(blob_hash, 0), iv, added_on or fallback_added_on, blob_hash)
                )
                if not blob_hash:
                    # an empty blob hash marks the end of the stream; nothing after it is returned
                    break
            return infos
        return self.db.run(_get_blobs_for_stream)
1✔
570

571
    def get_sd_blob_hash_for_stream(self, stream_hash):
        """Return an awaitable resolving to the sd hash of ``stream_hash``, or None if it is unknown."""
        lookup = "select sd_hash from stream where stream_hash=?"
        return self.run_and_return_one_or_none(lookup, stream_hash)
575

576
    def get_stream_hash_for_sd_hash(self, sd_blob_hash):
        """Return an awaitable resolving to the stream hash described by ``sd_blob_hash``, or None."""
        lookup = "select stream_hash from stream where sd_hash = ?"
        return self.run_and_return_one_or_none(lookup, sd_blob_hash)
580

581
    def delete_stream(self, descriptor: 'StreamDescriptor'):
        """Return an awaitable running the module-level ``delete_stream`` via ``run_with_foreign_keys_disabled``."""
        job = delete_stream
        return self.db.run_with_foreign_keys_disabled(job, descriptor)
1✔
583

584
    async def delete_torrent(self, bt_infohash: str):
        """Remove every row keyed by ``bt_infohash`` (see the module-level ``delete_torrent``)."""
        result = await self.db.run(delete_torrent, bt_infohash)
        return result
×
586

587
    # # # # # # # # # file stuff # # # # # # # # #
588

589
    def save_downloaded_file(self, stream_hash: str, file_name: typing.Optional[str],
                             download_directory: typing.Optional[str], data_payment_rate: float,
                             content_fee: typing.Optional[Transaction] = None,
                             added_on: typing.Optional[int] = None) -> typing.Awaitable[int]:
        """Store a ``file`` row with status "running"; returns an awaitable resolving to the row id."""
        return self.save_published_file(
            stream_hash, file_name, download_directory, data_payment_rate,
            status="running", content_fee=content_fee, added_on=added_on,
        )
597

598
    def save_published_file(self, stream_hash: str, file_name: typing.Optional[str],
                            download_directory: typing.Optional[str], data_payment_rate: float,
                            status: str = "finished",
                            content_fee: typing.Optional[Transaction] = None,
                            added_on: typing.Optional[int] = None) -> typing.Awaitable[int]:
        """Store a ``file`` row via the module-level ``store_file``; returns an awaitable resolving to the row id."""
        store_args = (stream_hash, file_name, download_directory, data_payment_rate, status, content_fee, added_on)
        return self.db.run(store_file, *store_args)
605

606
    async def update_manually_removed_files_since_last_run(self):
        """
        Update files that have been removed from the downloads directory since the last run.

        Stream-backed ``file`` rows with ``saved_file=1`` whose file no longer exists on disk have
        ``file_name`` and ``download_directory`` cleared and ``saved_file`` reset to 0. The filesystem
        checks run through ``loop.run_in_executor``, not directly on the event loop.
        """
        def update_manually_removed_files(transaction: sqlite3.Connection):
            files = {}
            query = "select stream_hash, download_directory, file_name from file where saved_file=1 " \
                    "and stream_hash is not null"
            for (stream_hash, download_directory, file_name) in transaction.execute(query).fetchall():
                if download_directory and file_name:
                    files[stream_hash] = download_directory, file_name
            return files

        def detect_removed(files):
            # stored names are hex-encoded (see store_file), so decode before touching the filesystem
            return [
                stream_hash for stream_hash, (download_directory, file_name) in files.items()
                if not os.path.isfile(os.path.join(binascii.unhexlify(download_directory).decode(),
                                                   binascii.unhexlify(file_name).decode()))
            ]

        def update_db_removed(transaction: sqlite3.Connection, removed):
            query = "update file set file_name=null, download_directory=null, saved_file=0 where stream_hash in {}"
            # _batched_select is a generator: each batch's UPDATE only runs while it is iterated, and an
            # UPDATE produces no rows, so just drain it
            for _ in _batched_select(transaction, query, removed):
                pass

        stream_and_file = await self.db.run(update_manually_removed_files)
        removed = await self.loop.run_in_executor(None, detect_removed, stream_and_file)
        if removed:
            await self.db.run(update_db_removed, removed)
1✔
635

636
    def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
        """Fetch every stored file through the module-level `get_all_lbry_files` helper."""
        fetch_all = get_all_lbry_files
        return self.db.run(fetch_all)
1✔
638

639
    async def get_all_torrent_files(self) -> typing.List[typing.Dict]:
        """Return every file row joined with its torrent row, as a list of dicts.

        Each dict maps column name -> value and also carries a ``rowid`` key with the
        file's ROWID. Because ``*`` expands the columns of both tables, a column name
        present in both (e.g. ``bt_infohash``) keeps the value of the later column.
        """
        def _get_all_torrent_files(transaction):
            cursor = transaction.execute(
                "select file.ROWID as rowid, * from file join torrent on file.bt_infohash=torrent.bt_infohash")
            # compute column names once, and materialise the dicts here inside the db
            # worker instead of handing back a lazy map that reads the cursor later
            columns = [column[0] for column in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]

        return await self.db.run(_get_all_torrent_files)
×
645

646
    def change_file_status(self, stream_hash: str, new_status: str):
        """Set the status column of the file row for `stream_hash` to `new_status`."""
        log.debug("update file status %s -> %s", stream_hash, new_status)
        params = (new_status, stream_hash)
        return self.db.execute_fetchall("update file set status=? where stream_hash=?", params)
1✔
649

650
    def stop_all_files(self):
        """Set the status of every file row to "stopped"."""
        log.debug("stopping all files")
        statement = "update file set status=?"
        return self.db.execute_fetchall(statement, ("stopped",))
×
653

654
    async def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: typing.Optional[str],
                                                     file_name: typing.Optional[str]):
        """Store a new download directory and file name for `stream_hash`.

        Both values are persisted hex-encoded. When either one is missing or empty,
        both columns are set to NULL.
        """
        def _to_hex(text: str) -> str:
            return binascii.hexlify(text.encode()).decode()

        if file_name and download_dir:
            encoded_file_name = _to_hex(file_name)
            encoded_download_dir = _to_hex(download_dir)
        else:
            encoded_download_dir = encoded_file_name = None
        return await self.db.execute_fetchall(
            "update file set download_directory=?, file_name=? where stream_hash=?",
            (encoded_download_dir, encoded_file_name, stream_hash),
        )
664

665
    async def save_content_fee(self, stream_hash: str, content_fee: Transaction):
        """Store the hexlified raw bytes of `content_fee` on the file row for `stream_hash`."""
        fee_hex = binascii.hexlify(content_fee.raw)
        return await self.db.execute_fetchall(
            "update file set content_fee=? where stream_hash=?", (fee_hex, stream_hash)
        )
669

670
    async def set_saved_file(self, stream_hash: str):
        """Flag the file row for `stream_hash` as saved (saved_file=1)."""
        params = (stream_hash,)
        return await self.db.execute_fetchall("update file set saved_file=1 where stream_hash=?", params)
674

675
    async def clear_saved_file(self, stream_hash: str):
        """Clear the saved flag of the file row for `stream_hash` (saved_file=0)."""
        params = (stream_hash,)
        return await self.db.execute_fetchall("update file set saved_file=0 where stream_hash=?", params)
679

680
    async def recover_streams(self, descriptors_and_sds: typing.List[typing.Tuple['StreamDescriptor', 'BlobFile',
                                                                                  typing.Optional[Transaction]]],
                              download_directory: str):
        """Re-create the stream and file rows for recovered descriptors.

        For every (descriptor, sd_blob, content_fee) triple the stream is deleted and
        stored again, a "stopped" file row is written, and a content_claim row that
        existed beforehand is restored. Everything runs with foreign keys disabled.
        """
        def _rebuild(transaction: sqlite3.Connection):
            recovered_hashes = [entry[0].stream_hash for entry in descriptors_and_sds]
            for descriptor, sd_blob, content_fee in descriptors_and_sds:
                # snapshot the claim link first: delete_stream also deletes the content claim
                saved_claim = transaction.execute(
                    "select * from content_claim where stream_hash=?", (descriptor.stream_hash, )
                ).fetchone()
                delete_stream(transaction, descriptor)
                store_stream(transaction, sd_blob, descriptor)
                store_file(transaction, descriptor.stream_hash, os.path.basename(descriptor.suggested_file_name),
                           download_directory, 0.0, 'stopped', content_fee=content_fee)
                if saved_claim:
                    transaction.execute("insert or ignore into content_claim values (?, ?, ?)", saved_claim)
            transaction.executemany(
                "update file set status='stopped' where stream_hash=?",
                [(recovered, ) for recovered in recovered_hashes]
            ).fetchall()
            # NOTE(review): this replaces whatever download_directory store_file wrote above
            # with self.conf.download_dir, so the `download_directory` argument only persists
            # if it equals the configured value — confirm this is intended.
            hex_download_dir = binascii.hexlify(self.conf.download_dir.encode()).decode()
            transaction.executemany(
                "update file set download_directory=? where stream_hash=?",
                [(hex_download_dir, recovered) for recovered in recovered_hashes]
            ).fetchall()

        await self.db.run_with_foreign_keys_disabled(_rebuild)
1✔
705

706
    def get_all_stream_hashes(self):
        """List the stream_hash of every row in the stream table."""
        statement = "select stream_hash from stream"
        return self.run_and_return_list(statement)
1✔
708

709
    # # # # # # # # # support functions # # # # # # # # #
710

711
    def save_supports(self, claim_id_to_supports: dict):
        """Replace the stored supports of every claim id in `claim_id_to_supports`.

        :param claim_id_to_supports: maps claim_id -> list of support dicts, each with
            'txid', 'nout', 'amount' (LBC, converted with lbc_to_dewies) and an optional
            'address' (stored as "" when absent).
        :return: the awaitable from `self.db.run`.

        Existing support rows of each listed claim id are deleted, then the given
        supports are inserted, both inside the same `db.run` call.
        """
        # TODO: add 'address' to support items returned for a claim from lbrycrdd and lbryum-server
        def _save_support(transaction):
            # one bound parameter per statement: a single `in (?, ?, ...)` clause grows with
            # the number of claim ids and can exceed SQLite's bound-variable limit
            transaction.executemany(
                "delete from support where claim_id=?",
                [(claim_id,) for claim_id in claim_id_to_supports]
            ).fetchall()
            transaction.executemany(
                "insert into support values (?, ?, ?, ?)",
                [
                    ("%s:%i" % (support['txid'], support['nout']), claim_id,
                     lbc_to_dewies(support['amount']), support.get('address', ""))
                    for claim_id, supports in claim_id_to_supports.items()
                    for support in supports
                ]
            ).fetchall()

        return self.db.run(_save_support)
1✔
726

727
    def get_supports(self, *claim_ids):
        """Load the stored supports of `claim_ids`.

        Resolves to a list of dicts with the keys "txid", "nout" (int), "claim_id",
        "amount" (converted with dewies_to_lbc) and "address".
        """
        def _get_supports(transaction):
            supports = []
            rows = _batched_select(transaction, "select * from support where claim_id in {}", claim_ids)
            for outpoint, supported_id, amount, address in rows:
                outpoint_parts = outpoint.split(":")
                supports.append({
                    "txid": outpoint_parts[0],
                    "nout": int(outpoint_parts[1]),
                    "claim_id": supported_id,
                    "amount": dewies_to_lbc(amount),
                    "address": address,
                })
            return supports

        return self.db.run(_get_supports)
1✔
748

749
    # # # # # # # # # claim functions # # # # # # # # #
750

751
    async def save_claims(self, claim_infos):
        """Store claims and refresh the content_claim link of matching local files.

        :param claim_infos: iterable of claim dicts with the keys 'txid', 'nout',
            'claim_id', 'name', 'amount' (LBC), 'height', 'address', 'claim_sequence'
            and 'value' (a Claim object); an optional 'supports' list replaces the
            stored supports for that claim id.

        A file whose stream sd_hash matches a claim's source is (re)linked to that
        claim when it has no linked claim yet or is linked to a different outpoint.
        Content-claim callbacks registered for re-linked streams are awaited after
        the database work; a failing callback is logged and does not stop the others.
        """
        claim_id_to_supports = {}
        relinked_stream_hashes = []

        def _save_claims(transaction):
            content_claims_to_update = []
            for claim_info in claim_infos:
                outpoint = "%s:%i" % (claim_info['txid'], claim_info['nout'])
                claim_id = claim_info['claim_id']
                name = claim_info['name']
                amount = lbc_to_dewies(claim_info['amount'])
                height = claim_info['height']
                address = claim_info['address']
                sequence = claim_info['claim_sequence']
                certificate_id = claim_info['value'].signing_channel_id
                try:
                    source_hash = claim_info['value'].stream.source.sd_hash
                except (AttributeError, ValueError):
                    # claim value without a stream source: nothing to match against files
                    source_hash = None
                serialized = binascii.hexlify(claim_info['value'].to_bytes())
                transaction.execute(
                    "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                    (outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence)
                ).fetchall()
                # if this response doesn't have support info don't overwrite the existing
                # support info
                if 'supports' in claim_info:
                    claim_id_to_supports[claim_id] = claim_info['supports']
                if not source_hash:
                    continue
                stream_hash = transaction.execute(
                    "select file.stream_hash from stream "
                    "inner join file on file.stream_hash=stream.stream_hash where sd_hash=?", (source_hash,)
                ).fetchone()
                if not stream_hash:
                    continue
                stream_hash = stream_hash[0]
                known_outpoint = transaction.execute(
                    "select claim_outpoint from content_claim where stream_hash=?", (stream_hash,)
                ).fetchone()
                # fetchone() yields a row tuple (or None): compare its column, since the
                # tuple itself never equals the outpoint string
                known_outpoint = known_outpoint[0] if known_outpoint else None
                known_claim_id = transaction.execute(
                    "select claim_id from claim "
                    "inner join content_claim c3 ON claim.claim_outpoint=c3.claim_outpoint "
                    "where c3.stream_hash=?", (stream_hash,)
                ).fetchone()
                if not known_claim_id or known_outpoint != outpoint:
                    content_claims_to_update.append((stream_hash, outpoint))
            for stream_hash, outpoint in content_claims_to_update:
                self._save_content_claim(transaction, outpoint, stream_hash)
                relinked_stream_hashes.append(stream_hash)

        await self.db.run(_save_claims)
        # create the callback coroutines here on the event loop, not in the db worker
        update_file_callbacks = [
            self.content_claim_callbacks[stream_hash]()
            for stream_hash in relinked_stream_hashes
            if stream_hash in self.content_claim_callbacks
        ]
        if update_file_callbacks:
            # asyncio.wait() rejects bare coroutines on Python 3.11+; gather accepts them, and
            # return_exceptions keeps one failing callback from aborting the others
            results = await asyncio.gather(*update_file_callbacks, return_exceptions=True)
            for result in results:
                if isinstance(result, BaseException):
                    log.warning("failed to update file after content claim change: %s", result)
        if claim_id_to_supports:
            await self.save_supports(claim_id_to_supports)
×
810

811
    def save_claim_from_output(self, ledger, *outputs: Output):
        """Save the claims carried by transaction `outputs` via `save_claims`.

        The claim_sequence of each entry is recorded as -1.
        """
        def _as_claim_info(output):
            return {
                "claim_id": output.claim_id,
                "name": output.claim_name,
                "amount": dewies_to_lbc(output.amount),
                "address": output.get_address(ledger),
                "txid": output.tx_ref.id,
                "nout": output.position,
                "value": output.claim,
                "height": output.tx_ref.height,
                "claim_sequence": -1,
            }

        return self.save_claims([_as_claim_info(output) for output in outputs])
823

824
    def save_claims_for_resolve(self, claim_infos):
        """Save the claims found in resolve results via `save_claims`.

        An entry that has a 'value' key is saved when that value is truthy. Any other
        entry is searched for nested 'certificate' and 'claim' dicts that carry a value.
        Results are de-duplicated by claim_id (the last occurrence wins).
        """
        to_save = {}
        for info in claim_infos:
            if 'value' in info:
                if info['value']:
                    to_save[info['claim_id']] = info
                continue
            for key in ('certificate', 'claim'):
                # `or {}` also covers a key that is present with a None value,
                # where `info.get(key, {})` returned None and `.get` raised
                nested = info.get(key) or {}
                if nested.get('value'):
                    to_save[nested['claim_id']] = nested
        return self.save_claims(to_save.values())
×
835

836
    @staticmethod
    def _save_content_claim(transaction, claim_outpoint, stream_hash=None, bt_infohash=None):
        """Link stream `stream_hash` to the claim at `claim_outpoint` in the content_claim table.

        Raises a plain Exception when the claim is unknown, is not a stream claim, the
        stream is unknown, the claim's sd_hash differs from the stream's, or the stream
        is already linked to a claim with a different claim id.

        NOTE(review): `bt_infohash` only participates in the assert. Every query filters
        on `stream_hash`, so a call passing only `bt_infohash` cannot succeed (the stream
        lookup with a NULL stream_hash matches no row).
        """
        assert stream_hash or bt_infohash

        claim_row = transaction.execute(
            "select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint,)
        ).fetchone()
        if not claim_row:
            raise Exception("claim not found")
        new_claim_id = claim_row[0]
        claim = Claim.from_bytes(binascii.unhexlify(claim_row[1]))

        # certificate (channel) claims must never be linked as content
        if not claim.is_stream:
            raise Exception("claim does not contain a stream")

        sd_row = transaction.execute(
            "select sd_hash from stream where stream_hash=?", (stream_hash,)
        ).fetchone()
        if not sd_row:
            raise Exception("stream not found")
        if sd_row[0] != claim.stream.source.sd_hash:
            raise Exception("stream mismatch")

        # an existing link may only be replaced by an update of the same claim
        linked_outpoint = transaction.execute(
            "select claim_outpoint from content_claim where stream_hash=?", (stream_hash,)
        ).fetchone()
        if linked_outpoint:
            linked_claim_id = transaction.execute(
                "select claim_id from claim where claim_outpoint=?", linked_outpoint
            ).fetchone()[0]
            if linked_claim_id != new_claim_id:
                raise Exception(
                    f"mismatching claim ids when updating stream {linked_claim_id} vs {new_claim_id}"
                )

        transaction.execute("delete from content_claim where stream_hash=?", (stream_hash, )).fetchall()
        transaction.execute(
            "insert into content_claim values (?, NULL, ?)", (stream_hash, claim_outpoint)
        ).fetchall()
879

880
    async def save_content_claim(self, stream_hash, claim_outpoint):
        """Link `stream_hash` to `claim_outpoint`, then await the stream's registered callback, if any."""
        await self.db.run(self._save_content_claim, claim_outpoint, stream_hash)
        # notify the managed stream object registered for this hash, when there is one
        if stream_hash not in self.content_claim_callbacks:
            return
        await self.content_claim_callbacks[stream_hash]()
1✔
885

886
    async def add_torrent(self, bt_infohash, length, name):
        """Insert or replace the torrent row for `bt_infohash`.

        The row is written as (bt_infohash, NULL, length, name).
        """
        def _upsert_torrent(transaction):
            transaction.execute(
                "insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name)
            ).fetchall()

        return await self.db.run(_upsert_torrent)
×
892

893
    async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
        """Store the torrent row, link it to `claim_outpoint`, then await its registered callback, if any.

        The torrent row and the content_claim link are written by two separate `db.run` calls.
        """
        def _link_torrent(transaction):
            transaction.execute(
                "insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint)
            ).fetchall()

        await self.add_torrent(bt_infohash, length, name)
        await self.db.run(_link_torrent)
        # notify the managed torrent object registered for this infohash, when there is one
        if bt_infohash not in self.content_claim_callbacks:
            return
        await self.content_claim_callbacks[bt_infohash]()
×
903

904
    async def get_content_claim(self, stream_hash: str,
                                include_supports: typing.Optional[bool] = True) -> typing.Optional[typing.Dict]:
        """Return the claim linked to `stream_hash` as a dict, or None when there is none.

        With `include_supports` (the default), the dict also gets a 'supports' list and an
        'effective_amount' equal to the claim amount plus all support amounts.
        """
        claims = await self.db.run(get_claims_from_stream_hashes, [stream_hash])
        if not claims:
            # the previous `-> typing.Dict` annotation hid this None return from callers
            return None
        claim = claims[stream_hash].as_dict()
        if include_supports:
            supports = await self.get_supports(claim['claim_id'])
            claim['supports'] = supports
            claim['effective_amount'] = calculate_effective_amount(claim['amount'], supports)
        return claim
1✔
914

915
    async def get_content_claim_for_torrent(self, bt_infohash):
        """Return the stored claim object linked to torrent `bt_infohash`, or None when there is none."""
        claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash])
        if not claims:
            return None
        return claims[bt_infohash]
×
918

919
    # # # # # # # # # reflector functions # # # # # # # # #
920

921
    def update_reflected_stream(self, sd_hash, reflector_address, success=True):
        """Record (success=True) or forget (success=False) a reflection of `sd_hash` to `reflector_address`.

        A successful reflection is stored with the current `self.time_getter()` timestamp.
        """
        if not success:
            return self.db.execute_fetchall(
                "delete from reflected_stream where sd_hash=? and reflector_address=?",
                (sd_hash, reflector_address)
            )
        return self.db.execute_fetchall(
            "insert or replace into reflected_stream values (?, ?, ?)",
            (sd_hash, reflector_address, self.time_getter())
        )
931

932
    def get_streams_to_re_reflect(self):
        """List sd hashes never reflected, or last reflected more than 86400 seconds (one day) ago."""
        cutoff = int(self.time_getter()) - 86400
        return self.run_and_return_list(
            "select s.sd_hash from stream s "
            "left outer join reflected_stream r on s.sd_hash=r.sd_hash "
            "where r.timestamp is null or r.timestamp < ?",
            cutoff
        )
939

940
    # # # # # # # # # # dht functions # # # # # # # # # # #
941
    async def get_persisted_kademlia_peers(self) -> typing.List[typing.Tuple[bytes, str, int, int]]:
        """Load saved DHT peers as (node_id bytes, address, udp_port, tcp_port) tuples.

        node_id is stored hex-encoded and is unhexlified here.
        """
        rows = await self.db.execute_fetchall('select node_id, address, udp_port, tcp_port from peer')
        peers = []
        for node_id, address, udp_port, tcp_port in rows:
            peers.append((binascii.unhexlify(node_id), address, udp_port, tcp_port))
        return peers
1✔
944

945
    async def save_kademlia_peers(self, peers: typing.List['KademliaPeer']):
        """Replace every stored DHT peer with `peers` (node ids stored hex-encoded)."""
        def _replace_peers(transaction: sqlite3.Connection):
            transaction.execute('delete from peer').fetchall()
            peer_rows = [
                (binascii.hexlify(peer.node_id), peer.address, peer.udp_port, peer.tcp_port)
                for peer in peers
            ]
            transaction.executemany(
                'insert into peer(node_id, address, udp_port, tcp_port) values (?, ?, ?, ?)',
                peer_rows
            ).fetchall()

        return await self.db.run(_replace_peers)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc