lbryio / lbry-sdk / build 4599645360 (push, via GitHub), pending completion
Commit: Bump cryptography from 2.5 to 39.0.1

2807 of 6557 branches covered (42.81%); branch coverage is included in the aggregate percentage.
12289 of 19915 relevant lines covered (61.71%), averaging 0.97 hits per line.

Source File
/lbry/wallet/ledger.py (43.69% of lines covered)

import os
import copy
import time
import asyncio
import logging
from datetime import datetime
from functools import partial
from operator import itemgetter
from collections import defaultdict
from binascii import hexlify, unhexlify
from typing import Dict, Tuple, Type, Iterable, List, Optional, DefaultDict, NamedTuple

from lbry.schema.result import Outputs, INVALID, NOT_FOUND
from lbry.schema.url import URL
from lbry.crypto.hash import hash160, double_sha256, sha256
from lbry.crypto.base58 import Base58
from lbry.utils import LRUCacheWithMetrics

from lbry.wallet.tasks import TaskGroup
from lbry.wallet.database import Database
from lbry.wallet.stream import StreamController
from lbry.wallet.dewies import dewies_to_lbc
from lbry.wallet.account import Account, AddressManager, SingleKey
from lbry.wallet.network import Network
from lbry.wallet.transaction import Transaction, Output
from lbry.wallet.header import Headers, UnvalidatedHeaders
from lbry.wallet.checkpoints import HASHES
from lbry.wallet.constants import TXO_TYPES, CLAIM_TYPES, COIN, NULL_HASH32
from lbry.wallet.bip32 import PublicKey, PrivateKey
from lbry.wallet.coinselection import CoinSelector

log = logging.getLogger(__name__)

LedgerType = Type['BaseLedger']


class LedgerRegistry(type):

    ledgers: Dict[str, LedgerType] = {}

    def __new__(mcs, name, bases, attrs):
        cls: LedgerType = super().__new__(mcs, name, bases, attrs)
        if not (name == 'BaseLedger' and not bases):
            ledger_id = cls.get_id()
            assert ledger_id not in mcs.ledgers, \
                f'Ledger with id "{ledger_id}" already registered.'
            mcs.ledgers[ledger_id] = cls
        return cls

    @classmethod
    def get_ledger_class(mcs, ledger_id: str) -> LedgerType:
        return mcs.ledgers[ledger_id]


class TransactionEvent(NamedTuple):
    address: str
    tx: Transaction


class AddressesGeneratedEvent(NamedTuple):
    address_manager: AddressManager
    addresses: List[str]


class BlockHeightEvent(NamedTuple):
    height: int
    change: int


class TransactionCacheItem:
    __slots__ = '_tx', 'lock', 'has_tx', 'pending_verifications'

    def __init__(self, tx: Optional[Transaction] = None, lock: Optional[asyncio.Lock] = None):
        self.has_tx = asyncio.Event()
        self.lock = lock or asyncio.Lock()
        self._tx = self.tx = tx
        self.pending_verifications = 0

    @property
    def tx(self) -> Optional[Transaction]:
        return self._tx

    @tx.setter
    def tx(self, tx: Transaction):
        self._tx = tx
        if tx is not None:
            self.has_tx.set()


class Ledger(metaclass=LedgerRegistry):
    name = 'LBRY Credits'
    symbol = 'LBC'
    network_name = 'mainnet'

    headers_class = Headers

    secret_prefix = bytes((0x1c,))
    pubkey_address_prefix = bytes((0x55,))
    script_address_prefix = bytes((0x7a,))
    extended_public_key_prefix = unhexlify('0488b21e')
    extended_private_key_prefix = unhexlify('0488ade4')

    max_target = 0x0000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
    genesis_hash = '9c89283ba0f3227f6c03b70216b9f665f0118d5e0fa729cedf4fb34d6a34f463'
    genesis_bits = 0x1f00ffff
    target_timespan = 150

    default_fee_per_byte = 50
    default_fee_per_name_char = 0

    checkpoints = HASHES

    def __init__(self, config=None):
        self.config = config or {}
        self.db: Database = self.config.get('db') or Database(
            os.path.join(self.path, "blockchain.db")
        )
        self.db.ledger = self
        self.headers: Headers = self.config.get('headers') or self.headers_class(
            os.path.join(self.path, "headers")
        )
        self.headers.checkpoints = self.checkpoints
        self.network: Network = self.config.get('network') or Network(self)
        self.network.on_header.listen(self.receive_header)
        self.network.on_status.listen(self.process_status_update)

        self.accounts = []
        self.fee_per_byte: int = self.config.get('fee_per_byte', self.default_fee_per_byte)

        self._on_transaction_controller = StreamController()
        self.on_transaction = self._on_transaction_controller.stream
        self.on_transaction.listen(
            lambda e: log.info(
                '(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
                self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
            )
        )

        self._on_address_controller = StreamController()
        self.on_address = self._on_address_controller.stream
        self.on_address.listen(
            lambda e: log.info('(%s) on_address: %s', self.get_id(), e.addresses)
        )

        self._on_header_controller = StreamController()
        self.on_header = self._on_header_controller.stream
        self.on_header.listen(
            lambda change: log.info(
                '%s: added %s header blocks, final height %s',
                self.get_id(), change, self.headers.height
            )
        )
        self._download_height = 0

        self._on_ready_controller = StreamController()
        self.on_ready = self._on_ready_controller.stream

        self._tx_cache = LRUCacheWithMetrics(self.config.get("tx_cache_size", 1024), metric_name='tx')
        self._update_tasks = TaskGroup()
        self._other_tasks = TaskGroup()  # that we don't need to start
        self._utxo_reservation_lock = asyncio.Lock()
        self._header_processing_lock = asyncio.Lock()
        self._address_update_locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
        self._history_lock = asyncio.Lock()

        self.coin_selection_strategy = None
        self._known_addresses_out_of_sync = set()

        self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char)
        self._balance_cache = LRUCacheWithMetrics(2 ** 15)

    @classmethod
    def get_id(cls):
        return '{}_{}'.format(cls.symbol.lower(), cls.network_name.lower())

    @classmethod
    def hash160_to_address(cls, h160):
        raw_address = cls.pubkey_address_prefix + h160
        return Base58.encode(bytearray(raw_address + double_sha256(raw_address)[0:4]))

    @classmethod
    def hash160_to_script_address(cls, h160):
        raw_address = cls.script_address_prefix + h160
        return Base58.encode(bytearray(raw_address + double_sha256(raw_address)[0:4]))

    @staticmethod
    def address_to_hash160(address):
        return Base58.decode(address)[1:21]
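
    # A Base58 address decodes to 25 bytes: a 1-byte network prefix, the
    # 20-byte hash160, then a 4-byte double-SHA256 checksum (built in
    # hash160_to_address above); the [1:21] slice keeps just the hash160.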

    @classmethod
    def is_pubkey_address(cls, address):
        decoded = Base58.decode_check(address)
        return decoded[0] == cls.pubkey_address_prefix[0]

    @classmethod
    def is_script_address(cls, address):
        decoded = Base58.decode_check(address)
        return decoded[0] == cls.script_address_prefix[0]

    @classmethod
    def public_key_to_address(cls, public_key):
        return cls.hash160_to_address(hash160(public_key))

    @staticmethod
    def private_key_to_wif(private_key):
        return b'\x1c' + private_key + b'\x01'
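
    # 0x1c matches the mainnet secret_prefix defined above; the trailing 0x01
    # is the standard WIF marker for a key paired with a compressed public key.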

    @property
    def path(self):
        return os.path.join(self.config['data_path'], self.get_id())

    def add_account(self, account: Account):
        self.accounts.append(account)

    async def _get_account_and_address_info_for_address(self, wallet, address):
        match = await self.db.get_address(accounts=wallet.accounts, address=address)
        if match:
            for account in wallet.accounts:
                if match['account'] == account.public_key.address:
                    return account, match

    async def get_private_key_for_address(self, wallet, address) -> Optional[PrivateKey]:
        match = await self._get_account_and_address_info_for_address(wallet, address)
        if match:
            account, address_info = match
            return account.get_private_key(address_info['chain'], address_info['pubkey'].n)
        return None

    async def get_public_key_for_address(self, wallet, address) -> Optional[PublicKey]:
        match = await self._get_account_and_address_info_for_address(wallet, address)
        if match:
            _, address_info = match
            return address_info['pubkey']
        return None

    async def get_account_for_address(self, wallet, address):
        match = await self._get_account_and_address_info_for_address(wallet, address)
        if match:
            return match[0]

    async def get_effective_amount_estimators(self, funding_accounts: Iterable[Account]):
        estimators = []
        for account in funding_accounts:
            utxos = await account.get_utxos(no_tx=True, no_channel_info=True)
            for utxo in utxos:
                estimators.append(utxo.get_estimator(self))
        return estimators

    async def get_addresses(self, **constraints):
        return await self.db.get_addresses(**constraints)

    def get_address_count(self, **constraints):
        return self.db.get_address_count(**constraints)

    async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']], min_amount=1):
        min_amount = min(amount // 10, min_amount)
        fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self)
        selector = CoinSelector(amount, fee)
        async with self._utxo_reservation_lock:
            if self.coin_selection_strategy == 'sqlite':
                return await self.db.get_spendable_utxos(self, amount + fee, funding_accounts, min_amount=min_amount,
                                                         fee_per_byte=self.fee_per_byte)
            txos = await self.get_effective_amount_estimators(funding_accounts)
            spendables = selector.select(txos, self.coin_selection_strategy)
            if spendables:
                await self.reserve_outputs(s.txo for s in spendables)
            return spendables
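
    # Two selection paths: the 'sqlite' strategy pushes coin selection down
    # into the database query itself, while every other strategy loads
    # estimators for all spendable UTXOs and lets CoinSelector choose,
    # reserving the winning outputs before returning them.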

    def reserve_outputs(self, txos):
        return self.db.reserve_outputs(txos)

    def release_outputs(self, txos):
        return self.db.release_outputs(txos)

    def release_tx(self, tx):
        return self.release_outputs([txi.txo_ref.txo for txi in tx.inputs])

    def get_utxos(self, **constraints):
        self.constraint_spending_utxos(constraints)
        return self.db.get_utxos(**constraints)

    def get_utxo_count(self, **constraints):
        self.constraint_spending_utxos(constraints)
        return self.db.get_utxo_count(**constraints)

    async def get_txos(self, resolve=False, **constraints) -> List[Output]:
        txos = await self.db.get_txos(**constraints)
        if resolve:
            return await self._resolve_for_local_results(constraints.get('accounts', []), txos)
        return txos

    def get_txo_count(self, **constraints):
        return self.db.get_txo_count(**constraints)

    def get_txo_sum(self, **constraints):
        return self.db.get_txo_sum(**constraints)

    def get_txo_plot(self, **constraints):
        return self.db.get_txo_plot(**constraints)

    def get_transactions(self, **constraints):
        return self.db.get_transactions(**constraints)

    def get_transaction_count(self, **constraints):
        return self.db.get_transaction_count(**constraints)

    async def get_local_status_and_history(self, address, history=None):
        if not history:
            address_details = await self.db.get_address(address=address)
            history = (address_details['history'] if address_details else '') or ''
        parts = history.split(':')[:-1]
        return (
            hexlify(sha256(history.encode())).decode() if history else None,
            list(zip(parts[0::2], map(int, parts[1::2])))
        )
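
    # Address history is stored as one flat string of 'txid:height:' records,
    # e.g. 'abcd...:1021:ef01...:1022:'; its sha256 hex digest is the status
    # that gets compared against the server's status for that address.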

    @staticmethod
    def get_root_of_merkle_tree(branches, branch_positions, working_branch):
        for i, branch in enumerate(branches):
            other_branch = unhexlify(branch)[::-1]
            other_branch_on_left = bool((branch_positions >> i) & 1)
            if other_branch_on_left:
                combined = other_branch + working_branch
            else:
                combined = working_branch + other_branch
            working_branch = double_sha256(combined)
        return hexlify(working_branch[::-1])
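
    # Replays a merkle proof: bit i of branch_positions picks whether the i-th
    # sibling hash is concatenated on the left or right before each
    # double-SHA256; hashes are byte-reversed at the edges because the wire
    # format is reversed hex, as is usual in Bitcoin-derived protocols.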

    async def start(self):
        if not os.path.exists(self.path):
            os.mkdir(self.path)
        await asyncio.wait(map(asyncio.create_task, [
            self.db.open(),
            self.headers.open()
        ]))
        fully_synced = self.on_ready.first
        asyncio.create_task(self.network.start())
        await self.network.on_connected.first
        async with self._header_processing_lock:
            await self._update_tasks.add(self.initial_headers_sync())
        self.network.on_connected.listen(self.join_network)
        asyncio.ensure_future(self.join_network())
        await fully_synced
        await self.db.release_all_outputs()
        await asyncio.gather(*(a.maybe_migrate_certificates() for a in self.accounts))
        await asyncio.gather(*(a.save_max_gap() for a in self.accounts))
        if len(self.accounts) > 10:
            log.info("Loaded %i accounts", len(self.accounts))
        else:
            await self._report_state()
        self.on_transaction.listen(self._reset_balance_cache)

    async def join_network(self, *_):
        log.info("Subscribing and updating accounts.")
        await self._update_tasks.add(self.subscribe_accounts())
        await self._update_tasks.done.wait()
        self._on_ready_controller.add(True)

    async def stop(self):
        self._update_tasks.cancel()
        self._other_tasks.cancel()
        await self._update_tasks.done.wait()
        await self._other_tasks.done.wait()
        await self.network.stop()
        await self.db.close()
        await self.headers.close()

    async def tasks_are_done(self):
        await self._update_tasks.done.wait()
        await self._other_tasks.done.wait()

    @property
    def local_height_including_downloaded_height(self):
        return max(self.headers.height, self._download_height)

    async def initial_headers_sync(self):
        get_chunk = partial(self.network.retriable_call, self.network.get_headers, count=1000, b64=True)
        self.headers.chunk_getter = get_chunk

        async def doit():
            for height in reversed(sorted(self.headers.known_missing_checkpointed_chunks)):
                async with self._header_processing_lock:
                    await self.headers.ensure_chunk_at(height)
        self._other_tasks.add(doit())
        await self.update_headers()

    async def update_headers(self, height=None, headers=None, subscription_update=False):
        rewound = 0
        while True:

            if height is None or height > len(self.headers):
                # sometimes header subscription updates are for a header in the future
                # which can't be connected, so we do a normal header sync instead
                height = len(self.headers)
                headers = None
                subscription_update = False

            if not headers:
                header_response = await self.network.retriable_call(self.network.get_headers, height, 2001)
                headers = header_response['hex']

            if not headers:
                # Nothing to do, network thinks we're already at the latest height.
                return

            added = await self.headers.connect(height, unhexlify(headers))
            if added > 0:
                height += added
                self._on_header_controller.add(
                    BlockHeightEvent(self.headers.height, added))

                if rewound > 0:
                    # we started rewinding blocks and apparently found
                    # a new chain
                    rewound = 0
                    await self.db.rewind_blockchain(height)

                if subscription_update:
                    # subscription updates are for latest header already
                    # so we don't need to check if there are newer / more
                    # on another loop of update_headers(), just return instead
                    return

            elif added == 0:
                # we had headers to connect but none got connected, probably a reorganization
                height -= 1
                rewound += 1
                log.warning(
                    "Blockchain Reorganization: attempting rewind to height %s from starting height %s",
                    height, height+rewound
                )
                self._tx_cache.clear()

            else:
                raise IndexError(f"headers.connect() returned negative number ({added})")

            if height < 0:
                raise IndexError(
                    "Blockchain reorganization rewound all the way back to genesis hash. "
                    "Something is very wrong. Maybe you are on the wrong blockchain?"
                )

            if rewound >= 100:
                raise IndexError(
                    "Blockchain reorganization dropped {} headers. This is highly unusual. "
                    "Will not continue to attempt reorganizing. Please, delete the ledger "
                    "synchronization directory inside your wallet directory (folder: '{}') and "
                    "restart the program to synchronize from scratch."
                    .format(rewound, self.get_id())
                )

            headers = None  # ready to download some more headers

            # if we made it this far and this was a subscription_update
            # it means something went wrong and now we're doing a more
            # robust sync, turn off subscription update shortcut
            subscription_update = False

    async def receive_header(self, response):
        async with self._header_processing_lock:
            header = response[0]
            await self.update_headers(
                height=header['height'], headers=header['hex'], subscription_update=True
            )

    async def subscribe_accounts(self):
        if self.network.is_connected and self.accounts:
            log.info("Subscribe to %i accounts", len(self.accounts))
            await asyncio.wait(map(asyncio.create_task, [
                self.subscribe_account(a) for a in self.accounts
            ]))

    async def subscribe_account(self, account: Account):
        for address_manager in account.address_managers.values():
            await self.subscribe_addresses(address_manager, await address_manager.get_addresses())
        await account.ensure_address_gap()
        await account.deterministic_channel_keys.ensure_cache_primed()

    async def unsubscribe_account(self, account: Account):
        for address in await account.get_addresses():
            await self.network.unsubscribe_address(address)

    async def announce_addresses(self, address_manager: AddressManager, addresses: List[str]):
        await self.subscribe_addresses(address_manager, addresses)
        await self._on_address_controller.add(
            AddressesGeneratedEvent(address_manager, addresses)
        )

    async def subscribe_addresses(self, address_manager: AddressManager, addresses: List[str], batch_size: int = 1000):
        if self.network.is_connected and addresses:
            addresses_remaining = list(addresses)
            while addresses_remaining:
                batch = addresses_remaining[:batch_size]
                results = await self.network.subscribe_address(*batch)
                for address, remote_status in zip(batch, results):
                    self._update_tasks.add(self.update_history(address, remote_status, address_manager))
                addresses_remaining = addresses_remaining[batch_size:]
                if self.network.client and self.network.client.server_address_and_port:
                    log.info("subscribed to %i/%i addresses on %s:%i", len(addresses) - len(addresses_remaining),
                             len(addresses), *self.network.client.server_address_and_port)
            if self.network.client and self.network.client.server_address_and_port:
                log.info(
                    "finished subscribing to %i addresses on %s:%i", len(addresses),
                    *self.network.client.server_address_and_port
                )

    def process_status_update(self, update):
        address, remote_status = update
        self._update_tasks.add(self.update_history(address, remote_status))

    async def update_history(self, address, remote_status, address_manager: AddressManager = None,
                             reattempt_update: bool = True):
        async with self._address_update_locks[address]:
            self._known_addresses_out_of_sync.discard(address)
            local_status, local_history = await self.get_local_status_and_history(address)

            if local_status == remote_status:
                return True

            remote_history = await self.network.retriable_call(self.network.get_history, address)
            remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history))
            we_need = set(remote_history) - set(local_history)
            if not we_need:
                remote_missing = set(local_history) - set(remote_history)
                if remote_missing:
                    log.warning(
                        "%i transactions we have for %s are not in the remote address history",
                        len(remote_missing), address
                    )
                return True

            to_request = {}
            pending_synced_history = {}
            already_synced = set()

            already_synced_offset = 0
            for i, (txid, remote_height) in enumerate(remote_history):
                if i == already_synced_offset and i < len(local_history) and local_history[i] == (txid, remote_height):
                    pending_synced_history[i] = f'{txid}:{remote_height}:'
                    already_synced.add((txid, remote_height))
                    already_synced_offset += 1
                    continue

            tx_indexes = {}

            for i, (txid, remote_height) in enumerate(remote_history):
                tx_indexes[txid] = i
                if (txid, remote_height) in already_synced:
                    continue
                to_request[i] = (txid, remote_height)

            log.debug(
                "request %i transactions, %i/%i for %s are already synced", len(to_request), len(already_synced),
                len(remote_history), address
            )
            remote_history_txids = {txid for txid, _ in remote_history}
            async for tx in self.request_synced_transactions(to_request, remote_history_txids, address):
                self.maybe_has_channel_key(tx)
                pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:"
                if len(pending_synced_history) % 100 == 0:
                    log.info("Syncing address %s: %d/%d", address, len(pending_synced_history), len(to_request))
            log.info("Sync finished for address %s: %d/%d", address, len(pending_synced_history), len(to_request))

            assert len(pending_synced_history) == len(remote_history), \
                f"{len(pending_synced_history)} vs {len(remote_history)} for {address}"
            synced_history = ""
            for remote_i, i in zip(range(len(remote_history)), sorted(pending_synced_history.keys())):
                assert i == remote_i, f"{i} vs {remote_i}"
                txid, height = remote_history[remote_i]
                if f"{txid}:{height}:" != pending_synced_history[i]:
                    log.warning("history mismatch: %s vs %s", remote_history[remote_i], pending_synced_history[i])
                synced_history += pending_synced_history[i]
            await self.db.set_address_history(address, synced_history)

            if address_manager is None:
                address_manager = await self.get_address_manager_for_address(address)

            if address_manager is not None:
                await address_manager.ensure_address_gap()

            local_status, local_history = \
                await self.get_local_status_and_history(address, synced_history)

            if local_status != remote_status:
                if local_history == remote_history:
                    log.warning(
                        "%s has a synced history but a mismatched status", address
                    )
                    return True
                remote_set = set(remote_history)
                local_set = set(local_history)
                log.warning(
                    "%s is out of sync after syncing.\n"
                    "Remote: %s with %d items (%i unique), local: %s with %d items (%i unique).\n"
                    "Histories are mismatched on %i items.\n"
                    "Local is missing\n"
                    "%s\n"
                    "Remote is missing\n"
                    "%s\n"
                    "******",
                    address, remote_status, len(remote_history), len(remote_set),
                    local_status, len(local_history), len(local_set), len(remote_set.symmetric_difference(local_set)),
                    "\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)]),
                    "\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)])
                )
                self._known_addresses_out_of_sync.add(address)
                return False
            else:
                log.debug("finished syncing transaction history for %s, %i known txs", address, len(local_history))
                return True

    async def maybe_verify_transaction(self, tx, remote_height, merkle=None):
        tx.height = remote_height
        if 0 < remote_height < len(self.headers):
            # can't be tx.pending_verifications == 1 because we have to handle the transaction_show case
            if not merkle:
                merkle = await self.network.retriable_call(self.network.get_merkle, tx.id, remote_height)
            if 'merkle' not in merkle:
                return
            merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash)
            header = await self.headers.get(remote_height)
            tx.position = merkle['pos']
            tx.is_verified = merkle_root == header['merkle_root']
        return tx
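
    # SPV-style check: the merkle root recomputed from the server's proof must
    # match the merkle_root in the locally synced header at that height.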

    def maybe_has_channel_key(self, tx):
        for txo in tx._outputs:
            if txo.can_decode_claim and txo.claim.is_channel:
                for account in self.accounts:
                    account.deterministic_channel_keys.maybe_generate_deterministic_key_for_channel(txo)

    async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...], cached=False):
        batches = [[]]
        remote_heights = {}
        cache_hits = set()

        for txid, height in sorted(to_request, key=lambda x: x[1]):
            if cached:
                cached_tx = self._tx_cache.get(txid)
                if cached_tx is not None:
                    if cached_tx.tx is not None and cached_tx.tx.is_verified:
                        cache_hits.add(txid)
                        continue
                else:
                    self._tx_cache[txid] = TransactionCacheItem()
            remote_heights[txid] = height
            if len(batches[-1]) == 100:
                batches.append([])
            batches[-1].append(txid)
        if not batches[-1]:
            batches.pop()
        if cached and cache_hits:
            yield {txid: self._tx_cache[txid].tx for txid in cache_hits}

        for batch in batches:
            txs = await self._single_batch(batch, remote_heights)
            if cached:
                for txid, tx in txs.items():
                    self._tx_cache[txid].tx = tx
            yield txs
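
    # Requests are sorted by height and grouped into batches of 100 txids;
    # with cached=True, already-verified cache hits are yielded up front and
    # fresh results are written back into the LRU cache as they arrive.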

    async def request_synced_transactions(self, to_request, remote_history, address):
        async for txs in self.request_transactions(((txid, height) for txid, height in to_request.values())):
            for tx in txs.values():
                yield tx
            await self._sync_and_save_batch(address, remote_history, txs)

    async def _single_batch(self, batch, remote_heights):
        heights = {remote_heights[txid] for txid in batch}
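        # when every height lies strictly inside the checkpointed range, the
        # flag passed to get_transaction_batch is flipped, presumably because
        # checkpoint-validated headers relax per-tx verification requirements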
        unrestricted = 0 < min(heights) < max(heights) < max(self.headers.checkpoints or [0])
        batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch, not unrestricted)
        txs = {}
        for txid, (raw, merkle) in batch_result.items():
            remote_height = remote_heights[txid]
            tx = Transaction(unhexlify(raw), height=remote_height)
            txs[tx.id] = tx
            await self.maybe_verify_transaction(tx, remote_height, merkle)
        return txs

    async def _sync_and_save_batch(self, address, remote_history, pending_txs):
        await asyncio.gather(*(self._sync(tx, remote_history, pending_txs) for tx in pending_txs.values()))
        await self.db.save_transaction_io_batch(
            pending_txs.values(), address, self.address_to_hash160(address), ""
        )
        while pending_txs:
            self._on_transaction_controller.add(TransactionEvent(address, pending_txs.popitem()[1]))

    async def _sync(self, tx, remote_history, pending_txs):
        check_db_for_txos = {}
        for txi in tx.inputs:
            if txi.txo_ref.txo is not None:
                continue
            wanted_txid = txi.txo_ref.tx_ref.id
            if wanted_txid not in remote_history:
                continue
            if wanted_txid in pending_txs:
                txi.txo_ref = pending_txs[wanted_txid].outputs[txi.txo_ref.position].ref
            else:
                check_db_for_txos[txi] = txi.txo_ref.id

        referenced_txos = {} if not check_db_for_txos else {
            txo.id: txo for txo in await self.db.get_txos(
                txoid__in=list(check_db_for_txos.values()), order_by='txo.txoid', no_tx=True
            )
        }

        for txi in check_db_for_txos:
            if txi.txo_ref.id in referenced_txos:
                txi.txo_ref = referenced_txos[txi.txo_ref.id].ref
            else:
                tx_from_db = await self.db.get_transaction(txid=txi.txo_ref.tx_ref.id)
                if tx_from_db is None:
                    log.warning("%s not on db, not on cache, but on remote history!", txi.txo_ref.id)
                else:
                    txi.txo_ref = tx_from_db.outputs[txi.txo_ref.position].ref
        return tx
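
    # _sync attaches each input's spent txo, preferring outputs from the
    # current pending batch, then falling back to the local database; anything
    # still missing is only logged as a warning.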

    async def get_address_manager_for_address(self, address) -> Optional[AddressManager]:
        details = await self.db.get_address(address=address)
        for account in self.accounts:
            if account.id == details['account']:
                return account.address_managers[details['chain']]
        return None

    async def broadcast_or_release(self, tx, blocking=False):
        try:
            await self.broadcast(tx)
        except:
            await self.release_tx(tx)
            raise
        if blocking:
            await self.wait(tx, timeout=None)

    def broadcast(self, tx):
        # broadcast can't be a retriable call yet
        return self.network.broadcast(hexlify(tx.raw).decode())

    async def wait(self, tx: Transaction, height=-1, timeout=1):
        timeout = timeout or 600  # after 10 minutes there is almost 0 hope
        addresses = set()
        for txi in tx.inputs:
            if txi.txo_ref.txo is not None:
                addresses.add(
                    self.hash160_to_address(txi.txo_ref.txo.pubkey_hash)
                )
        for txo in tx.outputs:
            if txo.is_pubkey_hash:
                addresses.add(self.hash160_to_address(txo.pubkey_hash))
            elif txo.is_script_hash:
                addresses.add(self.hash160_to_script_address(txo.script_hash))
        start = int(time.perf_counter())
        while timeout and (int(time.perf_counter()) - start) <= timeout:
            if await self._wait_round(tx, height, addresses):
                return
        raise asyncio.TimeoutError(f'Timed out waiting for transaction. {tx.id}')
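
    # wait() polls in roughly one-second rounds (_wait_round uses a 1s
    # asyncio.wait timeout) until every watched address reports the tx or the
    # overall timeout elapses.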

    async def _wait_round(self, tx: Transaction, height: int, addresses: Iterable[str]):
        records = await self.db.get_addresses(address__in=addresses)
        _, pending = await asyncio.wait([
            self.on_transaction.where(partial(
                lambda a, e: a == e.address and e.tx.height >= height and e.tx.id == tx.id,
                address_record['address']
            )) for address_record in records
        ], timeout=1)
        if not pending:
            return True
        records = await self.db.get_addresses(address__in=addresses)
        for record in records:
            local_history = (await self.get_local_status_and_history(
                record['address'], history=record['history']
            ))[1] if record['history'] else []
            for txid, local_height in local_history:
                if txid == tx.id:
                    if local_height >= height or (local_height == 0 and height > local_height):
                        return True
                    log.warning(
                        "local history has lower height than requested for %s (%i vs %i)", txid,
                        local_height, height
                    )
                    return False
            log.warning(
                "local history does not contain %s, requested height %i", tx.id, height
            )
        return False

    async def _inflate_outputs(
            self, query, accounts,
            include_purchase_receipt=False,
            include_is_my_output=False,
            include_sent_supports=False,
            include_sent_tips=False,
            include_received_tips=False) -> Tuple[List[Output], dict, int, int]:
        encoded_outputs = await query
        outputs = Outputs.from_base64(encoded_outputs or '')  # TODO: why is the server returning None?
        txs: List[Transaction] = []
        if len(outputs.txs) > 0:
            async for tx in self.request_transactions(tuple(outputs.txs), cached=True):
                txs.extend(tx.values())

        _txos, blocked = outputs.inflate(txs)

        txos = []
        for txo in _txos:
            if isinstance(txo, Output):
                # transactions and outputs are cached and shared between wallets
                # we don't want to leak information between wallets so we add the
                # wallet-specific metadata on throw-away copies of the txos
                txo = copy.copy(txo)
                channel = txo.channel
                txo.purchase_receipt = None
                txo.update_annotations(None)
                txo.channel = channel
            txos.append(txo)

        includes = (
            include_purchase_receipt, include_is_my_output,
            include_sent_supports, include_sent_tips
        )
        if accounts and any(includes):
            receipts = {}
            if include_purchase_receipt:
                priced_claims = []
                for txo in txos:
                    if isinstance(txo, Output) and txo.has_price:
                        priced_claims.append(txo)
                if priced_claims:
                    receipts = {
                        txo.purchased_claim_id: txo for txo in
                        await self.db.get_purchases(
                            accounts=accounts,
                            purchased_claim_id__in=[c.claim_id for c in priced_claims]
                        )
                    }
            for txo in txos:
                if isinstance(txo, Output) and txo.can_decode_claim:
                    if include_purchase_receipt:
                        txo.purchase_receipt = receipts.get(txo.claim_id)
                    if include_is_my_output:
                        mine = await self.db.get_txo_count(
                            claim_id=txo.claim_id, txo_type__in=CLAIM_TYPES, is_my_output=True,
                            is_spent=False, accounts=accounts
                        )
                        if mine:
                            txo.is_my_output = True
                        else:
                            txo.is_my_output = False
                    if include_sent_supports:
                        supports = await self.db.get_txo_sum(
                            claim_id=txo.claim_id, txo_type=TXO_TYPES['support'],
                            is_my_input=True, is_my_output=True,
                            is_spent=False, accounts=accounts
                        )
                        txo.sent_supports = supports
                    if include_sent_tips:
                        tips = await self.db.get_txo_sum(
                            claim_id=txo.claim_id, txo_type=TXO_TYPES['support'],
                            is_my_input=True, is_my_output=False,
                            accounts=accounts
                        )
                        txo.sent_tips = tips
                    if include_received_tips:
                        tips = await self.db.get_txo_sum(
                            claim_id=txo.claim_id, txo_type=TXO_TYPES['support'],
                            is_my_input=False, is_my_output=True,
                            accounts=accounts
                        )
                        txo.received_tips = tips
        return txos, blocked, outputs.offset, outputs.total
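
    # The is_my_input/is_my_output combinations above distinguish a support we
    # fund and keep (sent support), one we fund for someone else (sent tip),
    # and one someone else funds for us (received tip).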

    async def resolve(self, accounts, urls, **kwargs):
        txos = []
        urls_copy = list(urls)
        resolve = partial(self.network.retriable_call, self.network.resolve)
        while urls_copy:
            batch, urls_copy = urls_copy[:100], urls_copy[100:]
            txos.extend(
                (await self._inflate_outputs(
                    resolve(batch), accounts, **kwargs
                ))[0]
            )

        assert len(urls) == len(txos), "Mismatch between urls requested for resolve and responses received."
        result = {}
        for url, txo in zip(urls, txos):
            if txo:
                if isinstance(txo, Output) and URL.parse(url).has_stream_in_channel:
                    if not txo.channel or not txo.is_signed_by(txo.channel, self):
                        txo = {'error': {'name': INVALID, 'text': f'{url} has invalid channel signature'}}
            else:
                txo = {'error': {'name': NOT_FOUND, 'text': f'{url} did not resolve to a claim'}}
            result[url] = txo
        return result
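
    # resolve() maps every url to either an inflated Output or an
    # {'error': {...}} dict; a stream addressed inside a channel must also
    # carry a valid signature from that channel or it is reported as INVALID.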

    async def sum_supports(self, new_sdk_server, **kwargs) -> List[Dict]:
        return await self.network.sum_supports(new_sdk_server, **kwargs)

    async def claim_search(
            self, accounts,
            include_purchase_receipt=False,
            include_is_my_output=False,
            **kwargs) -> Tuple[List[Output], dict, int, int]:
        return await self._inflate_outputs(
            self.network.claim_search(**kwargs), accounts,
            include_purchase_receipt=include_purchase_receipt,
            include_is_my_output=include_is_my_output
        )

    # async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output:
    #     return await self.network.get_claim_by_id(claim_id)

    async def get_claim_by_claim_id(self, claim_id, accounts=None, include_purchase_receipt=False,
                                    include_is_my_output=False):
        accounts = accounts or []
        # return await self.network.get_claim_by_id(claim_id)
        inflated = await self._inflate_outputs(
            self.network.get_claim_by_id(claim_id), accounts,
            include_purchase_receipt=include_purchase_receipt,
            include_is_my_output=include_is_my_output,
        )
        txos = inflated[0]
        if txos:
            return txos[0]

    async def _report_state(self):
        try:
            for account in self.accounts:
                balance = dewies_to_lbc(await account.get_balance(include_claims=True))
                channel_count = await account.get_channel_count()
                claim_count = await account.get_claim_count()
                if isinstance(account.receiving, SingleKey):
                    log.info("Loaded single key account %s with %s LBC. "
                             "%d channels, %d certificates and %d claims",
                             account.id, balance, channel_count, len(account.channel_keys), claim_count)
                else:
                    total_receiving = len(await account.receiving.get_addresses())
                    total_change = len(await account.change.get_addresses())
                    log.info("Loaded account %s with %s LBC, %d receiving addresses (gap: %d), "
                             "%d change addresses (gap: %d), %d channels, %d certificates and %d claims. ",
                             account.id, balance, total_receiving, account.receiving.gap, total_change,
                             account.change.gap, channel_count, len(account.channel_keys), claim_count)
        except Exception:
            log.exception(
                'Failed to display wallet state, please file issue '
                'for this bug along with the traceback you see below:')

    async def _reset_balance_cache(self, e: TransactionEvent):
        account_ids = [
            r['account'] for r in await self.db.get_addresses(('account',), address=e.address)
        ]
        for account_id in account_ids:
            if account_id in self._balance_cache:
                del self._balance_cache[account_id]
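
    # get_detailed_balance() (below) memoizes per-account balances in
    # _balance_cache; the handler above evicts an account's entry whenever a
    # transaction touches one of its addresses.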

    @staticmethod
    def constraint_spending_utxos(constraints):
        constraints['txo_type__in'] = (0, TXO_TYPES['purchase'])

    async def get_purchases(self, resolve=False, **constraints):
        purchases = await self.db.get_purchases(**constraints)
        if resolve:
            claim_ids = [p.purchased_claim_id for p in purchases]
            try:
                resolved, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
            except Exception:
                log.exception("Resolve failed while looking up purchased claim ids:")
                resolved = []
            lookup = {claim.claim_id: claim for claim in resolved}
            for purchase in purchases:
                purchase.purchased_claim = lookup.get(purchase.purchased_claim_id)
        return purchases

    def get_purchase_count(self, resolve=False, **constraints):
        return self.db.get_purchase_count(**constraints)

    async def _resolve_for_local_results(self, accounts, txos):
        txos = await self._resolve_for_local_claim_results(accounts, txos)
        txos = await self._resolve_for_local_support_results(accounts, txos)
        return txos

    async def _resolve_for_local_claim_results(self, accounts, txos):
        results = []
        response = await self.resolve(
            accounts, [txo.permanent_url for txo in txos if txo.can_decode_claim]
        )
        for txo in txos:
            resolved = response.get(txo.permanent_url) if txo.can_decode_claim else None
            if isinstance(resolved, Output):
                resolved.update_annotations(txo)
                results.append(resolved)
            else:
                if isinstance(resolved, dict) and 'error' in resolved:
                    txo.meta['error'] = resolved['error']
                results.append(txo)
        return results

    async def _resolve_for_local_support_results(self, accounts, txos):
        channel_ids = set()
        signed_support_txos = []
        for txo in txos:
            support = txo.can_decode_support
            if support and support.signing_channel_id:
                channel_ids.add(support.signing_channel_id)
                signed_support_txos.append(txo)
        if channel_ids:
            channels = {
                channel.claim_id: channel for channel in
                (await self.claim_search(accounts, claim_ids=list(channel_ids)))[0]
            }
            for txo in signed_support_txos:
                txo.channel = channels.get(txo.support.signing_channel_id)
        return txos

    async def get_claims(self, resolve=False, **constraints):
        claims = await self.db.get_claims(**constraints)
        if resolve:
            return await self._resolve_for_local_results(constraints.get('accounts', []), claims)
        return claims

    def get_claim_count(self, **constraints):
        return self.db.get_claim_count(**constraints)

    async def get_streams(self, resolve=False, **constraints):
        streams = await self.db.get_streams(**constraints)
        if resolve:
            return await self._resolve_for_local_results(constraints.get('accounts', []), streams)
        return streams

    def get_stream_count(self, **constraints):
        return self.db.get_stream_count(**constraints)

    async def get_channels(self, resolve=False, **constraints):
        channels = await self.db.get_channels(**constraints)
        if resolve:
            return await self._resolve_for_local_results(constraints.get('accounts', []), channels)
        return channels

    def get_channel_count(self, **constraints):
        return self.db.get_channel_count(**constraints)

    async def resolve_collection(self, collection, offset=0, page_size=1):
        claim_ids = collection.claim.collection.claims.ids[offset:page_size + offset]
        try:
            resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
        except Exception:
            log.exception("Resolve failed while looking up collection claim ids:")
            return []
        claims = []
        for claim_id in claim_ids:
            found = False
            for txo in resolve_results:
                if txo.claim_id == claim_id:
                    claims.append(txo)
                    found = True
                    break
            if not found:
                claims.append(None)
        return claims

    async def get_collections(self, resolve_claims=0, resolve=False, **constraints):
        collections = await self.db.get_collections(**constraints)
        if resolve:
            collections = await self._resolve_for_local_results(constraints.get('accounts', []), collections)
        if resolve_claims > 0:
            for collection in collections:
                collection.claims = await self.resolve_collection(collection, page_size=resolve_claims)
        return collections

    def get_collection_count(self, resolve_claims=0, **constraints):
        return self.db.get_collection_count(**constraints)

    def get_supports(self, **constraints):
        return self.db.get_supports(**constraints)

    def get_support_count(self, **constraints):
        return self.db.get_support_count(**constraints)

    async def get_transaction_history(self, read_only=False, **constraints):
        txs: List[Transaction] = await self.db.get_transactions(
            include_is_my_output=True, include_is_spent=True,
            read_only=read_only, **constraints
        )
        headers = self.headers
        history = []
        for tx in txs:  # pylint: disable=too-many-nested-blocks
            ts = headers.estimated_timestamp(tx.height)
            item = {
                'txid': tx.id,
                'timestamp': ts,
                'date': datetime.fromtimestamp(ts).isoformat(' ')[:-3] if tx.height > 0 else None,
                'confirmations': (headers.height + 1) - tx.height if tx.height > 0 else 0,
                'claim_info': [],
                'update_info': [],
                'support_info': [],
                'abandon_info': [],
                'purchase_info': []
            }
            is_my_inputs = all(txi.is_my_input for txi in tx.inputs)
            if is_my_inputs:
                # fees only matter if we are the ones paying them
                item['value'] = dewies_to_lbc(tx.net_account_balance + tx.fee)
                item['fee'] = dewies_to_lbc(-tx.fee)
            else:
                # someone else paid the fees
                item['value'] = dewies_to_lbc(tx.net_account_balance)
                item['fee'] = '0.0'
            for txo in tx.my_claim_outputs:
                item['claim_info'].append({
                    'address': txo.get_address(self),
                    'balance_delta': dewies_to_lbc(-txo.amount),
                    'amount': dewies_to_lbc(txo.amount),
                    'claim_id': txo.claim_id,
                    'claim_name': txo.claim_name,
                    'nout': txo.position,
                    'is_spent': txo.is_spent,
                })
            for txo in tx.my_update_outputs:
                if is_my_inputs:  # updating my own claim
                    previous = None
                    for txi in tx.inputs:
                        if txi.txo_ref.txo is not None:
                            other_txo = txi.txo_ref.txo
                            if (other_txo.is_claim or other_txo.script.is_support_claim) \
                                and other_txo.claim_id == txo.claim_id:
                                previous = other_txo
                                break
                    if previous is not None:
                        item['update_info'].append({
                            'address': txo.get_address(self),
                            'balance_delta': dewies_to_lbc(previous.amount - txo.amount),
                            'amount': dewies_to_lbc(txo.amount),
                            'claim_id': txo.claim_id,
                            'claim_name': txo.claim_name,
                            'nout': txo.position,
                            'is_spent': txo.is_spent,
                        })
                else:  # someone sent us their claim
                    item['update_info'].append({
                        'address': txo.get_address(self),
                        'balance_delta': dewies_to_lbc(0),
                        'amount': dewies_to_lbc(txo.amount),
                        'claim_id': txo.claim_id,
                        'claim_name': txo.claim_name,
                        'nout': txo.position,
                        'is_spent': txo.is_spent,
                    })
            for txo in tx.my_support_outputs:
                item['support_info'].append({
                    'address': txo.get_address(self),
                    'balance_delta': dewies_to_lbc(txo.amount if not is_my_inputs else -txo.amount),
                    'amount': dewies_to_lbc(txo.amount),
                    'claim_id': txo.claim_id,
                    'claim_name': txo.claim_name,
                    'is_tip': not is_my_inputs,
                    'nout': txo.position,
                    'is_spent': txo.is_spent,
                })
            if is_my_inputs:
                for txo in tx.other_support_outputs:
                    item['support_info'].append({
                        'address': txo.get_address(self),
                        'balance_delta': dewies_to_lbc(-txo.amount),
                        'amount': dewies_to_lbc(txo.amount),
                        'claim_id': txo.claim_id,
                        'claim_name': txo.claim_name,
                        'is_tip': is_my_inputs,
                        'nout': txo.position,
                        'is_spent': txo.is_spent,
                    })
            for txo in tx.my_abandon_outputs:
                item['abandon_info'].append({
                    'address': txo.get_address(self),
                    'balance_delta': dewies_to_lbc(txo.amount),
                    'amount': dewies_to_lbc(txo.amount),
                    'claim_id': txo.claim_id,
                    'claim_name': txo.claim_name,
                    'nout': txo.position
                })
            for txo in tx.any_purchase_outputs:
                item['purchase_info'].append({
                    'address': txo.get_address(self),
                    'balance_delta': dewies_to_lbc(txo.amount if not is_my_inputs else -txo.amount),
                    'amount': dewies_to_lbc(txo.amount),
                    'claim_id': txo.purchased_claim_id,
                    'nout': txo.position,
                    'is_spent': txo.is_spent,
                })
            history.append(item)
        return history
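
    # Each history item reports value and fee from this wallet's perspective
    # (fees count only when all inputs are ours) plus per-txo details grouped
    # into claim, update, support, abandon and purchase sections.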

    def get_transaction_history_count(self, read_only=False, **constraints):
        return self.db.get_transaction_count(read_only=read_only, **constraints)

    async def get_detailed_balance(self, accounts, confirmations=0):
        result = {
            'total': 0,
            'available': 0,
            'reserved': 0,
            'reserved_subtotals': {
                'claims': 0,
                'supports': 0,
                'tips': 0
            }
        }
        for account in accounts:
            balance = self._balance_cache.get(account.id)
            if not balance:
                balance = self._balance_cache[account.id] = \
                    await account.get_detailed_balance(confirmations)
            for key, value in balance.items():
                if key == 'reserved_subtotals':
                    for subkey, subvalue in value.items():
                        result['reserved_subtotals'][subkey] += subvalue
                else:
                    result[key] += value
        return result


class TestNetLedger(Ledger):
    network_name = 'testnet'
    pubkey_address_prefix = bytes((111,))
    script_address_prefix = bytes((196,))
    extended_public_key_prefix = unhexlify('043587cf')
    extended_private_key_prefix = unhexlify('04358394')
    checkpoints = {}
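
    # 111 and 196 are the conventional Bitcoin testnet p2pkh/p2sh version
    # bytes; testnet and regtest (below) ship without header checkpoints.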


class RegTestLedger(Ledger):
    network_name = 'regtest'
    headers_class = UnvalidatedHeaders
    pubkey_address_prefix = bytes((111,))
    script_address_prefix = bytes((196,))
    extended_public_key_prefix = unhexlify('043587cf')
    extended_private_key_prefix = unhexlify('04358394')

    max_target = 0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
    genesis_hash = '6e3fcf1299d4ec5d79c3a4c91d624a4acf9e2e173d95a1a0504f677669687556'
    genesis_bits = 0x207fffff
    target_timespan = 1
    checkpoints = {}