• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4698406303

pending completion
4698406303

Pull #1426

github

GitHub
Merge daefffe87 into b6889464a
Pull Request #1426: feat: implement python log bindings

1 of 1 new or added line in 1 file covered. (100.0%)

34863 of 45840 relevant lines covered (76.05%)

10764.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.39
/dozer-cache/src/cache/lmdb/cache/query/handler.rs
1
use super::intersection::intersection;
2
use crate::cache::expression::Skip;
3
use crate::cache::lmdb::cache::main_environment::MainEnvironment;
4
use crate::cache::lmdb::cache::query::secondary::build_index_scan;
5
use crate::cache::lmdb::cache::LmdbCache;
6
use crate::cache::CacheRecord;
7
use crate::cache::{
8
    expression::QueryExpression,
9
    plan::{IndexScan, Plan, QueryPlanner},
10
};
11
use crate::errors::{CacheError, PlanError};
12
use dozer_storage::errors::StorageError;
13
use dozer_storage::lmdb::{RoTransaction, Transaction};
14
use dozer_storage::LmdbEnvironment;
15
use dozer_types::borrow::IntoOwned;
16
use itertools::Either;
17

18
/// Evaluates a single [`QueryExpression`] against an LMDB-backed cache.
///
/// The handler only borrows its inputs; it plans the query and then either
/// counts or collects the matching records.
pub struct LmdbQueryHandler<'a, C: LmdbCache> {
19
    /// Cache whose main and secondary environments are read while answering the query.
    cache: &'a C,
20
    /// Query being answered; its `filter`, `order_by`, `skip` and `limit` are consulted.
    query: &'a QueryExpression,
21
}
22

23
impl<'a, C: LmdbCache> LmdbQueryHandler<'a, C> {
24
    pub fn new(cache: &'a C, query: &'a QueryExpression) -> Self {
255✔
25
        Self { cache, query }
255✔
26
    }
255✔
27

28
    pub fn count(&self) -> Result<usize, CacheError> {
29
        match self.plan()? {
111✔
30
            Plan::IndexScans(index_scans) => {
57✔
31
                let secondary_txns = self.create_secondary_txns(&index_scans)?;
57✔
32
                let ids = self.combine_secondary_queries(&index_scans, &secondary_txns)?;
57✔
33
                self.count_secondary_queries(ids)
57✔
34
            }
35
            Plan::SeqScan(_) => Ok(match self.query.skip {
52✔
36
                Skip::Skip(skip) => self
52✔
37
                    .cache
52✔
38
                    .main_env()
52✔
39
                    .count()?
52✔
40
                    .saturating_sub(skip)
52✔
41
                    .min(self.query.limit.unwrap_or(usize::MAX)),
52✔
42
                Skip::After(_) => self.all_ids(&self.cache.main_env().begin_txn()?)?.count(),
×
43
            }),
44
            Plan::ReturnEmpty => Ok(0),
1✔
45
        }
46
    }
111✔
47

48
    pub fn query(&self) -> Result<Vec<CacheRecord>, CacheError> {
49
        match self.plan()? {
144✔
50
            Plan::IndexScans(index_scans) => {
60✔
51
                let secondary_txns = self.create_secondary_txns(&index_scans)?;
60✔
52
                let main_txn = self.cache.main_env().begin_txn()?;
60✔
53
                #[allow(clippy::let_and_return)] // Must do let binding unless won't compile
54
                let result = self.collect_records(
60✔
55
                    &main_txn,
60✔
56
                    self.combine_secondary_queries(&index_scans, &secondary_txns)?,
60✔
57
                );
58
                result
60✔
59
            }
60
            Plan::SeqScan(_seq_scan) => {
82✔
61
                let main_txn = self.cache.main_env().begin_txn()?;
82✔
62
                #[allow(clippy::let_and_return)] // Must do let binding unless won't compile
63
                let result = self.collect_records(&main_txn, self.all_ids(&main_txn)?);
82✔
64
                result
82✔
65
            }
66
            Plan::ReturnEmpty => Ok(vec![]),
1✔
67
        }
68
    }
144✔
69

70
    fn plan(&self) -> Result<Plan, PlanError> {
255✔
71
        let (schema, secondary_indexes) = self.cache.main_env().schema();
255✔
72
        let planner = QueryPlanner::new(
255✔
73
            schema,
255✔
74
            secondary_indexes,
255✔
75
            self.query.filter.as_ref(),
255✔
76
            &self.query.order_by,
255✔
77
        );
255✔
78
        planner.plan()
255✔
79
    }
255✔
80

81
    fn all_ids<'txn, T: Transaction>(
82✔
82
        &self,
82✔
83
        main_txn: &'txn T,
82✔
84
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + 'txn, CacheError> {
82✔
85
        let schema_is_append_only = self.cache.main_env().schema().0.is_append_only();
82✔
86
        let all_ids = self
82✔
87
            .cache
82✔
88
            .main_env()
82✔
89
            .operation_log()
82✔
90
            .present_operation_ids(main_txn, schema_is_append_only)?
82✔
91
            .map(|result| {
2,872✔
92
                result
2,872✔
93
                    .map(|id| id.into_owned())
2,872✔
94
                    .map_err(CacheError::Storage)
2,872✔
95
            });
2,872✔
96
        Ok(skip(all_ids, self.query.skip).take(self.query.limit.unwrap_or(usize::MAX)))
82✔
97
    }
82✔
98

99
    fn create_secondary_txns(
117✔
100
        &self,
117✔
101
        index_scans: &[IndexScan],
117✔
102
    ) -> Result<Vec<RoTransaction<'_>>, StorageError> {
117✔
103
        index_scans
117✔
104
            .iter()
117✔
105
            .map(|index_scan| self.cache.secondary_env(index_scan.index_id).begin_txn())
119✔
106
            .collect()
117✔
107
    }
117✔
108

109
    fn combine_secondary_queries<'txn, T: Transaction>(
117✔
110
        &self,
117✔
111
        index_scans: &[IndexScan],
117✔
112
        secondary_txns: &'txn [T],
117✔
113
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + 'txn, CacheError> {
117✔
114
        debug_assert!(
115
            !index_scans.is_empty(),
117✔
116
            "Planner should not generate empty index scan"
×
117
        );
118
        let combined = if index_scans.len() == 1 {
117✔
119
            // The fast path, without intersection calculation.
120
            Either::Left(build_index_scan(
115✔
121
                &secondary_txns[0],
115✔
122
                self.cache.secondary_env(index_scans[0].index_id),
115✔
123
                &index_scans[0].kind,
115✔
124
            )?)
115✔
125
        } else {
126
            // Intersection of multiple index scans.
127
            let iterators = index_scans
2✔
128
                .iter()
2✔
129
                .zip(secondary_txns)
2✔
130
                .map(|(index_scan, secondary_txn)| {
4✔
131
                    build_index_scan(
4✔
132
                        secondary_txn,
4✔
133
                        self.cache.secondary_env(index_scan.index_id),
4✔
134
                        &index_scan.kind,
4✔
135
                    )
4✔
136
                })
4✔
137
                .collect::<Result<Vec<_>, CacheError>>()?;
2✔
138
            Either::Right(intersection(
2✔
139
                iterators,
2✔
140
                self.cache.main_env().intersection_chunk_size(),
2✔
141
            ))
2✔
142
        };
143
        Ok(skip(combined, self.query.skip).take(self.query.limit.unwrap_or(usize::MAX)))
117✔
144
    }
117✔
145

146
    fn filter_secondary_queries<'txn, T: Transaction>(
199✔
147
        &'txn self,
199✔
148
        main_txn: &'txn T,
199✔
149
        ids: impl Iterator<Item = Result<u64, CacheError>> + 'txn,
199✔
150
    ) -> impl Iterator<Item = Result<u64, CacheError>> + 'txn {
199✔
151
        let schema_is_append_only = self.cache.main_env().schema().0.is_append_only();
199✔
152
        ids.filter_map(move |id| match id {
4,267✔
153
            Ok(id) => match self.cache.main_env().operation_log().contains_operation_id(
4,267✔
154
                main_txn,
4,267✔
155
                schema_is_append_only,
4,267✔
156
                id,
4,267✔
157
            ) {
4,267✔
158
                Ok(true) => Some(Ok(id)),
4,267✔
159
                Ok(false) => None,
×
160
                Err(err) => Some(Err(err.into())),
×
161
            },
162
            Err(err) => Some(Err(err)),
×
163
        })
4,267✔
164
    }
199✔
165

166
    fn count_secondary_queries(
57✔
167
        &self,
57✔
168
        ids: impl Iterator<Item = Result<u64, CacheError>>,
57✔
169
    ) -> Result<usize, CacheError> {
57✔
170
        let main_txn = self.cache.main_env().begin_txn()?;
57✔
171

172
        let mut result = 0;
57✔
173
        for maybe_id in self.filter_secondary_queries(&main_txn, ids) {
708✔
174
            maybe_id?;
708✔
175
            result += 1;
708✔
176
        }
177
        Ok(result)
57✔
178
    }
57✔
179

180
    fn collect_records<'txn, T: Transaction>(
142✔
181
        &'txn self,
142✔
182
        main_txn: &'txn T,
142✔
183
        ids: impl Iterator<Item = Result<u64, CacheError>> + 'txn,
142✔
184
    ) -> Result<Vec<CacheRecord>, CacheError> {
142✔
185
        self.filter_secondary_queries(main_txn, ids)
142✔
186
            .map(|id| {
3,559✔
187
                id.and_then(|id| {
3,559✔
188
                    self.cache
3,559✔
189
                        .main_env()
3,559✔
190
                        .operation_log()
3,559✔
191
                        .get_record_by_operation_id_unchecked(main_txn, id)
3,559✔
192
                        .map_err(Into::into)
3,559✔
193
                })
3,559✔
194
            })
3,559✔
195
            .collect()
142✔
196
    }
142✔
197
}
198

199
fn skip(
199✔
200
    iter: impl Iterator<Item = Result<u64, CacheError>>,
199✔
201
    skip: Skip,
199✔
202
) -> impl Iterator<Item = Result<u64, CacheError>> {
199✔
203
    match skip {
199✔
204
        Skip::Skip(n) => Either::Left(iter.skip(n)),
199✔
205
        Skip::After(after) => Either::Right(skip_after(iter, after)),
×
206
    }
207
}
199✔
208

209
/// Iterator adaptor that discards ids until a given id has been seen, then
/// yields the rest of the wrapped iterator.
struct SkipAfter<T: Iterator<Item = Result<u64, CacheError>>> {
210
    /// The wrapped id stream.
    inner: T,
211
    /// Id still being searched for; `None` once it has been found.
    after: Option<u64>,
212
}
213

214
impl<T: Iterator<Item = Result<u64, CacheError>>> Iterator for SkipAfter<T> {
    type Item = Result<u64, CacheError>;

    /// Yields the next item that follows the marker id.
    ///
    /// While the marker has not been seen, successful ids are discarded (the
    /// marker itself included) and errors are returned immediately. If the
    /// inner iterator ends first, `None` is returned.
    fn next(&mut self) -> Option<Self::Item> {
        while let Some(marker) = self.after {
            match self.inner.next()? {
                Ok(id) if id == marker => self.after = None,
                Ok(_) => {}
                Err(err) => return Some(Err(err)),
            }
        }
        self.inner.next()
    }
}
235

236
fn skip_after<T: Iterator<Item = Result<u64, CacheError>>>(iter: T, after: u64) -> SkipAfter<T> {
×
237
    SkipAfter {
×
238
        inner: iter,
×
239
        after: Some(after),
×
240
    }
×
241
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc