• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 4599645360

pending completion
4599645360

push

github

GitHub
Bump cryptography from 2.5 to 39.0.1

2807 of 6557 branches covered (42.81%)

Branch coverage included in aggregate %.

12289 of 19915 relevant lines covered (61.71%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.94
/lbry/blob_exchange/client.py
1
import asyncio
1✔
2
import time
1✔
3
import logging
1✔
4
import typing
1✔
5
import binascii
1✔
6
from typing import Optional
1✔
7
from lbry.error import InvalidBlobHashError, InvalidDataError
1✔
8
from lbry.blob_exchange.serialization import BlobResponse, BlobRequest
1✔
9
from lbry.utils import cache_concurrent
1✔
10
if typing.TYPE_CHECKING:
1!
11
    from lbry.blob.blob_file import AbstractBlob
×
12
    from lbry.blob.writer import HashBlobWriter
×
13
    from lbry.connection_manager import ConnectionManager
×
14

15
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
1✔
16

17

18
class BlobExchangeClientProtocol(asyncio.Protocol):
    """
    asyncio protocol for the client side of a blob exchange connection.

    One blob is requested at a time through `download_blob`; the peer's response is parsed in
    `data_received` and the blob bytes that follow are streamed into the blob's writer.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, peer_timeout: typing.Optional[float] = 10,
                 connection_manager: typing.Optional['ConnectionManager'] = None):
        """
        :param loop: event loop used to create the per-request response future
        :param peer_timeout: seconds to wait for the peer's response and for the blob to finish writing
        :param connection_manager: optional object notified of connection and traffic events
        """
        self.loop = loop
        self.peer_port: typing.Optional[int] = None
        self.peer_address: typing.Optional[str] = None
        self.transport: typing.Optional[asyncio.Transport] = None
        self.peer_timeout = peer_timeout
        self.connection_manager = connection_manager
        self.writer: typing.Optional['HashBlobWriter'] = None
        self.blob: typing.Optional['AbstractBlob'] = None

        self._blob_bytes_received = 0
        self._response_fut: typing.Optional[asyncio.Future] = None
        self.buf = b''

        # this is here to handle the race when the downloader is closed right as response_fut gets a result
        self.closed = asyncio.Event()

    def _update_peer_info(self):
        """
        Fill in `peer_address` / `peer_port` from the transport if they are not known yet.

        Only the first two items of the peername are used, as in `connection_made`, so the longer
        peername tuples reported for IPv6 sockets do not break the unpacking.
        """
        if self.peer_address or not self.transport:
            return
        addr_info = self.transport.get_extra_info('peername')
        if addr_info:
            self.peer_address, self.peer_port = addr_info[0], addr_info[1]

    def data_received(self, data: bytes):
        """
        Handle bytes from the peer: either raw blob data for the blob being written, or (part of)
        the response to the pending request.
        """
        if self.connection_manager:
            self._update_peer_info()
            self.connection_manager.received_data(f"{self.peer_address}:{self.peer_port}", len(data))
        if not self.transport or self.transport.is_closing():
            log.warning("transport closing, but got more bytes from %s:%i\n%s", self.peer_address, self.peer_port,
                        binascii.hexlify(data))
            if self._response_fut and not self._response_fut.done():
                self._response_fut.cancel()
            return
        if not self._response_fut:
            log.warning("Protocol received data before expected, probable race on keep alive. Closing transport.")
            return self.close()
        if self._blob_bytes_received and not self.writer.closed():
            # the response was already handled, everything else is blob data
            return self._write(data)

        response = BlobResponse.deserialize(self.buf + data)
        if not response.responses and not self._response_fut.done():
            # nothing could be parsed yet, keep buffering until more data arrives
            self.buf += data
            return
        else:
            self.buf = b''

        if response.responses and self.blob:
            blob_response = response.get_blob_response()
            if blob_response and not blob_response.error and blob_response.blob_hash == self.blob.blob_hash:
                # set the expected length for the incoming blob if we didn't know it
                self.blob.set_length(blob_response.length)
            elif blob_response and not blob_response.error and self.blob.blob_hash != blob_response.blob_hash:
                # the server started sending a blob we didn't request
                log.warning("%s started sending blob we didn't request %s instead of %s", self.peer_address,
                            blob_response.blob_hash, self.blob.blob_hash)
                return
        if response.responses:
            log.debug("got response from %s:%i <- %s", self.peer_address, self.peer_port, response.to_dict())
            # fire the Future with the response to our request
            self._response_fut.set_result(response)
        if response.blob_data and self.writer and not self.writer.closed():
            # write blob bytes if we're writing a blob and have blob bytes to write
            self._write(response.blob_data)

    def _write(self, data: bytes):
        """
        Write blob bytes to the writer, truncating anything beyond the blob's expected length.

        Errors raised by the writer are logged and set on the pending response future.
        """
        if len(data) > (self.blob.get_length() - self._blob_bytes_received):
            data = data[:(self.blob.get_length() - self._blob_bytes_received)]
            log.warning("got more than asked from %s:%d, probable sendfile bug", self.peer_address, self.peer_port)
        self._blob_bytes_received += len(data)
        try:
            self.writer.write(data)
        except OSError as err:
            log.error("error downloading blob from %s:%i: %s", self.peer_address, self.peer_port, err)
            if self._response_fut and not self._response_fut.done():
                self._response_fut.set_exception(err)
        except asyncio.TimeoutError as err:
            log.error("%s downloading blob from %s:%i", str(err), self.peer_address, self.peer_port)
            if self._response_fut and not self._response_fut.done():
                self._response_fut.set_exception(err)

    async def _download_blob(self) -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]:  # pylint: disable=too-many-return-statements
        """
        Send the request for `self.blob`, validate the peer's response and wait for the blob to be
        written and verified.

        :return: (blob bytes received, this protocol if the download completed); on any rejected
                 response, timeout or invalid blob the connection is closed and the second item is None
        """
        start_time = time.perf_counter()
        request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash)
        blob_hash = self.blob.blob_hash
        self._update_peer_info()
        try:
            msg = request.serialize()
            log.debug("send request to %s:%i -> %s", self.peer_address, self.peer_port, msg.decode())
            self.transport.write(msg)
            if self.connection_manager:
                self.connection_manager.sent_data(f"{self.peer_address}:{self.peer_port}", len(msg))
            response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout)
            availability_response = response.get_availability_response()
            price_response = response.get_price_response()
            blob_response = response.get_blob_response()
            if self.closed.is_set():
                msg = f"cancelled blob request for {blob_hash} immediately after we got a response"
                log.warning(msg)
                raise asyncio.CancelledError(msg)
            if (not blob_response or blob_response.error) and\
                    (not availability_response or not availability_response.available_blobs):
                log.warning("%s not in availability response from %s:%i", self.blob.blob_hash, self.peer_address,
                            self.peer_port)
                log.warning(response.to_dict())
                # close() returns None, so the caller gets no protocol back
                return self._blob_bytes_received, self.close()
            elif availability_response and availability_response.available_blobs and \
                    availability_response.available_blobs != [self.blob.blob_hash]:
                log.warning("blob availability response doesn't match our request from %s:%i",
                            self.peer_address, self.peer_port)
                return self._blob_bytes_received, self.close()
            elif not availability_response:
                log.warning("response from %s:%i did not include an availability response (we requested %s)",
                            self.peer_address, self.peer_port, blob_hash)
                return self._blob_bytes_received, self.close()

            if not price_response or price_response.blob_data_payment_rate != 'RATE_ACCEPTED':
                log.warning("data rate rejected by %s:%i", self.peer_address, self.peer_port)
                return self._blob_bytes_received, self.close()
            if not blob_response or blob_response.error:
                log.warning("blob can't be downloaded from %s:%i", self.peer_address, self.peer_port)
                return self._blob_bytes_received, self.close()
            if not blob_response.error and blob_response.blob_hash != self.blob.blob_hash:
                log.warning("incoming blob hash mismatch from %s:%i", self.peer_address, self.peer_port)
                return self._blob_bytes_received, self.close()
            if self.blob.length is not None and self.blob.length != blob_response.length:
                log.warning("incoming blob unexpected length from %s:%i", self.peer_address, self.peer_port)
                return self._blob_bytes_received, self.close()
            msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \
                f" timeout in {self.peer_timeout}"
            log.debug(msg)
            msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
            await asyncio.wait_for(self.writer.finished, self.peer_timeout)
            # wait for the io to finish
            await self.blob.verified.wait()
            log.info("%s at %fMB/s", msg,
                     round((float(self._blob_bytes_received) /
                            float(time.perf_counter() - start_time)) / 1000000.0, 2))
            # await self.blob.finished_writing.wait()  not necessary, but a dangerous change. TODO: is it needed?
            return self._blob_bytes_received, self
        except asyncio.TimeoutError:
            return self._blob_bytes_received, self.close()
        except (InvalidBlobHashError, InvalidDataError):
            log.warning("invalid blob from %s:%i", self.peer_address, self.peer_port)
            return self._blob_bytes_received, self.close()

    def close(self):
        """
        Mark the protocol closed, cancel the pending response, close the writer handle and the
        transport, and reset the per-request state.
        """
        self.closed.set()
        if self._response_fut and not self._response_fut.done():
            self._response_fut.cancel()
        if self.writer and not self.writer.closed():
            self.writer.close_handle()
        self._response_fut = None
        self.writer = None
        self.blob = None
        if self.transport:
            self.transport.close()
        self.transport = None
        self.buf = b''

    async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]:
        """
        Download `blob` over this connection.

        :return: (blob bytes received, protocol to keep using or None); (0, self) without sending a
                 request when the blob is already verified or is not writeable
        :raises OSError: re-raised after logging
        :raises asyncio.CancelledError: re-raised after closing the connection
        """
        self.closed.clear()
        blob_hash = blob.blob_hash
        if blob.get_is_verified() or not blob.is_writeable():
            return 0, self
        try:
            self._blob_bytes_received = 0
            self.blob, self.writer = blob, blob.get_blob_writer(self.peer_address, self.peer_port)
            # create the future on the loop this protocol was given rather than an implicit one
            self._response_fut = self.loop.create_future()
            return await self._download_blob()
        except OSError:
            # i'm not sure how to fix this race condition - jack
            log.warning("race happened downloading %s from %s:%s", blob_hash, self.peer_address, self.peer_port)
            raise
        except asyncio.TimeoutError:
            if self._response_fut and not self._response_fut.done():
                self._response_fut.cancel()
            self.close()
            return self._blob_bytes_received, None
        except asyncio.CancelledError:
            self.close()
            raise
        finally:
            # never leave a writer handle open, whatever path got us here
            if self.writer and not self.writer.closed():
                self.writer.close_handle()
                self.writer = None

    def connection_made(self, transport: asyncio.Transport):
        """Record the transport and the peer's address and port, and notify the connection manager."""
        addr = transport.get_extra_info('peername')
        self.peer_address, self.peer_port = addr[0], addr[1]
        self.transport = transport
        if self.connection_manager:
            self.connection_manager.connection_made(f"{self.peer_address}:{self.peer_port}")
        log.debug("connection made to %s:%i", self.peer_address, self.peer_port)

    def connection_lost(self, exc):
        """Notify the connection manager that the connection was lost, then close the protocol."""
        if self.connection_manager:
            self.connection_manager.outgoing_connection_lost(f"{self.peer_address}:{self.peer_port}")
        log.debug("connection lost to %s:%i (reason: %s, %s)", self.peer_address, self.peer_port, str(exc),
                  str(type(exc)))
        self.close()
224

225

226
@cache_concurrent
async def request_blob(loop: asyncio.AbstractEventLoop, blob: Optional['AbstractBlob'], address: str,
                       tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float,
                       connected_protocol: Optional['BlobExchangeClientProtocol'] = None,
                       connection_id: int = 0, connection_manager: Optional['ConnectionManager'] = None)\
        -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]:
    """
    Download `blob` from the peer at address:tcp_port, reusing `connected_protocol` when its
    transport is still usable and opening a new connection otherwise.

    Returns a pair: the number of bytes received and the client protocol if it is still connected.
    (0, None) is returned when connecting or downloading times out or fails with an OSError.
    """

    # an existing protocol is only reusable while it has a transport that is not closing
    can_reuse = bool(
        connected_protocol
        and connected_protocol.transport
        and not connected_protocol.transport.is_closing()
    )
    if can_reuse:
        log.debug("reusing connection for %s:%d", address, tcp_port)
        active_protocol = connected_protocol
    else:
        active_protocol = BlobExchangeClientProtocol(
            loop, blob_download_timeout, connection_manager
        )
    try:
        if not can_reuse:
            connecting = loop.create_connection(lambda: active_protocol, address, tcp_port)
            await asyncio.wait_for(connecting, peer_connect_timeout)
        # blob is None happens when we are just opening a connection
        # file exists but not verified means someone is writing right now, give it time, come back later
        nothing_to_download = blob is None or blob.get_is_verified() or not blob.is_writeable()
        if nothing_to_download:
            return 0, active_protocol
        return await active_protocol.download_blob(blob)
    except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError):
        return 0, None
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc