lbryio / lbry-sdk / build 4599645360 (pending completion)
Push on GitHub: "Bump cryptography from 2.5 to 39.0.1"

2807 of 6557 branches covered (42.81%); branch coverage is included in the aggregate %.
12289 of 19915 relevant lines covered (61.71%), 0.97 hits per line.

Source file: /lbry/dht/protocol/routing_table.py (81.79% covered)

import asyncio
import random
import logging
import typing
import itertools

from prometheus_client import Gauge

from lbry import utils
from lbry.dht import constants
from lbry.dht.error import RemoteException
from lbry.dht.protocol.distance import Distance
if typing.TYPE_CHECKING:
    from lbry.dht.peer import KademliaPeer, PeerManager

log = logging.getLogger(__name__)


class KBucket:
    """
    Kademlia K-bucket implementation.
    """
    peer_in_routing_table_metric = Gauge(
        "peers_in_routing_table", "Number of peers on routing table", namespace="dht_node",
        labelnames=("scope",)
    )
    peer_with_x_bit_colliding_metric = Gauge(
        "peer_x_bit_colliding", "Number of peers with at least X bits colliding with this node id",
        namespace="dht_node", labelnames=("amount",)
    )

    def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int,
                 node_id: bytes, capacity: int = constants.K):
        """
        @param range_min: The lower boundary for the range in the n-bit ID
                         space covered by this k-bucket
        @param range_max: The upper boundary for the range in the ID space
                         covered by this k-bucket
        """
        self._peer_manager = peer_manager
        self.range_min = range_min
        self.range_max = range_max
        self.peers: typing.List['KademliaPeer'] = []
        self._node_id = node_id
        self._distance_to_self = Distance(node_id)
        self.capacity = capacity

    def add_peer(self, peer: 'KademliaPeer') -> bool:
        """ Add a contact to the bucket's peer list in the right order. This
        will move the contact to the end of the k-bucket if it is already
        present.

        @param peer: The contact to add
        @return: True if the contact was added or refreshed, False if the
                 bucket is full and the contact is not already in it
        """
        if peer in self.peers:
            # Move the existing contact to the end of the list
            # - using the new contact to allow add-on data
            #   (e.g. optimization-specific stuff) to be updated as well
            self.peers.remove(peer)
            self.peers.append(peer)
            return True
        else:
            for i, _ in enumerate(self.peers):
                local_peer = self.peers[i]
                if local_peer.node_id == peer.node_id:
                    self.peers.remove(local_peer)
                    self.peers.append(peer)
                    return True
        if len(self.peers) < self.capacity:
            self.peers.append(peer)
            self.peer_in_routing_table_metric.labels("global").inc()
            bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
            self.peer_with_x_bit_colliding_metric.labels(amount=bits_colliding).inc()
            return True
        else:
            return False
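    # Example (hypothetical, for illustration): the list order doubles as
    # recency order. Index 0 holds the least recently seen peer and the tail
    # the most recently seen, which is what the replacement policy in
    # TreeRoutingTable.add_peer relies on:
    #
    #     bucket.add_peer(a); bucket.add_peer(b); bucket.add_peer(a)
    #     assert bucket.peers == [b, a]  # re-adding `a` moved it to the tail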

    def get_peer(self, node_id: bytes) -> typing.Optional['KademliaPeer']:
        for peer in self.peers:
            if peer.node_id == node_id:
                return peer

    def get_peers(self, count=-1, exclude_contact=None, sort_distance_to=None) -> typing.List['KademliaPeer']:
        """ Returns a list containing up to the first count number of contacts

        @param count: The amount of contacts to return (if 0 or less, return
                      all contacts)
        @type count: int
        @param exclude_contact: A node_id to exclude; if a contact with this
                               node_id is in the list of returned values, it
                               will be discarded before returning
        @type exclude_contact: bytes

        @param sort_distance_to: The node_id to sort the contacts by distance
                                 to, defaulting to the parent node's node_id.
                                 If False, don't sort the contacts

        @return: Up to the first count number of contacts in a list; if no
                 contacts are present, an empty list is returned
        @rtype: list
        """
        peers = [peer for peer in self.peers if peer.node_id != exclude_contact]

        # Return all contacts in bucket
        if count <= 0:
            count = len(peers)

        # Get current contact number
        current_len = len(peers)

        # If count greater than k - return only k contacts
        if count > constants.K:
            count = constants.K

        if not current_len:
            return peers

        if sort_distance_to is False:
            pass
        else:
            sort_distance_to = sort_distance_to or self._node_id
            peers.sort(key=lambda c: Distance(sort_distance_to)(c.node_id))

        return peers[:min(current_len, count)]

    def get_bad_or_unknown_peers(self) -> typing.List['KademliaPeer']:
        peers = self.get_peers(sort_distance_to=False)
        return [
            peer for peer in peers
            if self._peer_manager.contact_triple_is_good(peer.node_id, peer.address, peer.udp_port) is not True
        ]

    def remove_peer(self, peer: 'KademliaPeer') -> None:
        self.peers.remove(peer)
        self.peer_in_routing_table_metric.labels("global").dec()
        bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
        self.peer_with_x_bit_colliding_metric.labels(amount=bits_colliding).dec()

    def key_in_range(self, key: bytes) -> bool:
        """ Tests whether the specified key (i.e. node ID) is in the range
        of the n-bit ID space covered by this k-bucket (in other words, it
        returns whether or not the specified key should be placed in this
        k-bucket)

        @param key: The key to test
        @type key: bytes

        @return: C{True} if the key is in this k-bucket's range, or C{False}
                 if not.
        @rtype: bool
        """
        return self.range_min <= self._distance_to_self(key) < self.range_max

    def __len__(self) -> int:
        return len(self.peers)

    def __contains__(self, item) -> bool:
        return item in self.peers

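# Note: Distance(a)(b) computes the Kademlia XOR metric, i.e. both ids read as
# big-endian integers and XORed together. key_in_range() compares the distance
# of a key from our own node id against [range_min, range_max), so bucket
# boundaries live in distance space: a key sharing a long prefix with our id
# has a small XOR distance. For example, with 8-bit ids, 0b10110000 and
# 0b10110101 are at distance 0b00000101 = 5 from each other.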

class TreeRoutingTable:
    """ This class implements a routing table used by a Node class.

    The Kademlia routing table is a binary tree whose leaves are k-buckets,
    where each k-bucket contains nodes with some common prefix of their IDs.
    This prefix is the k-bucket's position in the binary tree; it therefore
    covers some range of ID values, and together all of the k-buckets cover
    the entire n-bit ID (or key) space (with no overlap).

    @note: In this implementation, nodes in the tree (the k-buckets) are
    added dynamically, as needed; this technique is described in the 13-page
    version of the Kademlia paper, in section 2.4. It does, however, use the
    ping RPC-based k-bucket eviction algorithm described in section 2.2 of
    that paper.

    BOOTSTRAP MODE: if set to True, we always add all peers. This is so a
    bootstrap node does not get biased towards its own node id and its replies
    are the best it can provide (a joining peer learns its neighbors
    immediately). Over time this will need to be optimized to use the disk,
    as holding everything in memory won't be feasible anymore.
    See: https://github.com/bittorrent/bootstrap-dht
    """
    bucket_in_routing_table_metric = Gauge(
        "buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node",
        labelnames=("scope",)
    )

    def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes,
                 split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, is_bootstrap_node: bool = False):
        self._loop = loop
        self._peer_manager = peer_manager
        self._parent_node_id = parent_node_id
        self._split_buckets_under_index = split_buckets_under_index
        self.buckets: typing.List[KBucket] = [
            KBucket(
                self._peer_manager, range_min=0, range_max=2 ** constants.HASH_BITS, node_id=self._parent_node_id,
                capacity=1 << 32 if is_bootstrap_node else constants.K
            )
        ]

    def get_peers(self) -> typing.List['KademliaPeer']:
        return list(itertools.chain.from_iterable(map(lambda bucket: bucket.peers, self.buckets)))

    def _should_split(self, bucket_index: int, to_add: bytes) -> bool:
        #  https://stackoverflow.com/questions/32129978/highly-unbalanced-kademlia-routing-table/32187456#32187456
        if bucket_index < self._split_buckets_under_index:
            return True
        contacts = self.get_peers()
        distance = Distance(self._parent_node_id)
        contacts.sort(key=lambda c: distance(c.node_id))
        kth_contact = contacts[-1] if len(contacts) < constants.K else contacts[constants.K - 1]
        return distance(to_add) < distance(kth_contact.node_id)
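    # Note: the split rule is asymmetric on purpose. Buckets below
    # _split_buckets_under_index always split, keeping fine-grained coverage
    # near our own id (see the stackoverflow link above); past that, a full
    # bucket only splits when the candidate would rank among our K closest
    # contacts, i.e. when distance(to_add) is smaller than the distance to our
    # current K-th closest contact. Otherwise the new peer would not belong in
    # our K nearest and splitting gains nothing.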

    def find_close_peers(self, key: bytes, count: typing.Optional[int] = None,
                         sender_node_id: typing.Optional[bytes] = None) -> typing.List['KademliaPeer']:
        exclude = [self._parent_node_id]
        if sender_node_id:
            exclude.append(sender_node_id)
        count = count or constants.K
        distance = Distance(key)
        contacts = self.get_peers()
        contacts = [c for c in contacts if c.node_id not in exclude]
        if contacts:
            contacts.sort(key=lambda c: distance(c.node_id))
            return contacts[:min(count, len(contacts))]
        return []

    def get_peer(self, contact_id: bytes) -> typing.Optional['KademliaPeer']:
        return self.buckets[self._kbucket_index(contact_id)].get_peer(contact_id)

    def get_refresh_list(self, start_index: int = 0, force: bool = False) -> typing.List[bytes]:
        refresh_ids = []
        for offset, _ in enumerate(self.buckets[start_index:]):
            refresh_ids.append(self._midpoint_id_in_bucket_range(start_index + offset))
        # if we have 3 or fewer populated buckets get two random ids in the range of each to try and
        # populate/split the buckets further
        buckets_with_contacts = self.buckets_with_contacts()
        if buckets_with_contacts <= 3:
            for i in range(buckets_with_contacts):
                refresh_ids.append(self._random_id_in_bucket_range(i))
                refresh_ids.append(self._random_id_in_bucket_range(i))
        return refresh_ids
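    # Example (hypothetical): refreshing a bucket means running a node lookup
    # for an id inside its range, so each returned id would be used as a
    # lookup target by the layer that owns this table:
    #
    #     for target_id in table.get_refresh_list():
    #         ...  # perform a Kademlia iterative find for target_id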

    def remove_peer(self, peer: 'KademliaPeer') -> None:
        if not peer.node_id:
            return
        bucket_index = self._kbucket_index(peer.node_id)
        try:
            self.buckets[bucket_index].remove_peer(peer)
            self._join_buckets()
        except ValueError:
            return

    def _kbucket_index(self, key: bytes) -> int:
        i = 0
        for bucket in self.buckets:
            if bucket.key_in_range(key):
                return i
            else:
                i += 1
        return i

    def _random_id_in_bucket_range(self, bucket_index: int) -> bytes:
        random_id = int(random.randrange(self.buckets[bucket_index].range_min, self.buckets[bucket_index].range_max))
        return Distance(
            self._parent_node_id
        )(random_id.to_bytes(constants.HASH_LENGTH, 'big')).to_bytes(constants.HASH_LENGTH, 'big')

    def _midpoint_id_in_bucket_range(self, bucket_index: int) -> bytes:
        half = int((self.buckets[bucket_index].range_max - self.buckets[bucket_index].range_min) // 2)
        return Distance(self._parent_node_id)(
            int(self.buckets[bucket_index].range_min + half).to_bytes(constants.HASH_LENGTH, 'big')
        ).to_bytes(constants.HASH_LENGTH, 'big')
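    # Note: bucket ranges are expressed in XOR-distance space relative to our
    # own id, so the two helpers above first pick a distance inside the bucket
    # and then map it back to a concrete node id. Since Distance(parent_id)(x)
    # is parent_id XOR x, and XOR is its own inverse, applying it to a
    # distance d yields an id that sits exactly at distance d from us.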

    def _split_bucket(self, old_bucket_index: int) -> None:
        """ Splits the specified k-bucket into two new buckets which together
        cover the same range in the key/ID space

        @param old_bucket_index: The index of k-bucket to split (in this table's
                                 list of k-buckets)
        @type old_bucket_index: int
        """
        # Resize the range of the current (old) k-bucket
        old_bucket = self.buckets[old_bucket_index]
        split_point = old_bucket.range_max - (old_bucket.range_max - old_bucket.range_min) // 2
        # Create a new k-bucket to cover the range split off from the old bucket
        new_bucket = KBucket(self._peer_manager, split_point, old_bucket.range_max, self._parent_node_id)
        old_bucket.range_max = split_point
        # Now, add the new bucket into the routing table tree
        self.buckets.insert(old_bucket_index + 1, new_bucket)
        # Finally, copy all nodes that belong to the new k-bucket into it...
        for contact in old_bucket.peers:
            if new_bucket.key_in_range(contact.node_id):
                new_bucket.add_peer(contact)
        # ...and remove them from the old bucket
        for contact in new_bucket.peers:
            old_bucket.remove_peer(contact)
        self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
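    # Worked example: a bucket covering [0, 16) splits at
    # split_point = 16 - (16 - 0) // 2 = 8, so the old bucket keeps [0, 8) and
    # the new bucket inserted after it covers [8, 16); together they still
    # cover [0, 16) with no overlap.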

    def _join_buckets(self):
        if len(self.buckets) == 1:
            return
        to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0]
        if not to_pop:
            return
        log.info("join buckets %i", len(to_pop))
        bucket_index_to_pop = to_pop[0]
        assert len(self.buckets[bucket_index_to_pop]) == 0
        can_go_lower = bucket_index_to_pop - 1 >= 0
        can_go_higher = bucket_index_to_pop + 1 < len(self.buckets)
        assert can_go_higher or can_go_lower
        bucket = self.buckets[bucket_index_to_pop]
        if can_go_lower and can_go_higher:
            midpoint = ((bucket.range_max - bucket.range_min) // 2) + bucket.range_min
            self.buckets[bucket_index_to_pop - 1].range_max = midpoint - 1
            self.buckets[bucket_index_to_pop + 1].range_min = midpoint
        elif can_go_lower:
            self.buckets[bucket_index_to_pop - 1].range_max = bucket.range_max
        elif can_go_higher:
            self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min
        self.buckets.remove(bucket)
        self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
        return self._join_buckets()

    def buckets_with_contacts(self) -> int:
        count = 0
        for bucket in self.buckets:
            if len(bucket) > 0:
                count += 1
        return count

    async def add_peer(self, peer: 'KademliaPeer', probe: typing.Callable[['KademliaPeer'], typing.Awaitable]):
        if not peer.node_id:
            log.warning("Tried adding a peer with no node id!")
            return False
        for my_peer in self.get_peers():
            if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id:
                self.remove_peer(my_peer)
                self._join_buckets()
        bucket_index = self._kbucket_index(peer.node_id)
        if self.buckets[bucket_index].add_peer(peer):
            return True

        # The bucket is full; see if it can be split (by checking if its range includes the host node's node_id)
        if self._should_split(bucket_index, peer.node_id):
            self._split_bucket(bucket_index)
            # Retry the insertion attempt
            result = await self.add_peer(peer, probe)
            self._join_buckets()
            return result
        else:
            # We can't split the k-bucket
            #
            # The 13 page kademlia paper specifies that the least recently contacted node in the bucket
            # shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful
            # the new contact is ignored and not added to the bucket (sections 2.2 and 2.4).
            #
            # A reasonable extension to this is BEP 0005, which builds on the above:
            #
            #    Not all nodes that we learn about are equal. Some are "good" and some are not.
            #    Many nodes using the DHT are able to send queries and receive responses,
            #    but are not able to respond to queries from other nodes. It is important that
            #    each node's routing table must contain only known good nodes. A good node is
            #    a node that has responded to one of our queries within the last 15 minutes. A node
            #    is also good if it has ever responded to one of our queries and has sent us a
            #    query within the last 15 minutes. After 15 minutes of inactivity, a node becomes
            #    questionable. Nodes become bad when they fail to respond to multiple queries
            #    in a row. Nodes that we know are good are given priority over nodes with unknown status.
            #
            # When there are bad or questionable nodes in the bucket, the least recent is selected for
            # potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent)
            # contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact
            # is ignored if the pinged node replies.

            not_good_contacts = self.buckets[bucket_index].get_bad_or_unknown_peers()
            not_recently_replied = []
            for my_peer in not_good_contacts:
                last_replied = self._peer_manager.get_last_replied(my_peer.address, my_peer.udp_port)
                if not last_replied or last_replied + 60 < self._loop.time():
                    not_recently_replied.append(my_peer)
            if not_recently_replied:
                to_replace = not_recently_replied[0]
            else:
                to_replace = self.buckets[bucket_index].peers[0]
                last_replied = self._peer_manager.get_last_replied(to_replace.address, to_replace.udp_port)
                if last_replied and last_replied + 60 > self._loop.time():
                    return False
            log.debug("pinging %s:%s", to_replace.address, to_replace.udp_port)
            try:
                await probe(to_replace)
                return False
            except (asyncio.TimeoutError, RemoteException):
                log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
                          to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
                if to_replace in self.buckets[bucket_index]:
                    self.buckets[bucket_index].remove_peer(to_replace)
                return await self.add_peer(peer, probe)
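
For context, a minimal sketch of how the table above is driven, assuming a KademliaPeer from lbry.dht.peer and a caller-supplied probe coroutine (hypothetical names; the real wiring lives in the DHT node/protocol layer):

async def try_add(table: TreeRoutingTable, new_peer: 'KademliaPeer') -> bool:
    async def ping_contact(contact: 'KademliaPeer'):
        # hypothetical probe: send a DHT ping RPC and raise
        # asyncio.TimeoutError or RemoteException if the contact is unreachable
        ...

    # for a full, unsplittable bucket, add_peer pings the least recently seen
    # eligible contact and only replaces it if the probe fails
    return await table.add_peer(new_peer, ping_contact)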