• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 3708794183

pending completion
3708794183

Pull #3657

github

GitHub
Merge 636b7ed47 into 625865165
Pull Request #3657: wip: add initial support for streaming torrent files

2754 of 6491 branches covered (42.43%)

Branch coverage included in aggregate %.

64 of 245 new or added lines in 12 files covered. (26.12%)

20 existing lines in 5 files now uncovered.

12055 of 19808 relevant lines covered (60.86%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.8
/lbry/testcase.py
1
import os
1✔
2
import sys
1✔
3
import json
1✔
4
import shutil
1✔
5
import logging
1✔
6
import tempfile
1✔
7
import functools
1✔
8
import asyncio
1✔
9
from asyncio.runners import _cancel_all_tasks  # type: ignore
1✔
10
import unittest
1✔
11
from unittest.case import _Outcome
1✔
12
from typing import Optional
1✔
13
from time import time
1✔
14
from binascii import unhexlify
1✔
15
from functools import partial
1✔
16

17
from lbry.wallet import WalletManager, Wallet, Ledger, Account, Transaction
1✔
18
from lbry.conf import Config
1✔
19
from lbry.wallet.util import satoshis_to_coins
1✔
20
from lbry.wallet.dewies import lbc_to_dewies
1✔
21
from lbry.wallet.orchstr8 import Conductor
1✔
22
from lbry.wallet.orchstr8.node import LBCWalletNode, WalletNode
1✔
23
from lbry.schema.claim import Claim
1✔
24

25
from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty
1✔
26
from lbry.extras.daemon.components import Component, WalletComponent
1✔
27
from lbry.extras.daemon.components import (
1✔
28
    DHT_COMPONENT,
29
    HASH_ANNOUNCER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
30
    UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, LIBTORRENT_COMPONENT
31
)
32
from lbry.extras.daemon.componentmanager import ComponentManager
1✔
33
from lbry.extras.daemon.exchange_rate_manager import (
1✔
34
    ExchangeRateManager, ExchangeRate, BittrexBTCFeed, BittrexUSDFeed
35
)
36
from lbry.extras.daemon.storage import SQLiteStorage
1✔
37
from lbry.blob.blob_manager import BlobManager
1✔
38
from lbry.stream.reflector.server import ReflectorServer
1✔
39
from lbry.blob_exchange.server import BlobServer
1✔
40

41

42
class ColorHandler(logging.StreamHandler):
1✔
43

44
    level_color = {
1✔
45
        logging.DEBUG: "black",
46
        logging.INFO: "light_gray",
47
        logging.WARNING: "yellow",
48
        logging.ERROR: "red"
49
    }
50

51
    color_code = dict(
1✔
52
        black=30,
53
        red=31,
54
        green=32,
55
        yellow=33,
56
        blue=34,
57
        magenta=35,
58
        cyan=36,
59
        white=37,
60
        light_gray='0;37',
61
        dark_gray='1;30'
62
    )
63

64
    def emit(self, record):
1✔
65
        try:
1✔
66
            msg = self.format(record)
1✔
67
            color_name = self.level_color.get(record.levelno, "black")
1✔
68
            color_code = self.color_code[color_name]
1✔
69
            stream = self.stream
1✔
70
            stream.write(f'\x1b[{color_code}m{msg}\x1b[0m')
1✔
71
            stream.write(self.terminator)
1✔
72
            self.flush()
1✔
73
        except Exception:
×
74
            self.handleError(record)
×
75

76

77
HANDLER = ColorHandler(sys.stdout)
1✔
78
HANDLER.setFormatter(
1✔
79
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
80
)
81
logging.getLogger().addHandler(HANDLER)
1✔
82

83

84
class AsyncioTestCase(unittest.TestCase):
1✔
85
    # Implementation inspired by discussion:
86
    #  https://bugs.python.org/issue32972
87

88
    LOOP_SLOW_CALLBACK_DURATION = 0.2
1✔
89
    TIMEOUT = 120.0
1✔
90

91
    maxDiff = None
1✔
92

93
    async def asyncSetUp(self):  # pylint: disable=C0103
1✔
94
        pass
1✔
95

96
    async def asyncTearDown(self):  # pylint: disable=C0103
1✔
97
        pass
1✔
98

99
    def run(self, result=None):  # pylint: disable=R0915
1✔
100
        orig_result = result
1✔
101
        if result is None:
1!
102
            result = self.defaultTestResult()
×
103
            startTestRun = getattr(result, 'startTestRun', None)  # pylint: disable=C0103
×
104
            if startTestRun is not None:
×
105
                startTestRun()
×
106

107
        result.startTest(self)
1✔
108

109
        testMethod = getattr(self, self._testMethodName)  # pylint: disable=C0103
1✔
110
        if (getattr(self.__class__, "__unittest_skip__", False) or
1!
111
                getattr(testMethod, "__unittest_skip__", False)):
112
            # If the class or method was skipped.
113
            try:
×
114
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
×
115
                            or getattr(testMethod, '__unittest_skip_why__', ''))
116
                self._addSkip(result, self, skip_why)
×
117
            finally:
118
                result.stopTest(self)
×
119
            return
×
120
        expecting_failure_method = getattr(testMethod,
1✔
121
                                           "__unittest_expecting_failure__", False)
122
        expecting_failure_class = getattr(self,
1✔
123
                                          "__unittest_expecting_failure__", False)
124
        expecting_failure = expecting_failure_class or expecting_failure_method
1✔
125
        outcome = _Outcome(result)
1✔
126

127
        self.loop = asyncio.new_event_loop()  # pylint: disable=W0201
1✔
128
        asyncio.set_event_loop(self.loop)
1✔
129
        self.loop.set_debug(True)
1✔
130
        self.loop.slow_callback_duration = self.LOOP_SLOW_CALLBACK_DURATION
1✔
131

132
        try:
1✔
133
            self._outcome = outcome
1✔
134

135
            with outcome.testPartExecutor(self):
1✔
136
                self.setUp()
1✔
137
                self.add_timeout()
1✔
138
                self.loop.run_until_complete(self.asyncSetUp())
1✔
139
            if outcome.success:
1!
140
                outcome.expecting_failure = expecting_failure
1✔
141
                with outcome.testPartExecutor(self, isTest=True):
1✔
142
                    maybe_coroutine = testMethod()
1✔
143
                    if asyncio.iscoroutine(maybe_coroutine):
1✔
144
                        self.add_timeout()
1✔
145
                        self.loop.run_until_complete(maybe_coroutine)
1✔
146
                outcome.expecting_failure = False
1✔
147
                with outcome.testPartExecutor(self):
1✔
148
                    self.add_timeout()
1✔
149
                    self.loop.run_until_complete(self.asyncTearDown())
1✔
150
                    self.tearDown()
1✔
151

152
            self.doAsyncCleanups()
1✔
153

154
            try:
1✔
155
                _cancel_all_tasks(self.loop)
1✔
156
                self.loop.run_until_complete(self.loop.shutdown_asyncgens())
1✔
157
            finally:
158
                asyncio.set_event_loop(None)
1✔
159
                self.loop.close()
1✔
160

161
            for test, reason in outcome.skipped:
1!
162
                self._addSkip(result, test, reason)
×
163
            self._feedErrorsToResult(result, outcome.errors)
1✔
164
            if outcome.success:
1!
165
                if expecting_failure:
1!
166
                    if outcome.expectedFailure:
×
167
                        self._addExpectedFailure(result, outcome.expectedFailure)
×
168
                    else:
169
                        self._addUnexpectedSuccess(result)
×
170
                else:
171
                    result.addSuccess(self)
1✔
172
            return result
1✔
173
        finally:
174
            result.stopTest(self)
1✔
175
            if orig_result is None:
1!
176
                stopTestRun = getattr(result, 'stopTestRun', None)  # pylint: disable=C0103
×
177
                if stopTestRun is not None:
×
178
                    stopTestRun()  # pylint: disable=E1102
×
179

180
            # explicitly break reference cycles:
181
            # outcome.errors -> frame -> outcome -> outcome.errors
182
            # outcome.expectedFailure -> frame -> outcome -> outcome.expectedFailure
183
            outcome.errors.clear()
1✔
184
            outcome.expectedFailure = None
1✔
185

186
            # clear the outcome, no more needed
187
            self._outcome = None
1✔
188

189
    def doAsyncCleanups(self):  # pylint: disable=C0103
1✔
190
        outcome = self._outcome or _Outcome()
1✔
191
        while self._cleanups:
1✔
192
            function, args, kwargs = self._cleanups.pop()
1✔
193
            with outcome.testPartExecutor(self):
1✔
194
                maybe_coroutine = function(*args, **kwargs)
1✔
195
                if asyncio.iscoroutine(maybe_coroutine):
1✔
196
                    self.add_timeout()
1✔
197
                    self.loop.run_until_complete(maybe_coroutine)
1✔
198

199
    def cancel(self):
1✔
200
        for task in asyncio.all_tasks(self.loop):
×
201
            if not task.done():
×
202
                task.print_stack()
×
203
                task.cancel()
×
204

205
    def add_timeout(self):
1✔
206
        if self.TIMEOUT:
1!
207
            self.loop.call_later(self.TIMEOUT, self.check_timeout, time())
1✔
208

209
    def check_timeout(self, started):
1✔
210
        if time() - started >= self.TIMEOUT:
1!
211
            self.cancel()
×
212
        else:
213
            self.loop.call_later(self.TIMEOUT, self.check_timeout, started)
1✔
214

215

216
class AdvanceTimeTestCase(AsyncioTestCase):
1✔
217

218
    async def asyncSetUp(self):
1✔
219
        self._time = 0  # pylint: disable=W0201
1✔
220
        self.loop.time = functools.wraps(self.loop.time)(lambda: self._time)
1✔
221
        await super().asyncSetUp()
1✔
222

223
    async def advance(self, seconds):
1✔
224
        while self.loop._ready:
1✔
225
            await asyncio.sleep(0)
1✔
226
        self._time += seconds
1✔
227
        await asyncio.sleep(0)
1✔
228
        while self.loop._ready:
1✔
229
            await asyncio.sleep(0)
1✔
230

231

232
class IntegrationTestCase(AsyncioTestCase):
1✔
233

234
    SEED = None
1✔
235

236
    def __init__(self, *args, **kwargs):
1✔
237
        super().__init__(*args, **kwargs)
×
238
        self.conductor: Optional[Conductor] = None
×
239
        self.blockchain: Optional[LBCWalletNode] = None
×
240
        self.wallet_node: Optional[WalletNode] = None
×
241
        self.manager: Optional[WalletManager] = None
×
242
        self.ledger: Optional[Ledger] = None
×
243
        self.wallet: Optional[Wallet] = None
×
244
        self.account: Optional[Account] = None
×
245

246
    async def asyncSetUp(self):
1✔
247
        self.conductor = Conductor(seed=self.SEED)
×
248
        await self.conductor.start_lbcd()
×
249
        self.addCleanup(self.conductor.stop_lbcd)
×
250
        await self.conductor.start_lbcwallet()
×
251
        self.addCleanup(self.conductor.stop_lbcwallet)
×
252
        await self.conductor.start_spv()
×
253
        self.addCleanup(self.conductor.stop_spv)
×
254
        await self.conductor.start_wallet()
×
255
        self.addCleanup(self.conductor.stop_wallet)
×
256
        self.blockchain = self.conductor.lbcwallet_node
×
257
        self.wallet_node = self.conductor.wallet_node
×
258
        self.manager = self.wallet_node.manager
×
259
        self.ledger = self.wallet_node.ledger
×
260
        self.wallet = self.wallet_node.wallet
×
261
        self.account = self.wallet_node.wallet.default_account
×
262

263
    async def assertBalance(self, account, expected_balance: str):  # pylint: disable=C0103
1✔
264
        balance = await account.get_balance()
×
265
        self.assertEqual(satoshis_to_coins(balance), expected_balance)
×
266

267
    def broadcast(self, tx):
1✔
268
        return self.ledger.broadcast(tx)
×
269

270
    async def broadcast_and_confirm(self, tx, ledger=None):
1✔
271
        ledger = ledger or self.ledger
×
272
        notifications = asyncio.create_task(ledger.wait(tx))
×
273
        await ledger.broadcast(tx)
×
274
        await notifications
×
275
        await self.generate_and_wait(1, [tx.id], ledger)
×
276

277
    async def on_header(self, height):
1✔
278
        if self.ledger.headers.height < height:
×
279
            await self.ledger.on_header.where(
×
280
                lambda e: e.height == height
281
            )
282
        return True
×
283

284
    async def send_to_address_and_wait(self, address, amount, blocks_to_generate=0, ledger=None):
1✔
285
        tx_watch = []
×
286
        txid = None
×
287
        done = False
×
288
        watcher = (ledger or self.ledger).on_transaction.where(
×
289
            lambda e: e.tx.id == txid or done or tx_watch.append(e.tx.id)
290
        )
291

292
        txid = await self.blockchain.send_to_address(address, amount)
×
293
        done = txid in tx_watch
×
294
        await watcher
×
295

296
        await self.generate_and_wait(blocks_to_generate, [txid], ledger)
×
297
        return txid
×
298

299
    async def generate_and_wait(self, blocks_to_generate, txids, ledger=None):
1✔
300
        if blocks_to_generate > 0:
×
301
            watcher = (ledger or self.ledger).on_transaction.where(
×
302
                lambda e: ((e.tx.id in txids and txids.remove(e.tx.id)), len(txids) <= 0)[-1]  # multi-statement lambda
303
            )
304
            await self.generate(blocks_to_generate)
×
305
            await watcher
×
306

307
    def on_address_update(self, address):
1✔
308
        return self.ledger.on_transaction.where(
×
309
            lambda e: e.address == address
310
        )
311

312
    def on_transaction_address(self, tx, address):
1✔
313
        return self.ledger.on_transaction.where(
×
314
            lambda e: e.tx.id == tx.id and e.address == address
315
        )
316

317
    async def generate(self, blocks):
1✔
318
        """ Ask lbrycrd to generate some blocks and wait until ledger has them. """
319
        prepare = self.ledger.on_header.where(self.blockchain.is_expected_block)
×
320
        self.conductor.spv_node.server.synchronized.clear()
×
321
        await self.blockchain.generate(blocks)
×
322
        height = self.blockchain.block_expected
×
323
        await prepare  # no guarantee that it didn't happen already, so start waiting from before calling generate
×
324
        while True:
×
325
            await self.conductor.spv_node.server.synchronized.wait()
×
326
            self.conductor.spv_node.server.synchronized.clear()
×
327
            if self.conductor.spv_node.server.db.db_height < height:
×
328
                continue
×
329
            if self.conductor.spv_node.server._es_height < height:
×
330
                continue
×
331
            break
×
332

333

334
class FakeExchangeRateManager(ExchangeRateManager):
1✔
335

336
    def __init__(self, market_feeds, rates):  # pylint: disable=super-init-not-called
1✔
337
        self.market_feeds = market_feeds
1✔
338
        for feed in self.market_feeds:
1✔
339
            feed.last_check = time()
1✔
340
            feed.rate = ExchangeRate(feed.market, rates[feed.market], time())
1✔
341

342
    def start(self):
1✔
343
        pass
×
344

345
    def stop(self):
1✔
346
        pass
×
347

348

349
def get_fake_exchange_rate_manager(rates=None):
1✔
350
    return FakeExchangeRateManager(
1✔
351
        [BittrexBTCFeed(), BittrexUSDFeed()],
352
        rates or {'BTCLBC': 3.0, 'USDLBC': 2.0}
353
    )
354

355

356
class ExchangeRateManagerComponent(Component):
1✔
357
    component_name = EXCHANGE_RATE_MANAGER_COMPONENT
1✔
358

359
    def __init__(self, component_manager, rates=None):
1✔
360
        super().__init__(component_manager)
×
361
        self.exchange_rate_manager = get_fake_exchange_rate_manager(rates)
×
362

363
    @property
1✔
364
    def component(self) -> ExchangeRateManager:
1✔
365
        return self.exchange_rate_manager
×
366

367
    async def start(self):
1✔
368
        self.exchange_rate_manager.start()
×
369

370
    async def stop(self):
1✔
371
        self.exchange_rate_manager.stop()
×
372

373

374
class CommandTestCase(IntegrationTestCase):
1✔
375

376
    VERBOSITY = logging.WARN
1✔
377
    blob_lru_cache_size = 0
1✔
378

379
    def __init__(self, *args, **kwargs):
1✔
380
        super().__init__(*args, **kwargs)
×
381
        self.daemon = None
×
382
        self.daemons = []
×
383
        self.server_config = None
×
384
        self.server_storage = None
×
385
        self.extra_wallet_nodes = []
×
386
        self.extra_wallet_node_port = 5280
×
387
        self.server_blob_manager = None
×
388
        self.server = None
×
389
        self.reflector = None
×
390
        self.skip_libtorrent = True
×
391

392
    async def asyncSetUp(self):
1✔
393

394
        logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY)
×
395
        logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY)
×
396
        logging.getLogger('lbry.stream').setLevel(self.VERBOSITY)
×
NEW
397
        logging.getLogger('lbry.torrent').setLevel(self.VERBOSITY)
×
UNCOV
398
        logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY)
×
399

400
        await super().asyncSetUp()
×
401

402
        self.daemon = await self.add_daemon(self.wallet_node)
×
403

404
        await self.account.ensure_address_gap()
×
405
        address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0]
×
406
        await self.send_to_address_and_wait(address, 10, 6)
×
407

408
        server_tmp_dir = tempfile.mkdtemp()
×
409
        self.addCleanup(shutil.rmtree, server_tmp_dir)
×
410
        self.server_config = Config(
×
411
            data_dir=server_tmp_dir,
412
            wallet_dir=server_tmp_dir,
413
            save_files=True,
414
            download_dir=server_tmp_dir
415
        )
416
        self.server_config.transaction_cache_size = 10000
×
417
        self.server_storage = SQLiteStorage(self.server_config, ':memory:')
×
418
        await self.server_storage.open()
×
419

420
        self.server_blob_manager = BlobManager(self.loop, server_tmp_dir, self.server_storage, self.server_config)
×
421
        self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')
×
422
        self.server.start_server(5567, '127.0.0.1')
×
423
        await self.server.started_listening.wait()
×
424

425
        self.reflector = ReflectorServer(self.server_blob_manager)
×
426
        self.reflector.start_server(5566, '127.0.0.1')
×
427
        await self.reflector.started_listening.wait()
×
428
        self.addCleanup(self.reflector.stop_server)
×
429

430
    async def asyncTearDown(self):
1✔
431
        await super().asyncTearDown()
×
432
        for wallet_node in self.extra_wallet_nodes:
×
433
            await wallet_node.stop(cleanup=True)
×
434
        for daemon in self.daemons:
×
435
            daemon.component_manager.get_component('wallet')._running = False
×
436
            await daemon.stop()
×
437

438
    async def add_daemon(self, wallet_node=None, seed=None):
1✔
439
        start_wallet_node = False
×
440
        if wallet_node is None:
×
441
            wallet_node = WalletNode(
×
442
                self.wallet_node.manager_class,
443
                self.wallet_node.ledger_class,
444
                port=self.extra_wallet_node_port
445
            )
446
            self.extra_wallet_node_port += 1
×
447
            start_wallet_node = True
×
448

449
        upload_dir = os.path.join(wallet_node.data_path, 'uploads')
×
450
        os.mkdir(upload_dir)
×
451

452
        conf = Config(
×
453
            # needed during instantiation to access known_hubs path
454
            data_dir=wallet_node.data_path,
455
            wallet_dir=wallet_node.data_path,
456
            save_files=True,
457
            download_dir=wallet_node.data_path
458
        )
459
        conf.upload_dir = upload_dir  # not a real conf setting
×
460
        conf.share_usage_data = False
×
461
        conf.use_upnp = False
×
462
        conf.reflect_streams = True
×
463
        conf.blockchain_name = 'lbrycrd_regtest'
×
464
        conf.lbryum_servers = [(self.conductor.spv_node.hostname, self.conductor.spv_node.port)]
×
465
        conf.reflector_servers = [('127.0.0.1', 5566)]
×
466
        conf.fixed_peers = [('127.0.0.1', 5567)]
×
467
        conf.known_dht_nodes = []
×
468
        conf.blob_lru_cache_size = self.blob_lru_cache_size
×
469
        conf.transaction_cache_size = 10000
×
470
        conf.components_to_skip = [
×
471
            DHT_COMPONENT, UPNP_COMPONENT, HASH_ANNOUNCER_COMPONENT,
472
            PEER_PROTOCOL_SERVER_COMPONENT
473
        ]
474
        if self.skip_libtorrent:
×
475
            conf.components_to_skip.append(LIBTORRENT_COMPONENT)
×
476

477
        if start_wallet_node:
×
478
            await wallet_node.start(self.conductor.spv_node, seed=seed, config=conf)
×
479
            self.extra_wallet_nodes.append(wallet_node)
×
480
        else:
481
            wallet_node.manager.config = conf
×
482
            wallet_node.manager.ledger.config['known_hubs'] = conf.known_hubs
×
483

484
        def wallet_maker(component_manager):
×
485
            wallet_component = WalletComponent(component_manager)
×
486
            wallet_component.wallet_manager = wallet_node.manager
×
487
            wallet_component._running = True
×
488
            return wallet_component
×
489

490
        daemon = Daemon(conf, ComponentManager(
×
491
            conf, skip_components=conf.components_to_skip, wallet=wallet_maker,
492
            exchange_rate_manager=partial(ExchangeRateManagerComponent, rates={
493
                'BTCLBC': 1.0, 'USDLBC': 2.0
494
            })
495
        ))
496
        await daemon.initialize()
×
497
        self.daemons.append(daemon)
×
498
        wallet_node.manager.old_db = daemon.storage
×
499
        return daemon
×
500

501
    async def confirm_tx(self, txid, ledger=None):
1✔
502
        """ Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """
503
        # await (ledger or self.ledger).on_transaction.where(lambda e: e.tx.id == txid)
504
        on_tx = (ledger or self.ledger).on_transaction.where(lambda e: e.tx.id == txid)
×
505
        await asyncio.wait([self.generate(1), on_tx], timeout=5)
×
506

507
        # # actually, if it's in the mempool or in the block we're fine
508
        # await self.generate_and_wait(1, [txid], ledger=ledger)
509
        # return txid
510

511
        return txid
×
512

513
    async def on_transaction_dict(self, tx):
1✔
514
        await self.ledger.wait(Transaction(unhexlify(tx['hex'])))
×
515

516
    @staticmethod
1✔
517
    def get_all_addresses(tx):
518
        addresses = set()
×
519
        for txi in tx['inputs']:
×
520
            addresses.add(txi['address'])
×
521
        for txo in tx['outputs']:
×
522
            addresses.add(txo['address'])
×
523
        return list(addresses)
×
524

525
    async def blockchain_claim_name(self, name: str, value: str, amount: str, confirm=True):
1✔
526
        txid = await self.blockchain._cli_cmnd('claimname', name, value, amount)
×
527
        if confirm:
×
528
            await self.generate(1)
×
529
        return txid
×
530

531
    async def blockchain_update_name(self, txid: str, value: str, amount: str, confirm=True):
1✔
532
        txid = await self.blockchain._cli_cmnd('updateclaim', txid, value, amount)
×
533
        if confirm:
×
534
            await self.generate(1)
×
535
        return txid
×
536

537
    async def out(self, awaitable):
1✔
538
        """ Serializes lbrynet API results to JSON then loads and returns it as dictionary. """
539
        return json.loads(jsonrpc_dumps_pretty(await awaitable, ledger=self.ledger))['result']
×
540

541
    def sout(self, value):
1✔
542
        """ Synchronous version of `out` method. """
543
        return json.loads(jsonrpc_dumps_pretty(value, ledger=self.ledger))['result']
×
544

545
    async def confirm_and_render(self, awaitable, confirm, return_tx=False) -> Transaction:
1✔
546
        tx = await awaitable
×
547
        if confirm:
×
548
            await self.ledger.wait(tx)
×
549
            await self.generate(1)
×
550
            await self.ledger.wait(tx, self.blockchain.block_expected)
×
551
        if not return_tx:
×
552
            return self.sout(tx)
×
553
        return tx
×
554

555
    async def create_nondeterministic_channel(self, name, price, pubkey_bytes, daemon=None, blocking=False):
1✔
556
        account = (daemon or self.daemon).wallet_manager.default_account
×
557
        claim_address = await account.receiving.get_or_create_usable_address()
×
558
        claim = Claim()
×
559
        claim.channel.public_key_bytes = pubkey_bytes
×
560
        tx = await Transaction.claim_create(
×
561
            name, claim, lbc_to_dewies(price),
562
            claim_address, [self.account], self.account
563
        )
564
        await tx.sign([self.account])
×
565
        await (daemon or self.daemon).broadcast_or_release(tx, blocking)
×
566
        return self.sout(tx)
×
567

568
    def create_upload_file(self, data, prefix=None, suffix=None):
1✔
569
        file_path = tempfile.mktemp(prefix=prefix or "tmp", suffix=suffix or "", dir=self.daemon.conf.upload_dir)
×
570
        with open(file_path, 'w+b') as file:
×
571
            file.write(data)
×
572
            file.flush()
×
573
            return file.name
×
574

575
    async def stream_create(
1✔
576
            self, name='hovercraft', bid='1.0', file_path=None,
577
            data=b'hi!', confirm=True, prefix=None, suffix=None, return_tx=False, **kwargs):
578
        if file_path is None and data is not None:
×
579
            file_path = self.create_upload_file(data=data, prefix=prefix, suffix=suffix)
×
580
        return await self.confirm_and_render(
×
581
            self.daemon.jsonrpc_stream_create(name, bid, file_path=file_path, **kwargs), confirm, return_tx
582
        )
583

584
    async def stream_update(
1✔
585
            self, claim_id, data=None, prefix=None, suffix=None, confirm=True, return_tx=False, **kwargs):
586
        if data is not None:
×
587
            file_path = self.create_upload_file(data=data, prefix=prefix, suffix=suffix)
×
588
            return await self.confirm_and_render(
×
589
                self.daemon.jsonrpc_stream_update(claim_id, file_path=file_path, **kwargs), confirm, return_tx
590
            )
591
        return await self.confirm_and_render(
×
592
            self.daemon.jsonrpc_stream_update(claim_id, **kwargs), confirm
593
        )
594

595
    async def stream_repost(self, claim_id, name='repost', bid='1.0', confirm=True, **kwargs):
1✔
596
        return await self.confirm_and_render(
×
597
            self.daemon.jsonrpc_stream_repost(claim_id=claim_id, name=name, bid=bid, **kwargs), confirm
598
        )
599

600
    async def stream_abandon(self, *args, confirm=True, **kwargs):
1✔
601
        if 'blocking' not in kwargs:
×
602
            kwargs['blocking'] = False
×
603
        return await self.confirm_and_render(
×
604
            self.daemon.jsonrpc_stream_abandon(*args, **kwargs), confirm
605
        )
606

607
    async def purchase_create(self, *args, confirm=True, **kwargs):
1✔
608
        return await self.confirm_and_render(
×
609
            self.daemon.jsonrpc_purchase_create(*args, **kwargs), confirm
610
        )
611

612
    async def publish(self, name, *args, confirm=True, **kwargs):
1✔
613
        return await self.confirm_and_render(
×
614
            self.daemon.jsonrpc_publish(name, *args, **kwargs), confirm
615
        )
616

617
    async def channel_create(self, name='@arena', bid='1.0', confirm=True, **kwargs):
1✔
618
        return await self.confirm_and_render(
×
619
            self.daemon.jsonrpc_channel_create(name, bid, **kwargs), confirm
620
        )
621

622
    async def channel_update(self, claim_id, confirm=True, **kwargs):
1✔
623
        return await self.confirm_and_render(
×
624
            self.daemon.jsonrpc_channel_update(claim_id, **kwargs), confirm
625
        )
626

627
    async def channel_abandon(self, *args, confirm=True, **kwargs):
1✔
628
        if 'blocking' not in kwargs:
×
629
            kwargs['blocking'] = False
×
630
        return await self.confirm_and_render(
×
631
            self.daemon.jsonrpc_channel_abandon(*args, **kwargs), confirm
632
        )
633

634
    async def collection_create(
1✔
635
            self, name='firstcollection', bid='1.0', confirm=True, **kwargs):
636
        return await self.confirm_and_render(
×
637
            self.daemon.jsonrpc_collection_create(name, bid, **kwargs), confirm
638
        )
639

640
    async def collection_update(
1✔
641
            self, claim_id, confirm=True, **kwargs):
642
        return await self.confirm_and_render(
×
643
            self.daemon.jsonrpc_collection_update(claim_id, **kwargs), confirm
644
        )
645

646
    async def collection_abandon(self, *args, confirm=True, **kwargs):
1✔
647
        if 'blocking' not in kwargs:
×
648
            kwargs['blocking'] = False
×
649
        return await self.confirm_and_render(
×
650
            self.daemon.jsonrpc_stream_abandon(*args, **kwargs), confirm
651
        )
652

653
    async def support_create(self, claim_id, bid='1.0', confirm=True, **kwargs):
1✔
654
        return await self.confirm_and_render(
×
655
            self.daemon.jsonrpc_support_create(claim_id, bid, **kwargs), confirm
656
        )
657

658
    async def support_abandon(self, *args, confirm=True, **kwargs):
1✔
659
        if 'blocking' not in kwargs:
×
660
            kwargs['blocking'] = False
×
661
        return await self.confirm_and_render(
×
662
            self.daemon.jsonrpc_support_abandon(*args, **kwargs), confirm
663
        )
664

665
    async def account_send(self, *args, confirm=True, **kwargs):
1✔
666
        return await self.confirm_and_render(
×
667
            self.daemon.jsonrpc_account_send(*args, **kwargs), confirm
668
        )
669

670
    async def wallet_send(self, *args, confirm=True, **kwargs):
1✔
671
        return await self.confirm_and_render(
×
672
            self.daemon.jsonrpc_wallet_send(*args, **kwargs), confirm
673
        )
674

675
    async def txo_spend(self, *args, confirm=True, **kwargs):
1✔
676
        txs = await self.daemon.jsonrpc_txo_spend(*args, **kwargs)
×
677
        if confirm:
×
678
            await asyncio.wait([self.ledger.wait(tx) for tx in txs])
×
679
            await self.generate(1)
×
680
            await asyncio.wait([self.ledger.wait(tx, self.blockchain.block_expected) for tx in txs])
×
681
        return self.sout(txs)
×
682

683
    async def blob_clean(self):
1✔
684
        return await self.out(self.daemon.jsonrpc_blob_clean())
×
685

686
    async def status(self):
1✔
687
        return await self.out(self.daemon.jsonrpc_status())
×
688

689
    async def resolve(self, uri, **kwargs):
1✔
690
        return (await self.out(self.daemon.jsonrpc_resolve(uri, **kwargs)))[uri]
×
691

692
    async def claim_search(self, **kwargs):
1✔
693
        return (await self.out(self.daemon.jsonrpc_claim_search(**kwargs)))['items']
×
694

695
    async def get_claim_by_claim_id(self, claim_id):
1✔
696
        return await self.out(self.ledger.get_claim_by_claim_id(claim_id))
×
697

698
    async def file_list(self, *args, **kwargs):
1✔
699
        return (await self.out(self.daemon.jsonrpc_file_list(*args, **kwargs)))['items']
×
700

701
    async def txo_list(self, *args, **kwargs):
1✔
702
        return (await self.out(self.daemon.jsonrpc_txo_list(*args, **kwargs)))['items']
×
703

704
    async def txo_sum(self, *args, **kwargs):
1✔
705
        return await self.out(self.daemon.jsonrpc_txo_sum(*args, **kwargs))
×
706

707
    async def txo_plot(self, *args, **kwargs):
1✔
708
        return await self.out(self.daemon.jsonrpc_txo_plot(*args, **kwargs))
×
709

710
    async def claim_list(self, *args, **kwargs):
1✔
711
        return (await self.out(self.daemon.jsonrpc_claim_list(*args, **kwargs)))['items']
×
712

713
    async def stream_list(self, *args, **kwargs):
1✔
714
        return (await self.out(self.daemon.jsonrpc_stream_list(*args, **kwargs)))['items']
×
715

716
    async def channel_list(self, *args, **kwargs):
1✔
717
        return (await self.out(self.daemon.jsonrpc_channel_list(*args, **kwargs)))['items']
×
718

719
    async def transaction_list(self, *args, **kwargs):
1✔
720
        return (await self.out(self.daemon.jsonrpc_transaction_list(*args, **kwargs)))['items']
×
721

722
    async def blob_list(self, *args, **kwargs):
1✔
723
        return (await self.out(self.daemon.jsonrpc_blob_list(*args, **kwargs)))['items']
×
724

725
    @staticmethod
1✔
726
    def get_claim_id(tx):
727
        return tx['outputs'][0]['claim_id']
×
728

729
    def assertItemCount(self, result, count):  # pylint: disable=invalid-name
1✔
730
        self.assertEqual(count, result['total_items'])
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc