lbryio / lbry-sdk / build 4599645360 (pending completion)
Triggered by a GitHub push: "Bump cryptography from 2.5 to 39.0.1"

12289 of 19915 relevant lines covered (61.71%)
2807 of 6557 branches covered (42.81%); branch coverage is included in the aggregate % (a possible calculation is sketched below)
0.97 hits per line
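A minimal sketch of how the aggregate percentage might be derived from the figures above, assuming it simply pools covered lines and branches over all relevant lines and branches (an assumption for illustration, not Coveralls' documented formula):

# Hypothetical aggregate-coverage calculation; the exact Coveralls formula may differ.
covered_lines, relevant_lines = 12289, 19915
covered_branches, relevant_branches = 2807, 6557

line_pct = 100 * covered_lines / relevant_lines            # 61.71%
branch_pct = 100 * covered_branches / relevant_branches    # 42.81%
aggregate_pct = 100 * (covered_lines + covered_branches) / (relevant_lines + relevant_branches)
print(f"{line_pct:.2f}% lines, {branch_pct:.2f}% branches, {aggregate_pct:.2f}% aggregate")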

Source File: /lbry/extras/daemon/migrator/migrate5to6.py (0.0% covered; every relevant line in this file is uncovered)
import sqlite3
import os
import json
import logging
from binascii import hexlify
from lbry.schema.claim import Claim

log = logging.getLogger(__name__)

CREATE_TABLES_QUERY = """
            pragma foreign_keys=on;
            pragma journal_mode=WAL;

            create table if not exists blob (
                blob_hash char(96) primary key not null,
                blob_length integer not null,
                next_announce_time integer not null,
                should_announce integer not null default 0,
                status text not null
            );

            create table if not exists stream (
                stream_hash char(96) not null primary key,
                sd_hash char(96) not null references blob,
                stream_key text not null,
                stream_name text not null,
                suggested_filename text not null
            );

            create table if not exists stream_blob (
                stream_hash char(96) not null references stream,
                blob_hash char(96) references blob,
                position integer not null,
                iv char(32) not null,
                primary key (stream_hash, blob_hash)
            );

            create table if not exists claim (
                claim_outpoint text not null primary key,
                claim_id char(40) not null,
                claim_name text not null,
                amount integer not null,
                height integer not null,
                serialized_metadata blob not null,
                channel_claim_id text,
                address text not null,
                claim_sequence integer not null
            );

            create table if not exists file (
                stream_hash text primary key not null references stream,
                file_name text not null,
                download_directory text not null,
                blob_data_rate real not null,
                status text not null
            );

            create table if not exists content_claim (
                stream_hash text unique not null references file,
                claim_outpoint text not null references claim,
                primary key (stream_hash, claim_outpoint)
            );

            create table if not exists support (
                support_outpoint text not null primary key,
                claim_id text not null,
                amount integer not null,
                address text not null
            );
    """


def run_operation(db):
    def _decorate(fn):
        def _wrapper(*args):
            cursor = db.cursor()
            try:
                result = fn(cursor, *args)
                db.commit()
                return result
            except sqlite3.IntegrityError:
                db.rollback()
                raise
        return _wrapper
    return _decorate


def verify_sd_blob(sd_hash, blob_dir):
    with open(os.path.join(blob_dir, sd_hash), "r") as sd_file:
        data = sd_file.read()
        sd_length = len(data)
        decoded = json.loads(data)
    assert set(decoded.keys()) == {
        'stream_name', 'blobs', 'stream_type', 'key', 'suggested_file_name', 'stream_hash'
    }, "invalid sd blob"
    for blob in sorted(decoded['blobs'], key=lambda x: int(x['blob_num']), reverse=True):
        if blob['blob_num'] == len(decoded['blobs']) - 1:
            assert {'length', 'blob_num', 'iv'} == set(blob.keys()), 'invalid stream terminator'
            assert blob['length'] == 0, 'non zero length stream terminator'
        else:
            assert {'blob_hash', 'length', 'blob_num', 'iv'} == set(blob.keys()), 'invalid stream blob'
            assert blob['length'] > 0, 'zero length stream blob'
    return decoded, sd_length


def do_migration(conf):
    new_db_path = os.path.join(conf.data_dir, "lbrynet.sqlite")
    connection = sqlite3.connect(new_db_path)

    metadata_db = sqlite3.connect(os.path.join(conf.data_dir, "blockchainname.db"))
    lbryfile_db = sqlite3.connect(os.path.join(conf.data_dir, 'lbryfile_info.db'))
    blobs_db = sqlite3.connect(os.path.join(conf.data_dir, 'blobs.db'))

    name_metadata_cursor = metadata_db.cursor()
    lbryfile_cursor = lbryfile_db.cursor()
    blobs_db_cursor = blobs_db.cursor()

    old_rowid_to_outpoint = {
        rowid: (txid, nout) for (rowid, txid, nout) in
        lbryfile_cursor.execute("select * from lbry_file_metadata").fetchall()
    }

    old_sd_hash_to_outpoint = {
        sd_hash: (txid, nout) for (txid, nout, sd_hash) in
        name_metadata_cursor.execute("select txid, n, sd_hash from name_metadata").fetchall()
    }

    sd_hash_to_stream_hash = dict(
        lbryfile_cursor.execute("select sd_blob_hash, stream_hash from lbry_file_descriptors").fetchall()
    )

    stream_hash_to_stream_blobs = {}

    for (blob_hash, stream_hash, position, iv, length) in lbryfile_db.execute(
            "select * from lbry_file_blobs").fetchall():
        stream_blobs = stream_hash_to_stream_blobs.get(stream_hash, [])
        stream_blobs.append((blob_hash, length, position, iv))
        stream_hash_to_stream_blobs[stream_hash] = stream_blobs

    claim_outpoint_queries = {}

    for claim_query in metadata_db.execute(
            "select distinct c.txid, c.n, c.claimId, c.name, claim_cache.claim_sequence, claim_cache.claim_address, "
            "claim_cache.height, claim_cache.amount, claim_cache.claim_pb "
            "from claim_cache inner join claim_ids c on claim_cache.claim_id=c.claimId"):
        txid, nout = claim_query[0], claim_query[1]
        if (txid, nout) in claim_outpoint_queries:
            continue
        claim_outpoint_queries[(txid, nout)] = claim_query

    @run_operation(connection)
    def _populate_blobs(transaction, blob_infos):
        transaction.executemany(
            "insert into blob values (?, ?, ?, ?, ?)",
            [(blob_hash, blob_length, int(next_announce_time), should_announce, "finished")
             for (blob_hash, blob_length, _, next_announce_time, should_announce) in blob_infos]
        )

    @run_operation(connection)
    def _import_file(transaction, sd_hash, stream_hash, key, stream_name, suggested_file_name, data_rate,
                     status, stream_blobs):
        try:
            transaction.execute(
                "insert or ignore into stream values (?, ?, ?, ?, ?)",
                (stream_hash, sd_hash, key, stream_name, suggested_file_name)
            )
        except sqlite3.IntegrityError:
            # failed because the sd isn't a known blob, we'll try to read the blob file and recover it
            return sd_hash

        # insert any stream blobs that were missing from the blobs table
        transaction.executemany(
            "insert or ignore into blob values (?, ?, ?, ?, ?)",
            [
                (blob_hash, length, 0, 0, "pending")
                for (blob_hash, length, position, iv) in stream_blobs
            ]
        )

        # insert the stream blobs
        for blob_hash, length, position, iv in stream_blobs:
            transaction.execute(
                "insert or ignore into stream_blob values (?, ?, ?, ?)",
                (stream_hash, blob_hash, position, iv)
            )

        download_dir = conf.download_dir
        if not isinstance(download_dir, bytes):
            download_dir = download_dir.encode()

        # insert the file
        transaction.execute(
            "insert or ignore into file values (?, ?, ?, ?, ?)",
            (stream_hash, stream_name, hexlify(download_dir),
             data_rate, status)
        )

    @run_operation(connection)
    def _add_recovered_blobs(transaction, blob_infos, sd_hash, sd_length):
        transaction.execute(
            "insert or replace into blob values (?, ?, ?, ?, ?)", (sd_hash, sd_length, 0, 1, "finished")
        )
        for blob in sorted(blob_infos, key=lambda x: x['blob_num'], reverse=True):
            if blob['blob_num'] < len(blob_infos) - 1:
                transaction.execute(
                    "insert or ignore into blob values (?, ?, ?, ?, ?)",
                    (blob['blob_hash'], blob['length'], 0, 0, "pending")
                )

    @run_operation(connection)
    def _make_db(new_db):
        # create the new tables
        new_db.executescript(CREATE_TABLES_QUERY)

        # first migrate the blobs
        blobs = blobs_db_cursor.execute("select * from blobs").fetchall()
        _populate_blobs(blobs)  # pylint: disable=no-value-for-parameter
        log.info("migrated %i blobs", new_db.execute("select count(*) from blob").fetchone()[0])

        # used to store the query arguments if we need to try re-importing the lbry file later
        file_args = {}  # <sd_hash>: args tuple

        file_outpoints = {}  # <outpoint tuple>: sd_hash

        # get the file and stream queries ready
        for (rowid, sd_hash, stream_hash, key, stream_name, suggested_file_name, data_rate, status) in \
            lbryfile_db.execute(
                "select distinct lbry_files.rowid, d.sd_blob_hash, lbry_files.*, o.blob_data_rate, o.status "
                "from lbry_files "
                "inner join lbry_file_descriptors d on lbry_files.stream_hash=d.stream_hash "
                "inner join lbry_file_options o on lbry_files.stream_hash=o.stream_hash"):

            # this is to try to link the file to a content claim after we've imported all the files
            if rowid in old_rowid_to_outpoint:
                file_outpoints[old_rowid_to_outpoint[rowid]] = sd_hash
            elif sd_hash in old_sd_hash_to_outpoint:
                file_outpoints[old_sd_hash_to_outpoint[sd_hash]] = sd_hash

            sd_hash_to_stream_hash[sd_hash] = stream_hash
            if stream_hash in stream_hash_to_stream_blobs:
                file_args[sd_hash] = (
                    sd_hash, stream_hash, key, stream_name,
                    suggested_file_name, data_rate or 0.0,
                    status, stream_hash_to_stream_blobs.pop(stream_hash)
                )

        # used to store the query arguments if we need to try re-importing the claim
        claim_queries = {}  # <sd_hash>: claim query tuple

        # get the claim queries ready, only keep those with associated files
        for outpoint, sd_hash in file_outpoints.items():
            if outpoint in claim_outpoint_queries:
                claim_queries[sd_hash] = claim_outpoint_queries[outpoint]

        # insert the claims
        new_db.executemany(
            "insert or ignore into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            [
                (
                    "%s:%i" % (claim_arg_tup[0], claim_arg_tup[1]), claim_arg_tup[2], claim_arg_tup[3],
                    claim_arg_tup[7], claim_arg_tup[6], claim_arg_tup[8],
                    Claim.from_bytes(claim_arg_tup[8]).signing_channel_id, claim_arg_tup[5], claim_arg_tup[4]
                )
                for sd_hash, claim_arg_tup in claim_queries.items() if claim_arg_tup
            ]     # sd_hash,  (txid, nout, claim_id, name, sequence, address, height, amount, serialized)
        )

        log.info("migrated %i claims", new_db.execute("select count(*) from claim").fetchone()[0])

        damaged_stream_sds = []
        # import the files and get sd hashes of streams to attempt recovering
        for sd_hash, file_query in file_args.items():
            failed_sd = _import_file(*file_query)
            if failed_sd:
                damaged_stream_sds.append(failed_sd)

        # recover damaged streams
        if damaged_stream_sds:
            blob_dir = os.path.join(conf.data_dir, "blobfiles")
            damaged_sds_on_disk = [] if not os.path.isdir(blob_dir) else list({p for p in os.listdir(blob_dir)
                                                                               if p in damaged_stream_sds})
            for damaged_sd in damaged_sds_on_disk:
                try:
                    decoded, sd_length = verify_sd_blob(damaged_sd, blob_dir)
                    blobs = decoded['blobs']
                    _add_recovered_blobs(blobs, damaged_sd, sd_length)  # pylint: disable=no-value-for-parameter
                    _import_file(*file_args[damaged_sd])
                    damaged_stream_sds.remove(damaged_sd)
                except (OSError, ValueError, TypeError, AssertionError, sqlite3.IntegrityError):
                    continue

        log.info("migrated %i files", new_db.execute("select count(*) from file").fetchone()[0])

        # associate the content claims to their respective files
        for claim_arg_tup in claim_queries.values():
            if claim_arg_tup and (claim_arg_tup[0], claim_arg_tup[1]) in file_outpoints \
                    and file_outpoints[(claim_arg_tup[0], claim_arg_tup[1])] in sd_hash_to_stream_hash:
                try:
                    new_db.execute(
                        "insert or ignore into content_claim values (?, ?)",
                        (
                            sd_hash_to_stream_hash.get(file_outpoints.get((claim_arg_tup[0], claim_arg_tup[1]))),
                            "%s:%i" % (claim_arg_tup[0], claim_arg_tup[1])
                        )
                    )
                except sqlite3.IntegrityError:
                    continue

        log.info("migrated %i content claims", new_db.execute("select count(*) from content_claim").fetchone()[0])
    try:
        _make_db()  # pylint: disable=no-value-for-parameter
    except sqlite3.OperationalError as err:
        # sqlite3 exceptions have no `message` attribute in Python 3; compare the stringified error instead
        if str(err) == "table blob has 7 columns but 5 values were supplied":
            log.warning("detected a failed previous migration to revision 6, repairing it")
            connection.close()
            os.remove(new_db_path)
            return do_migration(conf)
        raise err

    connection.close()
    blobs_db.close()
    lbryfile_db.close()
    metadata_db.close()
    # os.remove(os.path.join(db_dir, "blockchainname.db"))
    # os.remove(os.path.join(db_dir, 'lbryfile_info.db'))
    # os.remove(os.path.join(db_dir, 'blobs.db'))
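As a usage illustration separate from the file above, a minimal sketch of how do_migration might be driven in isolation, assuming a conf object that exposes the data_dir and download_dir attributes the code reads, and assuming the revision-5 databases (blockchainname.db, lbryfile_info.db, blobs.db) already sit in data_dir; the SimpleNamespace stand-in and the paths are hypothetical, since the real daemon passes its own Config object:

from types import SimpleNamespace

from lbry.extras.daemon.migrator import migrate5to6  # module path as shown in this report

# hypothetical directories for illustration only
conf = SimpleNamespace(
    data_dir="/tmp/lbrynet",
    download_dir="/tmp/lbrynet/downloads",
)
migrate5to6.do_migration(conf)  # writes the new lbrynet.sqlite into data_dir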