• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 4599645556

pending completion
4599645556

Pull #3731

github

GitHub
Merge 18e9c5c9c into eb5da9511
Pull Request #3731: Bump cryptography from 2.5 to 39.0.1

2806 of 6558 branches covered (42.79%)

Branch coverage included in aggregate %.

12283 of 19915 relevant lines covered (61.68%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.73
/lbry/stream/reflector/client.py
1
import asyncio
1✔
2
import json
1✔
3
import logging
1✔
4
import typing
1✔
5

6
if typing.TYPE_CHECKING:
1!
7
    from lbry.blob.blob_manager import BlobManager
×
8
    from lbry.stream.descriptor import StreamDescriptor
×
9

10
# Reflector protocol version identifiers; the client handshake in this module
# sends REFLECTOR_V2 and rejects any other version in the reply.
REFLECTOR_V1, REFLECTOR_V2 = 0, 1

# Upper bound, in bytes, on the response data buffered while waiting for a
# complete JSON message from the server.
MAX_RESPONSE_SIZE = 2_000_000

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
1✔
16

17

18
class StreamReflectorClient(asyncio.Protocol):
    """asyncio protocol that uploads a stream's blobs to a reflector server.

    Requests are JSON-encoded dicts written to the transport. Incoming bytes
    are accumulated until they parse as one JSON document, which is then
    handed to the coroutine awaiting it through ``response_queue``.
    """

    def __init__(self, blob_manager: 'BlobManager', descriptor: 'StreamDescriptor'):
        """Store the collaborators and initialise the connection state.

        :param blob_manager: used to look up blobs and check they are verified.
        :param descriptor: descriptor of the stream being reflected.
        """
        self.loop = asyncio.get_event_loop()
        # Set in connection_made(), reset to None in connection_lost().
        self.transport: typing.Optional[asyncio.WriteTransport] = None
        self.blob_manager = blob_manager
        self.descriptor = descriptor
        # Bytes received so far that do not yet form a complete JSON document.
        self.response_buff = b''
        # Hashes of blobs the server acknowledged receiving.
        self.reflected_blobs = []
        self.connected = asyncio.Event()
        # Holds at most one decoded response at a time.
        self.response_queue = asyncio.Queue(maxsize=1)
        # Task awaiting the response to the request currently in flight.
        self.pending_request: typing.Optional[asyncio.Task] = None

    def connection_made(self, transport):
        """Remember the transport and signal that the connection is up."""
        self.transport = transport
        log.debug("Connected to reflector")
        self.connected.set()

    def connection_lost(self, exc: typing.Optional[Exception]):
        """Reset connection state and cancel the request in flight, if any."""
        self.transport = None
        self.connected.clear()
        if self.pending_request:
            # Wakes up send_request(), which then raises CancelledError.
            self.pending_request.cancel()
        if self.reflected_blobs:
            log.info("Finished sending reflector %i blobs", len(self.reflected_blobs))

    def data_received(self, data):
        """Buffer incoming bytes and queue the response once it is complete.

        The connection is closed if the buffered data would exceed
        ``MAX_RESPONSE_SIZE``, or if a response arrives while the previous
        one is still waiting in the queue.
        """
        data = data or b''
        # Add the lengths instead of concatenating just to measure the result.
        pending_size = len(self.response_buff) + len(data)
        if pending_size > MAX_RESPONSE_SIZE:
            log.warning("response message to large from reflector server: %i bytes",
                        pending_size)
            self.response_buff = b''
            self.transport.close()
            return
        self.response_buff += data
        try:
            # UnicodeDecodeError and JSONDecodeError are both ValueError subclasses.
            response = json.loads(self.response_buff.decode())
        except ValueError:
            # Not a complete JSON document yet; keep buffering until more arrives.
            if not data:
                log.warning("got undecodable response from reflector server")
                self.response_buff = b''
            return
        self.response_buff = b''
        try:
            self.response_queue.put_nowait(response)
        except asyncio.QueueFull:
            # A response arrived before the previous one was consumed. Close the
            # connection explicitly instead of letting QueueFull escape from the
            # protocol callback.
            log.warning("got an unexpected extra response from reflector server")
            if self.transport:
                self.transport.close()

    async def send_request(self, request_dict: typing.Dict, timeout: int = 180):
        """Send ``request_dict`` as JSON and return the decoded response.

        :param request_dict: JSON-serialisable request; keys are sorted on output.
        :param timeout: seconds to wait for the response.
        :raises asyncio.CancelledError: if the connection is already gone or is
            lost while waiting for the response.
        :raises asyncio.TimeoutError: if no response arrives within ``timeout``.
        """
        msg = json.dumps(request_dict, sort_keys=True)
        try:
            self.transport.write(msg.encode())
            self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout))
            return await self.pending_request
        except (AttributeError, asyncio.CancelledError) as err:
            # attribute error happens when we transport.write after disconnect
            # cancelled error happens when the pending_request task is cancelled by a disconnect
            if self.transport:
                self.transport.close()
            if isinstance(err, asyncio.CancelledError):
                raise
            raise asyncio.CancelledError() from err
        finally:
            self.pending_request = None

    async def send_handshake(self) -> None:
        """Exchange protocol versions with the server.

        :raises ValueError: if the reply has no version or it is not REFLECTOR_V2.
        """
        response_dict = await self.send_request({'version': REFLECTOR_V2})
        if 'version' not in response_dict:
            raise ValueError("Need protocol version number!")
        server_version = int(response_dict['version'])
        if server_version != REFLECTOR_V2:
            raise ValueError(f"I can't handle protocol version {server_version}!")

    async def _sendfile_and_wait(self, blob, failure_message: str):
        """Send ``blob`` via ``blob.sendfile(self)`` and return the server's reply.

        A ``sendfile`` result of -1 is treated as a failed transfer. On failure
        or cancellation the transport is closed and CancelledError is raised.
        """
        try:
            sent = await blob.sendfile(self)
            if sent == -1:
                log.warning(failure_message)
                raise asyncio.CancelledError()
            return await asyncio.wait_for(self.response_queue.get(), 30)
        except asyncio.CancelledError:
            if self.transport:
                self.transport.close()
            raise

    async def send_descriptor(self) -> typing.Tuple[bool, typing.List[str]]:  # returns a list of needed blob hashes
        """Offer the sd blob to the server and send it if requested.

        :return: ``(sent_sd, needed)`` where ``sent_sd`` tells whether the server
            acknowledged the sd blob and ``needed`` lists blob hashes to send.
        :raises ValueError: if the reply lacks the ``send_sd_blob`` field.
        :raises asyncio.CancelledError: if sending the sd blob fails.
        """
        sd_blob = self.blob_manager.get_blob(self.descriptor.sd_hash)
        assert self.blob_manager.is_blob_verified(self.descriptor.sd_hash), "need to have sd blob to send at this point"
        response = await self.send_request({
            'sd_blob_hash': sd_blob.blob_hash,
            'sd_blob_size': sd_blob.length
        })
        if 'send_sd_blob' not in response:
            raise ValueError("I don't know whether to send the sd blob or not!")
        # `or []` also covers an explicit JSON null, which .get(key, []) would
        # pass through as None and break the list handling below.
        needed = response.get('needed_blobs') or []
        sent_sd = False
        if response['send_sd_blob']:
            received = await self._sendfile_and_wait(sd_blob, "failed to send sd blob")
            if received.get('received_sd_blob'):
                sent_sd = True
                if not needed:
                    # The server named no blobs, so offer every verified blob of
                    # the stream except the last entry of the descriptor.
                    for blob in self.descriptor.blobs[:-1]:
                        if self.blob_manager.is_blob_verified(blob.blob_hash, blob.length):
                            needed.append(blob.blob_hash)
                log.info("Sent reflector descriptor %s", sd_blob.blob_hash[:8])
                self.reflected_blobs.append(sd_blob.blob_hash)
            else:
                log.warning("Reflector failed to receive descriptor %s", sd_blob.blob_hash[:8])
        return sent_sd, needed

    async def send_blob(self, blob_hash: str):
        """Offer one blob to the server and send it if requested.

        :param blob_hash: hash of a blob that is already verified locally.
        :raises ValueError: if the reply lacks the ``send_blob`` field.
        :raises asyncio.CancelledError: if sending the blob fails.
        """
        assert self.blob_manager.is_blob_verified(blob_hash), "need to have a blob to send at this point"
        blob = self.blob_manager.get_blob(blob_hash)
        response = await self.send_request({
            'blob_hash': blob.blob_hash,
            'blob_size': blob.length
        })
        if 'send_blob' not in response:
            raise ValueError("I don't know whether to send the blob or not!")
        if response['send_blob']:
            received = await self._sendfile_and_wait(blob, "failed to send blob")
            if received.get('received_blob'):
                self.reflected_blobs.append(blob.blob_hash)
                log.info("Sent reflector blob %s", blob.blob_hash[:8])
            else:
                log.warning("Reflector failed to receive blob %s", blob.blob_hash[:8])
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc