• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 3788717008

pending completion
3788717008

Pull #3711

github

GitHub
Merge 69297ea9c into 625865165
Pull Request #3711: Bump to Python 3.9 attempt 3.

2802 of 6558 branches covered (42.73%)

Branch coverage included in aggregate %.

25 of 41 new or added lines in 17 files covered. (60.98%)

33 existing lines in 9 files now uncovered.

12281 of 19915 relevant lines covered (61.67%)

1.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.74
/lbry/dht/protocol/iterative_find.py
1
import asyncio
3✔
2
from itertools import chain
3✔
3
from collections import defaultdict, OrderedDict
3✔
4
from collections.abc import AsyncIterator
3✔
5
import typing
3✔
6
import logging
3✔
7
from typing import TYPE_CHECKING
3✔
8
from lbry.dht import constants
3✔
9
from lbry.dht.error import RemoteException, TransportNotConnected
3✔
10
from lbry.dht.protocol.distance import Distance
3✔
11
from lbry.dht.peer import make_kademlia_peer, decode_tcp_peer_from_compact_address
3✔
12
from lbry.dht.serialization.datagram import PAGE_KEY
3✔
13

14
if TYPE_CHECKING:
3!
15
    from lbry.dht.protocol.protocol import KademliaProtocol
×
16
    from lbry.dht.peer import PeerManager, KademliaPeer
×
17

18
# module-level logger, named after this module
log = logging.getLogger(__name__)
3✔
19

20

21
class FindResponse:
    """
    Common interface for the parsed result of a find rpc.

    Subclasses provide `found` and `get_close_triples`; turning the triples into
    kademlia peers is shared here.
    """

    @property
    def found(self) -> bool:
        """Whether the response contains what was searched for. Must be overridden."""
        raise NotImplementedError()

    def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
        """Return the (node_id, address, udp_port) triples carried by the response. Must be overridden."""
        raise NotImplementedError()

    def get_close_kademlia_peers(self, peer_info) -> typing.Generator['KademliaPeer', None, None]:
        """
        Yield one kademlia peer for each triple returned by get_close_triples().

        Triples for which make_kademlia_peer raises ValueError are skipped and a warning naming
        the responding peer is logged.

        :param peer_info: the peer that sent this response, only used for the warning message
        """
        for node_id, address, udp_port in self.get_close_triples():
            # only building the peer is guarded, so that nothing else is mistaken for a bad triple
            try:
                close_peer = make_kademlia_peer(node_id, address, udp_port)
            except ValueError:
                log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer_info.address,
                            peer_info.udp_port, address, udp_port)
                continue
            yield close_peer
37

38

39
class FindNodeResponse(FindResponse):
    """Parsed result of a find node rpc: the searched key and the triples that were returned."""

    def __init__(self, key: bytes, close_triples: typing.List[typing.Tuple[bytes, str, int]]):
        """
        :param key: the key that was searched for
        :param close_triples: the (node_id, address, udp_port) triples from the response
        """
        self.close_triples = close_triples
        self.key = key

    @property
    def found(self) -> bool:
        """True when the first field of one of the returned triples equals the searched key."""
        returned_node_ids = tuple(entry[0] for entry in self.close_triples)
        return self.key in returned_node_ids

    def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
        """Return the triples exactly as they were given to the constructor."""
        return self.close_triples
1✔
50

51

52
class FindValueResponse(FindResponse):
    """Parsed result of a find value rpc, read out of the response dictionary."""

    def __init__(self, key: bytes, result_dict: typing.Dict):
        """
        :param key: the key that was searched for; it is also the dictionary key holding the
                    compact addresses that were found
        :param result_dict: the response; b'token' is required, b'contacts', `key` and PAGE_KEY
                            are optional
        :raises KeyError: if result_dict has no b'token' entry
        """
        self.key = key
        self.token = result_dict[b'token']
        contacts: typing.List[typing.Tuple[bytes, bytes, int]] = result_dict.get(b'contacts', [])
        self.close_triples = contacts
        self.found_compact_addresses = result_dict.get(key, [])
        page_count = result_dict.get(PAGE_KEY, 0)
        self.pages = int(page_count)

    @property
    def found(self) -> bool:
        """True when the response carried at least one compact address for the key."""
        address_count = len(self.found_compact_addresses)
        return address_count > 0

    def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
        """Return the contact triples with the address field decoded from bytes to str."""
        decoded_triples = []
        for node_id, raw_address, port in self.close_triples:
            decoded_triples.append((node_id, raw_address.decode(), port))
        return decoded_triples
1✔
66

67

68
class IterativeFinder(AsyncIterator):
    """
    Base class for an iterative search for a key, consumed with `async for`.

    Each iteration returns a list of results taken from an internal queue. Subclasses implement
    send_probe() and check_result_ready(), and may override search_exhausted() and
    get_initial_result().
    """

    def __init__(self, loop: asyncio.AbstractEventLoop,
                 protocol: 'KademliaProtocol', key: bytes,
                 max_results: typing.Optional[int] = constants.K,
                 shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
        """
        :param loop: event loop used to create the probe tasks
        :param protocol: protocol whose peer_manager, node_id, external_ip and udp_port are used
        :param key: the key to search for, must be constants.HASH_LENGTH bytes long
        :param max_results: result limit; None or anything below constants.K becomes constants.K
        :param shortlist: peers to start from; None is treated as an empty list
        :raises ValueError: if the key does not have the expected length
        """
        if len(key) != constants.HASH_LENGTH:
            raise ValueError("invalid key length: %i" % len(key))
        self.loop = loop
        self.peer_manager = protocol.peer_manager
        self.protocol = protocol

        self.key = key
        # max_results is declared Optional, None must not reach max()
        self.max_results = constants.K if max_results is None else max(constants.K, max_results)

        self.active: typing.Dict['KademliaPeer', int] = OrderedDict()  # peer: distance, sorted
        self.contacted: typing.Set['KademliaPeer'] = set()
        self.distance = Distance(key)

        self.iteration_queue = asyncio.Queue()

        self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
        self.iteration_count = 0
        self.running = False
        self.tasks: typing.List[asyncio.Task] = []
        # shortlist defaults to None, which cannot be iterated
        for peer in shortlist or ():
            if peer.node_id:
                self._add_active(peer, force=True)
            else:
                # seed nodes
                self._schedule_probe(peer)

    async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
        """
        Send the rpc request to the peer and return an object with the FindResponse interface
        """
        raise NotImplementedError()

    def search_exhausted(self):
        """
        This method ends the iterator due to no more peers to contact.
        Override to provide last time results.
        """
        self.iteration_queue.put_nowait(None)

    def check_result_ready(self, response: FindResponse):
        """
        Called after adding peers from an rpc result to the shortlist.
        This method is responsible for putting a result for the generator into the Queue
        """
        raise NotImplementedError()

    def get_initial_result(self) -> typing.List['KademliaPeer']:  #pylint: disable=no-self-use
        """
        Get an initial or cached result to be put into the Queue. Used for findValue requests where the blob
        has peers in the local data store of blobs announced to us
        """
        return []

    def _add_active(self, peer, force=False):
        """
        Add the peer to the active peers, which are kept sorted by distance to the key.

        The peer is ignored if it is known to be bad (unless force is set), was already contacted,
        is already active, has no node id, or has our own node id.
        """
        if not force and self.peer_manager.peer_is_good(peer) is False:
            return
        if peer in self.contacted:
            return
        if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
            self.active[peer] = self.distance(peer.node_id)
            # rebuild the mapping so that iteration order stays closest first
            self.active = OrderedDict(sorted(self.active.items(), key=lambda item: item[1]))

    async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
        """Add the responding peer and the peers it returned to the active peers, then check for results."""
        self._add_active(peer)
        for new_peer in response.get_close_kademlia_peers(peer):
            self._add_active(new_peer)
        self.check_result_ready(response)
        self._log_state(reason="check result")

    def _reset_closest(self, peer):
        """Remove the peer from the active peers, if it is there."""
        if peer in self.active:
            del self.active[peer]

    async def _send_probe(self, peer: 'KademliaPeer'):
        """
        Probe the peer and handle its response.

        A peer whose probe times out, raises ValueError or raises RemoteException is removed from
        the active peers. TransportNotConnected shuts the finder down. Cancellation is re-raised.
        """
        try:
            response = await self.send_probe(peer)
        except asyncio.TimeoutError:
            self._reset_closest(peer)
            return
        except asyncio.CancelledError:
            log.debug("%s[%x] cancelled probe",
                      type(self).__name__, id(self))
            raise
        except ValueError as err:
            log.warning(str(err))
            self._reset_closest(peer)
            return
        except TransportNotConnected:
            await self._aclose(reason="not connected")
            return
        except RemoteException:
            self._reset_closest(peer)
            return
        return await self._handle_probe_result(peer, response)

    def _search_round(self):
        """
        Send probes to the closest active peers that were not contacted yet, keeping at most
        constants.ALPHA probes running. Ends the search when nothing was scheduled and nothing is running.
        """

        added = 0
        for index, peer in enumerate(self.active.keys()):
            if index == 0:
                log.debug("%s[%x] closest to probe: %s",
                          type(self).__name__, id(self),
                          peer.node_id.hex()[:8])
            if peer in self.contacted:
                continue
            if len(self.running_probes) >= constants.ALPHA:
                break
            if index > (constants.K + len(self.running_probes)):
                break
            origin_address = (peer.address, peer.udp_port)
            # never probe ourselves, neither by node id nor by address
            if peer.node_id == self.protocol.node_id:
                continue
            if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
                continue
            self._schedule_probe(peer)
            added += 1
        log.debug("%s[%x] running %d probes for key %s",
                  type(self).__name__, id(self),
                  len(self.running_probes), self.key.hex()[:8])
        if not added and not self.running_probes:
            log.debug("%s[%x] search for %s exhausted",
                      type(self).__name__, id(self),
                      self.key.hex()[:8])
            self.search_exhausted()

    def _schedule_probe(self, peer: 'KademliaPeer'):
        """Mark the peer as contacted and start a task probing it."""
        self.contacted.add(peer)

        t = self.loop.create_task(self._send_probe(peer))

        def callback(_):
            # once the probe is done, forget it and start the next round if still running
            self.running_probes.pop(peer, None)
            if self.running:
                self._search_round()

        t.add_done_callback(callback)
        self.running_probes[peer] = t

    def _log_state(self, reason="?"):
        """Log the number of active and contacted peers, produced results and queued results."""
        log.debug("%s[%x] [%s] %s: %i active nodes %i contacted %i produced %i queued",
                  type(self).__name__, id(self), self.key.hex()[:8],
                  reason, len(self.active), len(self.contacted),
                  self.iteration_count, self.iteration_queue.qsize())

    def __aiter__(self):
        """
        Start the search and return self.

        :raises Exception: if the finder is already running
        """
        if self.running:
            raise Exception("already running")
        self.running = True
        self.loop.call_soon(self._search_round)
        return self

    async def __anext__(self) -> typing.List['KademliaPeer']:
        """
        Return the next result from the queue.

        On the first iteration the result of get_initial_result() is used when it is not empty.
        An empty result ends the iteration. The finder is shut down before StopAsyncIteration or
        CancelledError is re-raised.
        """
        try:
            if self.iteration_count == 0:
                result = self.get_initial_result() or await self.iteration_queue.get()
            else:
                result = await self.iteration_queue.get()
            if not result:
                raise StopAsyncIteration
            self.iteration_count += 1
            return result
        except asyncio.CancelledError:
            await self._aclose(reason="cancelled")
            raise
        except StopAsyncIteration:
            await self._aclose(reason="no more results")
            raise

    async def _aclose(self, reason="?"):
        """Stop the search: queue the end marker and cancel all tasks and running probes."""
        log.debug("%s[%x] [%s] shutdown because %s: %i active nodes %i contacted %i produced %i queued",
                  type(self).__name__, id(self), self.key.hex()[:8],
                  reason, len(self.active), len(self.contacted),
                  self.iteration_count, self.iteration_queue.qsize())
        self.running = False
        self.iteration_queue.put_nowait(None)
        for task in chain(self.tasks, self.running_probes.values()):
            task.cancel()
        self.tasks.clear()
        self.running_probes.clear()

    async def aclose(self):
        """Shut the finder down if it is still running."""
        if self.running:
            await self._aclose(reason="aclose")
        log.debug("%s[%x] [%s] async close completed",
                  type(self).__name__, id(self), self.key.hex()[:8])
261

262
class IterativeNodeFinder(IterativeFinder):
    """Iterative finder that sends find node probes and yields the closest peers that answered."""

    def __init__(self, loop: asyncio.AbstractEventLoop,
                 protocol: 'KademliaProtocol', key: bytes,
                 max_results: typing.Optional[int] = constants.K,
                 shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
        """Same parameters as IterativeFinder; additionally tracks the peers that were already yielded."""
        super().__init__(loop, protocol, key, max_results, shortlist)
        self.yielded_peers: typing.Set['KademliaPeer'] = set()

    async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
        """Send a find node rpc for the key to the peer and wrap the response."""
        log.debug(
            "probe %s:%d (%s) for NODE %s",
            peer.address,
            peer.udp_port,
            peer.node_id.hex()[:8] if peer.node_id else '',
            self.key.hex()[:8],
        )
        rpc_peer = self.protocol.get_rpc_peer(peer)
        triples = await rpc_peer.find_node(self.key)
        return FindNodeResponse(self.key, triples)

    def search_exhausted(self):
        """Queue whatever active peers were not yielded yet, then end the iteration."""
        self.put_result(self.active.keys(), finish=True)

    def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
        """
        Queue the closest peers from from_iter that were not yielded before.

        Our own node and peers that are not known to be good are left out.

        :param from_iter: candidate peers
        :param finish: when set, the end marker is queued after the result
        """
        candidates = []
        for candidate in from_iter:
            if candidate in self.yielded_peers:
                continue
            if candidate.node_id == self.protocol.node_id:
                continue
            # return only peers who answered
            if self.peer_manager.peer_is_good(candidate) is not True:
                continue
            candidates.append(candidate)
        closest_first = sorted(candidates, key=lambda peer: self.distance(peer.node_id))
        limit = max(constants.K, self.max_results)
        to_yield = closest_first[:limit]
        if to_yield:
            self.yielded_peers.update(to_yield)
            self.iteration_queue.put_nowait(to_yield)
        if finish:
            self.iteration_queue.put_nowait(None)

    def check_result_ready(self, response: FindNodeResponse):
        """Finish the search once the key was found, unless the key is our own node id."""
        if not response.found or self.key == self.protocol.node_id:
            return None
        log.debug("found")
        return self.put_result(self.active.keys(), finish=True)
×
300

301

302
class IterativeValueFinder(IterativeFinder):
    """Iterative finder that sends find value probes and yields the blob peers it discovers."""

    def __init__(self, loop: asyncio.AbstractEventLoop,
                 protocol: 'KademliaProtocol', key: bytes,
                 max_results: typing.Optional[int] = constants.K,
                 shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
        """Same parameters as IterativeFinder; additionally sets up the per peer paging state."""
        super().__init__(loop, protocol, key, max_results, shortlist)
        self.blob_peers: typing.Set['KademliaPeer'] = set()
        # this tracks the index of the most recent page we requested from each peer
        self.peer_pages: typing.DefaultDict['KademliaPeer', int] = defaultdict(int)
        # this tracks the set of blob peers returned by each peer
        self.discovered_peers: typing.Dict['KademliaPeer', typing.Set['KademliaPeer']] = defaultdict(set)

    async def send_probe(self, peer: 'KademliaPeer') -> FindValueResponse:
        """
        Send a find value rpc for the current page of the peer and parse the response.

        If one of the returned compact addresses cannot be decoded, the failure is reported to the
        peer manager and the found addresses are cleared. If the peer returned a full page and has
        more pages, it is removed from the contacted peers so that it gets probed again.
        """
        # peers scheduled from the shortlist may have no node id yet (seed nodes)
        log.debug("probe %s:%d (%s) for VALUE %s",
                  peer.address, peer.udp_port, peer.node_id.hex()[:8] if peer.node_id else '', self.key.hex()[:8])
        page = self.peer_pages[peer]
        response = await self.protocol.get_rpc_peer(peer).find_value(self.key, page=page)
        parsed = FindValueResponse(self.key, response)
        if not parsed.found:
            return parsed
        already_known = len(self.discovered_peers[peer])
        decoded_peers = set()
        for compact_addr in parsed.found_compact_addresses:
            try:
                decoded_peers.add(decode_tcp_peer_from_compact_address(compact_addr))
            except ValueError:
                log.warning("misbehaving peer %s:%i returned invalid peer for blob",
                            peer.address, peer.udp_port)
                self.peer_manager.report_failure(peer.address, peer.udp_port)
                parsed.found_compact_addresses.clear()
                return parsed
        self.discovered_peers[peer].update(decoded_peers)
        log.debug("probed %s:%i page %i, %i known", peer.address, peer.udp_port, page,
                  already_known + len(parsed.found_compact_addresses))
        if len(self.discovered_peers[peer]) != already_known + len(parsed.found_compact_addresses):
            log.warning("misbehaving peer %s:%i returned duplicate peers for blob", peer.address, peer.udp_port)
        elif len(parsed.found_compact_addresses) >= constants.K and self.peer_pages[peer] < parsed.pages:
            # the peer returned a full page and indicates it has more
            self.peer_pages[peer] += 1
            # the peer must be removed from self.contacted so that it will be probed for the next page
            self.contacted.discard(peer)
        return parsed

    def check_result_ready(self, response: FindValueResponse):
        """Queue the blob peers from the response that were not seen before, if there are any."""
        if response.found:
            blob_peers = [decode_tcp_peer_from_compact_address(compact_addr)
                          for compact_addr in response.found_compact_addresses]
            to_yield = []
            for blob_peer in blob_peers:
                if blob_peer not in self.blob_peers:
                    self.blob_peers.add(blob_peer)
                    to_yield.append(blob_peer)
            if to_yield:
                self.iteration_queue.put_nowait(to_yield)

    def get_initial_result(self) -> typing.List['KademliaPeer']:
        """Return the peers the protocol's data store already has for the key, or an empty list."""
        if self.protocol.data_store.has_peers_for_blob(self.key):
            return self.protocol.data_store.get_peers_for_blob(self.key)
        return []
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc