• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 4599645360

pending completion
4599645360

push

github

GitHub
Bump cryptography from 2.5 to 39.0.1

2807 of 6557 branches covered (42.81%)

Branch coverage included in aggregate %.

12289 of 19915 relevant lines covered (61.71%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.25
/lbry/dht/blob_announcer.py
1
import asyncio
1✔
2
import typing
1✔
3
import logging
1✔
4

5
from prometheus_client import Counter, Gauge
1✔
6

7
if typing.TYPE_CHECKING:
1!
8
    from lbry.dht.node import Node
×
9
    from lbry.extras.daemon.storage import SQLiteStorage
×
10

11
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
1✔
12

13

14
class BlobAnnouncer:
    """Periodically announce blob hashes from storage through a DHT node.

    ``start()`` schedules a background task that, once the node has joined,
    wakes up every 60 seconds, loads the hashes returned by
    ``storage.get_blobs_to_announce()``, announces them with ``batch_size``
    concurrent consumers and reports the successfully announced hashes back
    via ``storage.update_last_announced_blobs()``.
    """

    # Incremented once per announce attempt, labelled with the number of peers
    # returned by the announce call and whether the attempt raised.
    announcements_sent_metric = Counter(
        "announcements_sent", "Number of announcements sent and their respective status.", namespace="dht_node",
        labelnames=("peers", "error"),
    )
    # Set to the queue length each time the queue is refilled from storage.
    announcement_queue_size_metric = Gauge(
        "announcement_queue_size", "Number of hashes waiting to be announced.", namespace="dht_node",
        labelnames=("scope",)
    )

    def __init__(self, loop: asyncio.AbstractEventLoop, node: 'Node', storage: 'SQLiteStorage'):
        """Store the collaborators and initialise empty announce state.

        :param loop: event loop on which the announce task is created.
        :param node: DHT node used to announce blobs.
        :param storage: storage queried for blobs to announce and updated afterwards.
        """
        self.loop = loop
        self.node = node
        self.storage = storage
        # None until start() is called.
        self.announce_task: typing.Optional[asyncio.Task] = None
        # Hashes waiting to be announced in the current round.
        self.announce_queue: typing.List[str] = []
        # Pulsed (set, then immediately cleared) at the end of every completed round.
        self._done = asyncio.Event()
        # Hashes announced successfully but not yet reported to storage.
        self.announced = set()

    async def _run_consumer(self):
        """Pop and announce hashes from ``announce_queue`` until it is empty.

        A hash is added to ``self.announced`` only when the announce call
        returned more than 4 results. Errors from a single announce are logged
        and counted in the metric, and do not stop the consumer. Cancellation is
        always propagated.
        """
        while self.announce_queue:
            # There is no await between the emptiness check and pop(), so
            # consumers sharing the event loop never pop from an empty list.
            blob_hash = self.announce_queue.pop()
            try:
                peers = len(await self.node.announce_blob(blob_hash))
                self.announcements_sent_metric.labels(peers=peers, error=False).inc()
                if peers > 4:
                    self.announced.add(blob_hash)
                else:
                    log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
            except asyncio.CancelledError:
                # On Python 3.7 CancelledError derives from Exception; without this
                # clause the handler below would swallow the cancellation issued by
                # stop() and the consumer would keep draining the queue.
                raise
            except Exception as err:
                self.announcements_sent_metric.labels(peers=0, error=True).inc()
                log.warning("error announcing %s: %s", blob_hash[:8], str(err))

    async def _announce(self, batch_size: typing.Optional[int] = 10):
        """Run announce rounds forever (body of the background task).

        :param batch_size: number of concurrent consumers per round; a falsy
            value (``None`` or ``0``) makes this coroutine return immediately.
        """
        while batch_size:
            if not self.node.joined.is_set():
                await self.node.joined.wait()
            await asyncio.sleep(60)
            if not self.node.protocol.routing_table.get_peers():
                # Skipped rounds do not pulse self._done.
                log.warning("No peers in DHT, announce round skipped")
                continue
            self.announce_queue.extend(await self.storage.get_blobs_to_announce())
            self.announcement_queue_size_metric.labels(scope="global").set(len(self.announce_queue))
            log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
            while self.announce_queue:
                log.info("%i blobs to announce", len(self.announce_queue))
                await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)])
                # filter(None, ...) drops falsy entries before reporting to storage.
                announced = list(filter(None, self.announced))
                if announced:
                    await self.storage.update_last_announced_blobs(announced)
                    log.info("announced %i blobs", len(announced))
                    self.announced.clear()
            # Wake everything currently awaiting wait(), then re-arm for the next round.
            self._done.set()
            self._done.clear()

    def start(self, batch_size: typing.Optional[int] = 10):
        """Schedule the announce task on ``self.loop``.

        :param batch_size: forwarded to ``_announce``.
        :raises AssertionError: if a previous announce task is still running.
        """
        assert not self.announce_task or self.announce_task.done(), "already running"
        self.announce_task = self.loop.create_task(self._announce(batch_size))

    def stop(self):
        """Cancel the announce task if it exists and has not finished."""
        if self.announce_task and not self.announce_task.done():
            self.announce_task.cancel()

    def wait(self):
        """Return an awaitable that completes when an announce round finishes."""
        return self._done.wait()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc