• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lbryio / lbry-sdk / 3788717008

pending completion
3788717008

Pull #3711

github

GitHub
Merge 69297ea9c into 625865165
Pull Request #3711: Bump to Python 3.9 attempt 3.

2802 of 6558 branches covered (42.73%)

Branch coverage included in aggregate %.

25 of 41 new or added lines in 17 files covered. (60.98%)

33 existing lines in 9 files now uncovered.

12281 of 19915 relevant lines covered (61.67%)

1.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.83
/lbry/file/source_manager.py
1
import os
3✔
2
import asyncio
3✔
3
import logging
3✔
4
import typing
3✔
5
from typing import Optional
3✔
6
from lbry.file.source import ManagedDownloadSource
3✔
7
if typing.TYPE_CHECKING:
3!
8
    from lbry.conf import Config
×
9
    from lbry.extras.daemon.analytics import AnalyticsManager
×
10
    from lbry.extras.daemon.storage import SQLiteStorage
×
11

12
log = logging.getLogger(__name__)

# Maps the name of a comparison (as passed to SourceManager.get_filtered via
# ``comparison``) to a two-argument predicate. Each predicate is called as
# ``predicate(attribute_value, requested_value)``.
COMPARISON_OPERATORS = {
    'eq': lambda lhs, rhs: lhs == rhs,  # equal
    'ne': lambda lhs, rhs: lhs != rhs,  # not equal
    'g': lambda lhs, rhs: lhs > rhs,    # strictly greater
    'l': lambda lhs, rhs: lhs < rhs,    # strictly less
    'ge': lambda lhs, rhs: lhs >= rhs,  # greater or equal
    'le': lambda lhs, rhs: lhs <= rhs,  # less or equal
}
22

23

24
class SourceManager:
    """
    Registry of ManagedDownloadSource objects, keyed by each source's ``identifier``.

    ``initialize_from_database`` and ``create`` raise NotImplementedError here, so a
    concrete manager is expected to override them.
    """

    # Attribute names accepted as ``sort_by`` and as ``search_by`` keys in get_filtered().
    filter_fields = {
        'rowid',
        'status',
        'file_name',
        'added_on',
        'download_path',
        'claim_name',
        'claim_height',
        'claim_id',
        'outpoint',
        'txid',
        'nout',
        'channel_claim_id',
        'channel_name',
        'completed'
    }

    # Plural name of a list-valued filter -> source attribute that is tested for
    # membership in that list.
    set_filter_fields = {
        "claim_ids": "claim_id",
        "channel_claim_ids": "channel_claim_id",
        "outpoints": "outpoint"
    }

    source_class = ManagedDownloadSource

    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage',
                 analytics_manager: Optional['AnalyticsManager'] = None):
        """
        :param loop: event loop stored on the manager
        :param config: configuration object stored on the manager
        :param storage: storage object stored on the manager
        :param analytics_manager: optional analytics manager stored on the manager
        """
        self.loop = loop
        self.config = config
        self.storage = storage
        self.analytics_manager = analytics_manager
        self._sources: typing.Dict[str, ManagedDownloadSource] = {}
        # Set by start() once initialize_from_database() has returned; cleared by stop().
        self.started = asyncio.Event()

    def add(self, source: ManagedDownloadSource):
        """Register ``source`` under its ``identifier``, replacing any existing entry."""
        self._sources[source.identifier] = source

    async def remove(self, source: ManagedDownloadSource):
        """
        Unregister ``source`` and await its ``stop_tasks()``.

        Does nothing when the source's identifier is not registered.
        """
        if source.identifier not in self._sources:
            return
        self._sources.pop(source.identifier)
        await source.stop_tasks()

    async def initialize_from_database(self):
        """Hook called by start(); not implemented in this class."""
        raise NotImplementedError()

    async def start(self):
        """Run initialize_from_database() and then set the ``started`` event."""
        await self.initialize_from_database()
        self.started.set()

    async def stop(self):
        """Unregister every source, awaiting ``stop_tasks()`` on each, then clear ``started``."""
        while self._sources:
            _, source = self._sources.popitem()
            await source.stop_tasks()
        self.started.clear()

    async def create(self, file_path: str, key: Optional[bytes] = None,
                     iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedDownloadSource:
        """Not implemented in this class."""
        raise NotImplementedError()

    async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
        """
        Remove ``source`` from the manager and optionally delete its output file.

        :param source: source to remove
        :param delete_file: when truthy and ``source.output_file_exists`` is truthy,
            the file at ``source.full_path`` is removed with os.remove
        """
        await self.remove(source)
        if delete_file and source.output_file_exists:
            os.remove(source.full_path)

    def get_filtered(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False,
                     comparison: Optional[str] = None, **search_by) -> typing.List[ManagedDownloadSource]:
        """
        Get a list of filtered and sorted sources

        :param sort_by: field to sort by; must be one of ``filter_fields``
        :param reverse: reverse sorting
        :param comparison: comparison operator used for filtering, a key of
            COMPARISON_OPERATORS; defaults to 'eq' when filters are given
        :param search_by: fields and values to filter by. Keys must be in
            ``filter_fields``. A list value for one of the fields named in
            ``set_filter_fields`` is matched by membership instead of by ``comparison``.
            A 'full_status' key is accepted and ignored.
        :raises ValueError: if ``sort_by``, ``comparison`` or a ``search_by`` key is not valid
        """
        if sort_by and sort_by not in self.filter_fields:
            raise ValueError(f"'{sort_by}' is not a valid field to sort by")
        if comparison and comparison not in COMPARISON_OPERATORS:
            raise ValueError(f"'{comparison}' is not a valid comparison")
        # 'full_status' is tolerated as a keyword but never used as a filter.
        search_by.pop('full_status', None)

        for search in search_by:
            if search not in self.filter_fields:
                raise ValueError(f"'{search}' is not a valid search operation")

        # Move list-valued filters out of search_by: they are matched with ``in``.
        compare_sets = {}
        for set_search, field in self.set_filter_fields.items():
            if isinstance(search_by.get(field), list):
                compare_sets[set_search] = search_by.pop(field)

        if search_by or compare_sets:
            compare = COMPARISON_OPERATORS[comparison or 'eq']
            # all() over an empty mapping is True, so an absent filter group never excludes.
            streams = [
                stream for stream in self._sources.values()
                if all(getattr(stream, self.set_filter_fields[set_search]) in val
                       for set_search, val in compare_sets.items())
                and all(compare(getattr(stream, search), val)
                        for search, val in search_by.items())
            ]
        else:
            streams = list(self._sources.values())

        if sort_by:
            def sort_key(stream):
                # Only None needs special handling: it sorts before every real value.
                # Falsy values such as 0 or False keep their own type, so they remain
                # comparable with the other values of the same field.
                value = getattr(stream, sort_by)
                return value is not None, value

            streams.sort(key=sort_key)
            if reverse:
                streams.reverse()
        return streams
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc