• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 3708794183

pending completion
3708794183

Pull #3657

github

GitHub
Merge 636b7ed47 into 625865165
Pull Request #3657: wip: add initial support for streaming torrent files

2754 of 6491 branches covered (42.43%)

Branch coverage included in aggregate %.

64 of 245 new or added lines in 12 files covered. (26.12%)

20 existing lines in 5 files now uncovered.

12055 of 19808 relevant lines covered (60.86%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.37
/lbry/file/source_manager.py
1
import os
1✔
2
import asyncio
1✔
3
import logging
1✔
4
import typing
1✔
5
from typing import Optional
1✔
6
from lbry.file.source import ManagedDownloadSource
1✔
7
if typing.TYPE_CHECKING:
1!
8
    from lbry.conf import Config
×
9
    from lbry.extras.daemon.analytics import AnalyticsManager
×
10
    from lbry.extras.daemon.storage import SQLiteStorage
×
11

12
log = logging.getLogger(__name__)
1✔
13

14
COMPARISON_OPERATORS = {
1!
15
    'eq': lambda a, b: a == b,
16
    'ne': lambda a, b: a != b,
17
    'g': lambda a, b: a > b,
18
    'l': lambda a, b: a < b,
19
    'ge': lambda a, b: a >= b,
20
    'le': lambda a, b: a <= b,
21
}
22

23

24
class SourceManager:
1✔
25
    filter_fields = {
1✔
26
        'identifier',
27
        'rowid',
28
        'status',
29
        'file_name',
30
        'added_on',
31
        'download_path',
32
        'claim_name',
33
        'claim_height',
34
        'claim_id',
35
        'outpoint',
36
        'txid',
37
        'nout',
38
        'channel_claim_id',
39
        'channel_name',
40
        'completed'
41
    }
42

43
    set_filter_fields = {
1✔
44
        "claim_ids": "claim_id",
45
        "channel_claim_ids": "channel_claim_id",
46
        "outpoints": "outpoint"
47
    }
48

49
    source_class = ManagedDownloadSource
1✔
50

51
    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage',
1✔
52
                 analytics_manager: Optional['AnalyticsManager'] = None):
53
        self.loop = loop
1✔
54
        self.config = config
1✔
55
        self.storage = storage
1✔
56
        self.analytics_manager = analytics_manager
1✔
57
        self._sources: typing.Dict[str, ManagedDownloadSource] = {}
1✔
58
        self.started = asyncio.Event()
1✔
59

60
    def add(self, source: ManagedDownloadSource):
1✔
61
        self._sources[source.identifier] = source
1✔
62

63
    def remove(self, source: ManagedDownloadSource):
1✔
64
        if source.identifier not in self._sources:
×
65
            return
×
66
        self._sources.pop(source.identifier)
×
67
        source.stop_tasks()
×
68

69
    async def initialize_from_database(self):
1✔
70
        raise NotImplementedError()
×
71

72
    async def start(self):
1✔
73
        await self.initialize_from_database()
1✔
74
        self.started.set()
1✔
75

76
    def stop(self):
1✔
77
        while self._sources:
1✔
78
            _, source = self._sources.popitem()
1✔
79
            source.stop_tasks()
1✔
80
        self.started.clear()
1✔
81

82
    async def create(self, file_path: str, key: Optional[bytes] = None,
1✔
83
                     iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedDownloadSource:
84
        raise NotImplementedError()
×
85

86
    async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
1✔
NEW
87
        await self.storage.delete_torrent(source.identifier)
×
88
        self.remove(source)
×
89
        if delete_file and source.output_file_exists:
×
90
            os.remove(source.full_path)
×
91

92
    def get_filtered(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False,
1✔
93
                     comparison: Optional[str] = None, **search_by) -> typing.List[ManagedDownloadSource]:
94
        """
95
        Get a list of filtered and sorted ManagedDownloadSource objects
96

97
        :param sort_by: field to sort by
98
        :param reverse: reverse sorting
99
        :param comparison: comparison operator used for filtering, one of 'eq', 'ne', 'g', 'l', 'ge', 'le' ('eq' is used when omitted)
100
        :param search_by: fields and values to filter by
101
        """
102
        if sort_by and sort_by not in self.filter_fields:
1!
103
            raise ValueError(f"'{sort_by}' is not a valid field to sort by")
×
104
        if comparison and comparison not in COMPARISON_OPERATORS:
1!
105
            raise ValueError(f"'{comparison}' is not a valid comparison")
×
106
        if 'full_status' in search_by:
1!
107
            del search_by['full_status']
×
108

109
        for search in search_by:
1✔
110
            if search not in self.filter_fields:
1!
111
                raise ValueError(f"'{search}' is not a valid search operation")
×
112

113
        compare_sets = {}
1✔
114
        if isinstance(search_by.get('claim_id'), list):
1!
115
            compare_sets['claim_ids'] = search_by.pop('claim_id')
×
116
        if isinstance(search_by.get('outpoint'), list):
1!
117
            compare_sets['outpoints'] = search_by.pop('outpoint')
×
118
        if isinstance(search_by.get('channel_claim_id'), list):
1!
119
            compare_sets['channel_claim_ids'] = search_by.pop('channel_claim_id')
×
120

121
        if search_by or compare_sets:
1!
122
            comparison = comparison or 'eq'
1✔
123
            streams = []
1✔
124
            for stream in self._sources.values():
1!
125
                if compare_sets and not all(
×
126
                        getattr(stream, self.set_filter_fields[set_search]) in val
127
                        for set_search, val in compare_sets.items()):
128
                    continue
×
129
                if search_by and not all(
×
130
                        COMPARISON_OPERATORS[comparison](getattr(stream, search), val)
131
                        for search, val in search_by.items()):
132
                    continue
×
133
                streams.append(stream)
×
134
        else:
135
            streams = list(self._sources.values())
×
136
        if sort_by:
1!
137
            streams.sort(key=lambda s: getattr(s, sort_by) or "")
×
138
            if reverse:
×
139
                streams.reverse()
×
140
        return streams
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc