• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 4599645360

pending completion
4599645360

push

github

GitHub
Bump cryptography from 2.5 to 39.0.1

2807 of 6557 branches covered (42.81%)

Branch coverage included in aggregate %.

12289 of 19915 relevant lines covered (61.71%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.1
/lbry/wallet/network.py
1
import logging
3✔
2
import asyncio
3✔
3
import json
3✔
4
import socket
3✔
5
import random
3✔
6
from time import perf_counter
3✔
7
from collections import defaultdict
3✔
8
from typing import Dict, Optional, Tuple
3✔
9
import aiohttp
3✔
10

11
from lbry import __version__
3✔
12
from lbry.utils import resolve_host
3✔
13
from lbry.error import IncompatibleWalletServerError
3✔
14
from lbry.wallet.rpc import RPCSession as BaseClientSession, Connector, RPCError, ProtocolError
3✔
15
from lbry.wallet.stream import StreamController
3✔
16
from lbry.wallet.udp import SPVStatusClientProtocol, SPVPong
3✔
17
from lbry.conf import KnownHubsList
3✔
18

19
log = logging.getLogger(__name__)
3✔
20

21

22
class ClientSession(BaseClientSession):
3✔
23
    def __init__(self, *args, network: 'Network', server, timeout=30, concurrency=32, **kwargs):
3✔
24
        self.network = network
1✔
25
        self.server = server
1✔
26
        super().__init__(*args, **kwargs)
1✔
27
        self.framer.max_size = self.max_errors = 1 << 32
1✔
28
        self.timeout = timeout
1✔
29
        self.max_seconds_idle = timeout * 2
1✔
30
        self.response_time: Optional[float] = None
1✔
31
        self.connection_latency: Optional[float] = None
1✔
32
        self._response_samples = 0
1✔
33
        self._concurrency = asyncio.Semaphore(concurrency)
1✔
34

35
    @property
3✔
36
    def concurrency(self):
3✔
37
        return self._concurrency._value
×
38

39
    @property
3✔
40
    def available(self):
3✔
41
        return not self.is_closing() and self.response_time is not None
×
42

43
    @property
3✔
44
    def server_address_and_port(self) -> Optional[Tuple[str, int]]:
3✔
45
        if not self.transport:
×
46
            return None
×
47
        return self.transport.get_extra_info('peername')
×
48

49
    async def send_timed_server_version_request(self, args=(), timeout=None):
3✔
50
        timeout = timeout or self.timeout
×
51
        log.debug("send version request to %s:%i", *self.server)
×
52
        start = perf_counter()
×
53
        result = await asyncio.wait_for(
×
54
            super().send_request('server.version', args), timeout=timeout
55
        )
56
        current_response_time = perf_counter() - start
×
57
        response_sum = (self.response_time or 0) * self._response_samples + current_response_time
×
58
        self.response_time = response_sum / (self._response_samples + 1)
×
59
        self._response_samples += 1
×
60
        return result
×
61

62
    async def send_request(self, method, args=()):
3✔
63
        log.debug("send %s%s to %s:%i (%i timeout)", method, tuple(args), self.server[0], self.server[1], self.timeout)
×
64
        try:
×
65
            await self._concurrency.acquire()
×
66
            if method == 'server.version':
×
67
                return await self.send_timed_server_version_request(args, self.timeout)
×
68
            request = asyncio.ensure_future(super().send_request(method, args))
×
69
            while not request.done():
×
70
                done, pending = await asyncio.wait([request], timeout=self.timeout)
×
71
                if pending:
×
72
                    log.debug("Time since last packet: %s", perf_counter() - self.last_packet_received)
×
73
                    if (perf_counter() - self.last_packet_received) < self.timeout:
×
74
                        continue
×
75
                    log.warning("timeout sending %s to %s:%i", method, *self.server)
×
76
                    raise asyncio.TimeoutError
×
77
                if done:
×
78
                    try:
×
79
                        return request.result()
×
80
                    except ConnectionResetError:
×
81
                        log.error(
×
82
                            "wallet server (%s) reset connection upon our %s request, json of %i args is %i bytes",
83
                            self.server[0], method, len(args), len(json.dumps(args))
84
                        )
85
                        raise
×
86
        except (RPCError, ProtocolError) as e:
×
87
            log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s",
×
88
                        *self.server, *e.args)
89
            raise e
×
90
        except ConnectionError:
×
91
            log.warning("connection to %s:%i lost", *self.server)
×
92
            self.synchronous_close()
×
93
            raise
×
94
        except asyncio.CancelledError:
×
95
            log.warning("cancelled sending %s to %s:%i", method, *self.server)
×
96
            # self.synchronous_close()
97
            raise
×
98
        finally:
99
            self._concurrency.release()
×
100

101
    async def ensure_server_version(self, required=None, timeout=3):
3✔
102
        required = required or self.network.PROTOCOL_VERSION
×
103
        response = await asyncio.wait_for(
×
104
            self.send_request('server.version', [__version__, required]), timeout=timeout
105
        )
106
        if tuple(int(piece) for piece in response[0].split(".")) < self.network.MINIMUM_REQUIRED:
×
107
            raise IncompatibleWalletServerError(*self.server)
×
108
        return response
×
109

110
    async def keepalive_loop(self, timeout=3, max_idle=60):
3✔
111
        try:
×
112
            while True:
113
                now = perf_counter()
×
114
                if min(self.last_send, self.last_packet_received) + max_idle < now:
×
115
                    await asyncio.wait_for(
×
116
                        self.send_request('server.ping', []), timeout=timeout
117
                    )
118
                else:
119
                    await asyncio.sleep(max(0, max_idle - (now - self.last_send)))
×
120
        except (Exception, asyncio.CancelledError) as err:
×
121
            if isinstance(err, asyncio.CancelledError):
×
122
                log.info("closing connection to %s:%i", *self.server)
×
123
            else:
124
                log.exception("lost connection to spv")
×
125
        finally:
126
            if not self.is_closing():
×
127
                self._close()
×
128

129
    async def create_connection(self, timeout=6):
3✔
130
        connector = Connector(lambda: self, *self.server)
×
131
        start = perf_counter()
×
132
        await asyncio.wait_for(connector.create_connection(), timeout=timeout)
×
133
        self.connection_latency = perf_counter() - start
×
134

135
    async def handle_request(self, request):
3✔
136
        controller = self.network.subscription_controllers[request.method]
×
137
        controller.add(request.args)
×
138

139
    def connection_lost(self, exc):
3✔
140
        log.debug("Connection lost: %s:%d", *self.server)
×
141
        super().connection_lost(exc)
×
142
        self.response_time = None
×
143
        self.connection_latency = None
×
144
        self._response_samples = 0
×
145
        # self._on_disconnect_controller.add(True)
146
        if self.network:
×
147
            self.network.disconnect()
×
148

149

150
class Network:
3✔
151

152
    PROTOCOL_VERSION = __version__
3✔
153
    MINIMUM_REQUIRED = (0, 65, 0)
3✔
154

155
    def __init__(self, ledger):
3✔
156
        self.ledger = ledger
1✔
157
        self.client: Optional[ClientSession] = None
1✔
158
        self.server_features = None
1✔
159
        # self._switch_task: Optional[asyncio.Task] = None
160
        self.running = False
1✔
161
        self.remote_height: int = 0
1✔
162

163
        self._on_connected_controller = StreamController()
1✔
164
        self.on_connected = self._on_connected_controller.stream
1✔
165

166
        self._on_header_controller = StreamController(merge_repeated_events=True)
1✔
167
        self.on_header = self._on_header_controller.stream
1✔
168

169
        self._on_status_controller = StreamController(merge_repeated_events=True)
1✔
170
        self.on_status = self._on_status_controller.stream
1✔
171

172
        self._on_hub_controller = StreamController(merge_repeated_events=True)
1✔
173
        self.on_hub = self._on_hub_controller.stream
1✔
174

175
        self.subscription_controllers = {
1✔
176
            'blockchain.headers.subscribe': self._on_header_controller,
177
            'blockchain.address.subscribe': self._on_status_controller,
178
            'blockchain.peers.subscribe': self._on_hub_controller,
179
        }
180

181
        self.aiohttp_session: Optional[aiohttp.ClientSession] = None
1✔
182
        self._urgent_need_reconnect = asyncio.Event()
1✔
183
        self._loop_task: Optional[asyncio.Task] = None
1✔
184
        self._keepalive_task: Optional[asyncio.Task] = None
1✔
185

186
    @property
3✔
187
    def config(self):
3✔
188
        return self.ledger.config
×
189

190
    @property
3✔
191
    def known_hubs(self):
3✔
192
        if 'known_hubs' not in self.config:
×
193
            return KnownHubsList()
×
194
        return self.config['known_hubs']
×
195

196
    @property
3✔
197
    def jurisdiction(self):
3✔
198
        return self.config.get("jurisdiction")
×
199

200
    def disconnect(self):
3✔
201
        if self._keepalive_task and not self._keepalive_task.done():
×
202
            self._keepalive_task.cancel()
×
203
        self._keepalive_task = None
×
204

205
    async def start(self):
3✔
206
        if not self.running:
×
207
            self.running = True
×
208
            self.aiohttp_session = aiohttp.ClientSession()
×
209
            self.on_header.listen(self._update_remote_height)
×
210
            self.on_hub.listen(self._update_hubs)
×
211
            self._loop_task = asyncio.create_task(self.network_loop())
×
212
            self._urgent_need_reconnect.set()
×
213

214
        def loop_task_done_callback(f):
×
215
            try:
×
216
                f.result()
×
217
            except (Exception, asyncio.CancelledError):
×
218
                if self.running:
×
219
                    log.exception("wallet server connection loop crashed")
×
220

221
        self._loop_task.add_done_callback(loop_task_done_callback)
×
222

223
    async def resolve_spv_dns(self):
3✔
224
        hostname_to_ip = {}
×
225
        ip_to_hostnames = defaultdict(list)
×
226

227
        async def resolve_spv(server, port):
×
228
            try:
×
229
                server_addr = await resolve_host(server, port, 'udp')
×
230
                hostname_to_ip[server] = (server_addr, port)
×
231
                ip_to_hostnames[(server_addr, port)].append(server)
×
232
            except socket.error:
×
233
                log.warning("error looking up dns for spv server %s:%i", server, port)
×
234
            except Exception:
×
235
                log.exception("error looking up dns for spv server %s:%i", server, port)
×
236

237
        # accumulate the dns results
238
        if self.config.get('explicit_servers', []):
×
239
            hubs = self.config['explicit_servers']
×
240
        elif self.known_hubs:
×
241
            hubs = self.known_hubs
×
242
        else:
243
            hubs = self.config['default_servers']
×
244
        await asyncio.gather(*(resolve_spv(server, port) for (server, port) in hubs))
×
245
        return hostname_to_ip, ip_to_hostnames
×
246

247
    async def get_n_fastest_spvs(self, timeout=3.0) -> Dict[Tuple[str, int], Optional[SPVPong]]:
3✔
248
        loop = asyncio.get_event_loop()
×
249
        pong_responses = asyncio.Queue()
×
250
        connection = SPVStatusClientProtocol(pong_responses)
×
251
        sent_ping_timestamps = {}
×
252
        _, ip_to_hostnames = await self.resolve_spv_dns()
×
253
        n = len(ip_to_hostnames)
×
254
        log.info("%i possible spv servers to try (%i urls in config)", n, len(self.config.get('explicit_servers', [])))
×
255
        pongs = {}
×
256
        known_hubs = self.known_hubs
×
257
        try:
×
258
            await loop.create_datagram_endpoint(lambda: connection, ('0.0.0.0', 0))
×
259
            # could raise OSError if it cant bind
260
            start = perf_counter()
×
261
            for server in ip_to_hostnames:
×
262
                connection.ping(server)
×
263
                sent_ping_timestamps[server] = perf_counter()
×
264
            while len(pongs) < n:
×
265
                (remote, ts), pong = await asyncio.wait_for(pong_responses.get(), timeout - (perf_counter() - start))
×
266
                latency = ts - start
×
267
                log.info("%s:%i has latency of %sms (available: %s, height: %i)",
×
268
                         '/'.join(ip_to_hostnames[remote]), remote[1], round(latency * 1000, 2),
269
                         pong.available, pong.height)
270

271
                known_hubs.hubs.setdefault((ip_to_hostnames[remote][0], remote[1]), {}).update(
×
272
                    {"country": pong.country_name}
273
                )
274
                if pong.available:
×
275
                    pongs[(ip_to_hostnames[remote][0], remote[1])] = pong
×
276
            return pongs
×
277
        except asyncio.TimeoutError:
×
278
            if pongs:
×
279
                log.info("%i/%i probed spv servers are accepting connections", len(pongs), len(ip_to_hostnames))
×
280
                return pongs
×
281
            else:
282
                log.warning("%i spv status probes failed, retrying later. servers tried: %s",
×
283
                            len(sent_ping_timestamps),
284
                            ', '.join('/'.join(hosts) + f' ({ip})' for ip, hosts in ip_to_hostnames.items()))
285
                random_server = random.choice(list(ip_to_hostnames.keys()))
×
286
                host, port = random_server
×
287
                log.warning("trying fallback to randomly selected spv: %s:%i", host, port)
×
288
                known_hubs.hubs.setdefault((host, port), {})
×
289
                return {(host, port): None}
×
290
        finally:
291
            connection.close()
×
292

293
    async def connect_to_fastest(self) -> Optional[ClientSession]:
3✔
294
        fastest_spvs = await self.get_n_fastest_spvs()
×
295
        for (host, port), pong in fastest_spvs.items():
×
296
            if (pong is not None and self.jurisdiction is not None) and \
×
297
                    (pong.country_name != self.jurisdiction):
298
                continue
×
299
            client = ClientSession(network=self, server=(host, port), timeout=self.config.get('hub_timeout', 30),
×
300
                                   concurrency=self.config.get('concurrent_hub_requests', 30))
301
            try:
×
302
                await client.create_connection()
×
303
                log.info("Connected to spv server %s:%i", host, port)
×
304
                await client.ensure_server_version()
×
305
                return client
×
306
            except (asyncio.TimeoutError, ConnectionError, OSError, IncompatibleWalletServerError, RPCError):
×
307
                log.warning("Connecting to %s:%d failed", host, port)
×
308
                client._close()
×
309
        return
×
310

311
    async def network_loop(self):
3✔
312
        sleep_delay = 30
×
313
        while self.running:
×
314
            await asyncio.wait(
×
315
                map(asyncio.create_task, [asyncio.sleep(30), self._urgent_need_reconnect.wait()]),
316
                return_when=asyncio.FIRST_COMPLETED
317
            )
318
            if self._urgent_need_reconnect.is_set():
×
319
                sleep_delay = 30
×
320
            self._urgent_need_reconnect.clear()
×
321
            if not self.is_connected:
×
322
                client = await self.connect_to_fastest()
×
323
                if not client:
×
324
                    log.warning("failed to connect to any spv servers, retrying later")
×
325
                    sleep_delay *= 2
×
326
                    sleep_delay = min(sleep_delay, 300)
×
327
                    continue
×
328
                log.debug("get spv server features %s:%i", *client.server)
×
329
                features = await client.send_request('server.features', [])
×
330
                self.client, self.server_features = client, features
×
331
                log.debug("discover other hubs %s:%i", *client.server)
×
332
                await self._update_hubs(await client.send_request('server.peers.subscribe', []))
×
333
                log.info("subscribe to headers %s:%i", *client.server)
×
334
                self._update_remote_height((await self.subscribe_headers(),))
×
335
                self._on_connected_controller.add(True)
×
336
                server_str = "%s:%i" % client.server
×
337
                log.info("maintaining connection to spv server %s", server_str)
×
338
                self._keepalive_task = asyncio.create_task(self.client.keepalive_loop())
×
339
                try:
×
340
                    if not self._urgent_need_reconnect.is_set():
×
341
                        await asyncio.wait(
×
342
                            [self._keepalive_task, asyncio.create_task(self._urgent_need_reconnect.wait())],
343
                            return_when=asyncio.FIRST_COMPLETED
344
                        )
345
                    else:
346
                        await self._keepalive_task
×
347
                    if self._urgent_need_reconnect.is_set():
×
348
                        log.warning("urgent reconnect needed")
×
349
                    if self._keepalive_task and not self._keepalive_task.done():
×
350
                        self._keepalive_task.cancel()
×
351
                except asyncio.CancelledError:
×
352
                    pass
×
353
                finally:
354
                    self._keepalive_task = None
×
355
                    self.client = None
×
356
                    self.server_features = None
×
357
                    log.info("connection lost to %s", server_str)
×
358
        log.info("network loop finished")
×
359

360
    async def stop(self):
3✔
361
        self.running = False
×
362
        self.disconnect()
×
363
        if self._loop_task and not self._loop_task.done():
×
364
            self._loop_task.cancel()
×
365
        self._loop_task = None
×
366
        if self.aiohttp_session:
×
367
            await self.aiohttp_session.close()
×
368
            self.aiohttp_session = None
×
369

370
    @property
3✔
371
    def is_connected(self):
3✔
372
        return self.client and not self.client.is_closing()
1✔
373

374
    def rpc(self, list_or_method, args, restricted=True, session: Optional[ClientSession] = None):
3✔
375
        if session or self.is_connected:
×
376
            session = session or self.client
×
377
            return session.send_request(list_or_method, args)
×
378
        else:
379
            self._urgent_need_reconnect.set()
×
380
            raise ConnectionError("Attempting to send rpc request when connection is not available.")
×
381

382
    async def retriable_call(self, function, *args, **kwargs):
3✔
383
        while self.running:
×
384
            if not self.is_connected:
×
385
                log.warning("Wallet server unavailable, waiting for it to come back and retry.")
×
386
                self._urgent_need_reconnect.set()
×
387
                await self.on_connected.first
×
388
            try:
×
389
                return await function(*args, **kwargs)
×
390
            except asyncio.TimeoutError:
×
391
                log.warning("Wallet server call timed out, retrying.")
×
392
            except ConnectionError:
×
393
                log.warning("connection error")
×
394
        raise asyncio.CancelledError()  # if we got here, we are shutting down
×
395

396
    def _update_remote_height(self, header_args):
3✔
397
        self.remote_height = header_args[0]["height"]
×
398

399
    async def _update_hubs(self, hubs):
3✔
400
        if hubs and hubs != ['']:
×
401
            try:
×
402
                if self.known_hubs.add_hubs(hubs):
×
403
                    self.known_hubs.save()
×
404
            except Exception:
×
405
                log.exception("could not add hubs: %s", hubs)
×
406

407
    def get_transaction(self, tx_hash, known_height=None):
3✔
408
        # use any server if its old, otherwise restrict to who gave us the history
409
        restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10
×
410
        return self.rpc('blockchain.transaction.get', [tx_hash], restricted)
×
411

412
    def get_transaction_batch(self, txids, restricted=True):
3✔
413
        # use any server if its old, otherwise restrict to who gave us the history
414
        return self.rpc('blockchain.transaction.get_batch', txids, restricted)
×
415

416
    def get_transaction_and_merkle(self, tx_hash, known_height=None):
3✔
417
        # use any server if its old, otherwise restrict to who gave us the history
418
        restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10
×
419
        return self.rpc('blockchain.transaction.info', [tx_hash], restricted)
×
420

421
    def get_transaction_height(self, tx_hash, known_height=None):
3✔
422
        restricted = not known_height or 0 > known_height > self.remote_height - 10
×
423
        return self.rpc('blockchain.transaction.get_height', [tx_hash], restricted)
×
424

425
    def get_merkle(self, tx_hash, height):
3✔
426
        restricted = 0 > height > self.remote_height - 10
×
427
        return self.rpc('blockchain.transaction.get_merkle', [tx_hash, height], restricted)
×
428

429
    def get_headers(self, height, count=10000, b64=False):
3✔
430
        restricted = height >= self.remote_height - 100
×
431
        return self.rpc('blockchain.block.headers', [height, count, 0, b64], restricted)
×
432

433
    #  --- Subscribes, history and broadcasts are always aimed towards the master client directly
434
    def get_history(self, address):
3✔
435
        return self.rpc('blockchain.address.get_history', [address], True)
×
436

437
    def broadcast(self, raw_transaction):
3✔
438
        return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True)
×
439

440
    def subscribe_headers(self):
3✔
441
        return self.rpc('blockchain.headers.subscribe', [True], True)
×
442

443
    async def subscribe_address(self, address, *addresses):
3✔
444
        addresses = list((address, ) + addresses)
×
445
        server_addr_and_port = self.client.server_address_and_port  # on disconnect client will be None
×
446
        try:
×
447
            return await self.rpc('blockchain.address.subscribe', addresses, True)
×
448
        except asyncio.TimeoutError:
×
449
            log.warning(
×
450
                "timed out subscribing to addresses from %s:%i",
451
                *server_addr_and_port
452
            )
453
            # abort and cancel, we can't lose a subscription, it will happen again on reconnect
454
            if self.client:
×
455
                self.client.abort()
×
456
            raise asyncio.CancelledError()
×
457

458
    def unsubscribe_address(self, address):
3✔
459
        return self.rpc('blockchain.address.unsubscribe', [address], True)
×
460

461
    def get_server_features(self):
3✔
462
        return self.rpc('server.features', (), restricted=True)
×
463

464
    # def get_claims_by_ids(self, claim_ids):
465
    #     return self.rpc('blockchain.claimtrie.getclaimsbyids', claim_ids)
466

467
    def get_claim_by_id(self, claim_id):
3✔
468
        return self.rpc('blockchain.claimtrie.getclaimbyid', [claim_id])
×
469

470
    def resolve(self, urls, session_override=None):
3✔
471
        return self.rpc('blockchain.claimtrie.resolve', urls, False, session_override)
×
472

473
    def claim_search(self, session_override=None, **kwargs):
3✔
474
        return self.rpc('blockchain.claimtrie.search', kwargs, False, session_override)
×
475

476
    async def sum_supports(self, server, **kwargs):
3✔
477
        message = {"method": "support_sum", "params": kwargs}
×
478
        async with self.aiohttp_session.post(server, json=message) as r:
×
479
            result = await r.json()
×
480
            return result['result']
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc