• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4309615408

pending completion
4309615408

push

github

GitHub
chore: Remove an unused `Arc<RwLock>` (#1106)

6 of 6 new or added lines in 2 files covered. (100.0%)

28466 of 40109 relevant lines covered (70.97%)

50957.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.09
/dozer-cache/src/cache/lmdb/cache/query/handler.rs
1
use std::cmp::Ordering;
2

3
use super::intersection::intersection;
4
use super::iterator::{CacheIterator, KeyEndpoint};
5
use crate::cache::expression::Skip;
6
use crate::cache::lmdb::cache::{id_from_bytes, id_to_bytes, LmdbCacheCommon};
7
use crate::cache::RecordWithId;
8
use crate::cache::{
9
    expression::{Operator, QueryExpression, SortDirection},
10
    index,
11
    plan::{IndexScan, IndexScanKind, Plan, QueryPlanner, SortedInvertedRangeQuery},
12
};
13
use crate::errors::{CacheError, IndexError};
14
use dozer_storage::lmdb::Transaction;
15
use dozer_types::types::{Field, IndexDefinition, Schema};
16
use itertools::Either;
17

18
pub struct LmdbQueryHandler<'a, T: Transaction> {
19
    common: &'a LmdbCacheCommon,
20
    txn: &'a T,
21
    schema: &'a Schema,
22
    secondary_indexes: &'a [IndexDefinition],
23
    query: &'a QueryExpression,
24
}
25
impl<'a, T: Transaction> LmdbQueryHandler<'a, T> {
26
    pub fn new(
4,457✔
27
        common: &'a LmdbCacheCommon,
4,457✔
28
        txn: &'a T,
4,457✔
29
        schema: &'a Schema,
4,457✔
30
        secondary_indexes: &'a [IndexDefinition],
4,457✔
31
        query: &'a QueryExpression,
4,457✔
32
    ) -> Self {
4,457✔
33
        Self {
4,457✔
34
            common,
4,457✔
35
            txn,
4,457✔
36
            schema,
4,457✔
37
            secondary_indexes,
4,457✔
38
            query,
4,457✔
39
        }
4,457✔
40
    }
4,457✔
41

42
    pub fn count(&self) -> Result<usize, CacheError> {
2,220✔
43
        let planner = QueryPlanner::new(self.schema, self.secondary_indexes, self.query);
2,220✔
44
        let execution = planner.plan()?;
2,220✔
45
        match execution {
2,219✔
46
            Plan::IndexScans(index_scans) => Ok(self.build_index_scan(index_scans)?.count()),
2,085✔
47
            Plan::SeqScan(_) => Ok(match self.query.skip {
133✔
48
                Skip::Skip(skip) => self
127✔
49
                    .common
127✔
50
                    .db
127✔
51
                    .count(self.txn)?
127✔
52
                    .saturating_sub(skip)
127✔
53
                    .min(self.query.limit.unwrap_or(usize::MAX)),
127✔
54
                Skip::After(_) => self.all_ids()?.count(),
6✔
55
            }),
56
            Plan::ReturnEmpty => Ok(0),
1✔
57
        }
58
    }
2,220✔
59

60
    pub fn query(&self) -> Result<Vec<RecordWithId>, CacheError> {
2,237✔
61
        let planner = QueryPlanner::new(self.schema, self.secondary_indexes, self.query);
2,237✔
62
        let execution = planner.plan()?;
2,237✔
63
        match execution {
2,236✔
64
            Plan::IndexScans(index_scans) => {
2,088✔
65
                self.collect_records(self.build_index_scan(index_scans)?)
2,088✔
66
            }
67
            Plan::SeqScan(_seq_scan) => self.collect_records(self.all_ids()?),
147✔
68
            Plan::ReturnEmpty => Ok(vec![]),
1✔
69
        }
70
    }
2,237✔
71

72
    pub fn all_ids(&self) -> Result<impl Iterator<Item = u64> + '_, CacheError> {
153✔
73
        let cursor = self.common.id.open_ro_cursor(self.txn)?;
153✔
74
        Ok(skip(
153✔
75
            CacheIterator::new(cursor, None, SortDirection::Ascending)
153✔
76
                .map(map_index_database_entry_to_id),
153✔
77
            self.query.skip,
153✔
78
        )
153✔
79
        .take(self.query.limit.unwrap_or(usize::MAX)))
153✔
80
    }
153✔
81

82
    fn build_index_scan(
4,173✔
83
        &self,
4,173✔
84
        index_scans: Vec<IndexScan>,
4,173✔
85
    ) -> Result<impl Iterator<Item = u64> + '_, CacheError> {
4,173✔
86
        debug_assert!(
87
            !index_scans.is_empty(),
4,173✔
88
            "Planner should not generate empty index scan"
×
89
        );
90
        let full_scan = if index_scans.len() == 1 {
4,173✔
91
            // The fast path, without intersection calculation.
92
            Either::Left(self.query_with_secondary_index(&index_scans[0])?)
4,171✔
93
        } else {
94
            // Intersection of multiple index scans.
95
            let iterators = index_scans
2✔
96
                .iter()
2✔
97
                .map(|index_scan| self.query_with_secondary_index(index_scan))
4✔
98
                .collect::<Result<Vec<_>, CacheError>>()?;
2✔
99
            Either::Right(intersection(
2✔
100
                iterators,
2✔
101
                self.common.cache_options.intersection_chunk_size,
2✔
102
            ))
2✔
103
        };
104
        Ok(skip(full_scan, self.query.skip).take(self.query.limit.unwrap_or(usize::MAX)))
4,173✔
105
    }
4,173✔
106

107
    fn query_with_secondary_index(
4,175✔
108
        &'a self,
4,175✔
109
        index_scan: &IndexScan,
4,175✔
110
    ) -> Result<impl Iterator<Item = u64> + 'a, CacheError> {
4,175✔
111
        let schema_id = self
4,175✔
112
            .schema
4,175✔
113
            .identifier
4,175✔
114
            .ok_or(CacheError::SchemaHasNoIdentifier)?;
4,175✔
115
        let index_db = *self
4,175✔
116
            .common
4,175✔
117
            .secondary_indexes
4,175✔
118
            .get(&(schema_id, index_scan.index_id))
4,175✔
119
            .ok_or(CacheError::SecondaryIndexDatabaseNotFound)?;
4,175✔
120

×
121
        let RangeSpec {
122
            start,
4,175✔
123
            end,
4,175✔
124
            direction,
4,175✔
125
        } = get_range_spec(&index_scan.kind, index_scan.is_single_field_sorted_inverted)?;
4,175✔
126

×
127
        let cursor = index_db.open_ro_cursor(self.txn)?;
4,175✔
128

×
129
        Ok(CacheIterator::new(cursor, start, direction)
4,175✔
130
            .take_while(move |(key, _)| {
4,175✔
131
                if let Some(end_key) = &end {
2,321,224✔
132
                    match index_db.cmp(self.txn, key, end_key.key()) {
1,876,308✔
133
                        Ordering::Less => matches!(direction, SortDirection::Ascending),
839,684✔
134
                        Ordering::Equal => matches!(end_key, KeyEndpoint::Including(_)),
359,209✔
135
                        Ordering::Greater => matches!(direction, SortDirection::Descending),
677,415✔
136
                    }
×
137
                } else {
138
                    true
444,916✔
139
                }
×
140
            })
2,321,224✔
141
            .map(map_index_database_entry_to_id))
4,175✔
142
    }
4,175✔
143

×
144
    fn collect_records(
2,235✔
145
        &self,
2,235✔
146
        ids: impl Iterator<Item = u64>,
2,235✔
147
    ) -> Result<Vec<RecordWithId>, CacheError> {
2,235✔
148
        ids.map(|id| {
1,127,623✔
149
            self.common
1,127,623✔
150
                .db
1,127,623✔
151
                .get(self.txn, id_to_bytes(id))
1,127,623✔
152
                .map(|record| RecordWithId::new(id, record))
1,127,623✔
153
        })
1,127,623✔
154
        .collect()
2,235✔
155
    }
2,235✔
156
}
×
157

158
#[derive(Debug)]
×
159
struct RangeSpec {
×
160
    start: Option<KeyEndpoint>,
161
    end: Option<KeyEndpoint>,
162
    direction: SortDirection,
163
}
164

165
fn get_range_spec(
4,175✔
166
    index_scan_kind: &IndexScanKind,
4,175✔
167
    is_single_field_sorted_inverted: bool,
4,175✔
168
) -> Result<RangeSpec, CacheError> {
4,175✔
169
    match &index_scan_kind {
4,175✔
170
        IndexScanKind::SortedInverted {
×
171
            eq_filters,
4,145✔
172
            range_query,
4,145✔
173
        } => {
4,145✔
174
            let comparison_key = build_sorted_inverted_comparision_key(
4,145✔
175
                eq_filters,
4,145✔
176
                range_query.as_ref(),
4,145✔
177
                is_single_field_sorted_inverted,
4,145✔
178
            );
4,145✔
179
            // There're 3 cases:
×
180
            // 1. Range query with operator.
181
            // 2. Range query without operator (only order by).
182
            // 3. No range query.
183
            Ok(if let Some(range_query) = range_query {
4,145✔
184
                match range_query.operator_and_value {
3,624✔
185
                    Some((operator, _)) => {
2,228✔
186
                        // Here we respond to case 1, examples are `a = 1 && b > 2` or `b < 2`.
2,228✔
187
                        let comparison_key = comparison_key.expect("here's at least a range query");
2,228✔
188
                        let null_key = build_sorted_inverted_comparision_key(
2,228✔
189
                            eq_filters,
2,228✔
190
                            Some(&SortedInvertedRangeQuery {
2,228✔
191
                                field_index: range_query.field_index,
2,228✔
192
                                operator_and_value: Some((operator, Field::Null)),
2,228✔
193
                                sort_direction: range_query.sort_direction,
2,228✔
194
                            }),
2,228✔
195
                            is_single_field_sorted_inverted,
2,228✔
196
                        )
2,228✔
197
                        .expect("we provided a range query");
2,228✔
198
                        get_key_interval_from_range_query(
2,228✔
199
                            comparison_key,
2,228✔
200
                            null_key,
2,228✔
201
                            operator,
2,228✔
202
                            range_query.sort_direction,
2,228✔
203
                        )
2,228✔
204
                    }
×
205
                    None => {
206
                        // Here we respond to case 2, examples are `a = 1 && b asc` or `b desc`.
207
                        if let Some(comparison_key) = comparison_key {
1,396✔
208
                            // This is the case like `a = 1 && b asc`. The comparison key is only built from `a = 1`.
×
209
                            // We use `a = 1 && b = null` as a sentinel, using the invariant that `null` is greater than anything.
210
                            let null_key = build_sorted_inverted_comparision_key(
962✔
211
                                eq_filters,
962✔
212
                                Some(&SortedInvertedRangeQuery {
962✔
213
                                    field_index: range_query.field_index,
962✔
214
                                    operator_and_value: Some((Operator::LT, Field::Null)),
962✔
215
                                    sort_direction: range_query.sort_direction,
962✔
216
                                }),
962✔
217
                                is_single_field_sorted_inverted,
962✔
218
                            )
962✔
219
                            .expect("we provided a range query");
962✔
220
                            match range_query.sort_direction {
962✔
221
                                SortDirection::Ascending => RangeSpec {
578✔
222
                                    start: Some(KeyEndpoint::Excluding(comparison_key)),
578✔
223
                                    end: Some(KeyEndpoint::Including(null_key)),
578✔
224
                                    direction: SortDirection::Ascending,
578✔
225
                                },
578✔
226
                                SortDirection::Descending => RangeSpec {
384✔
227
                                    start: Some(KeyEndpoint::Including(null_key)),
384✔
228
                                    end: Some(KeyEndpoint::Excluding(comparison_key)),
384✔
229
                                    direction: SortDirection::Descending,
384✔
230
                                },
384✔
231
                            }
×
232
                        } else {
233
                            // Just all of them.
234
                            RangeSpec {
434✔
235
                                start: None,
434✔
236
                                end: None,
434✔
237
                                direction: range_query.sort_direction,
434✔
238
                            }
434✔
239
                        }
×
240
                    }
241
                }
242
            } else {
243
                // Here we respond to case 3, examples are `a = 1` or `a = 1 && b = 2`.
244
                let comparison_key = comparison_key
521✔
245
                    .expect("here's at least a eq filter because there's no range query");
521✔
246
                RangeSpec {
521✔
247
                    start: Some(KeyEndpoint::Including(comparison_key.clone())),
521✔
248
                    end: Some(KeyEndpoint::Including(comparison_key)),
521✔
249
                    direction: SortDirection::Ascending, // doesn't matter
521✔
250
                }
521✔
251
            })
×
252
        }
253
        IndexScanKind::FullText { filter } => match filter.op {
30✔
254
            Operator::Contains => {
×
255
                let token = match &filter.val {
30✔
256
                    Field::String(token) => token,
28✔
257
                    Field::Text(token) => token,
2✔
258
                    _ => return Err(CacheError::Index(IndexError::ExpectedStringFullText)),
×
259
                };
×
260
                let key = index::get_full_text_secondary_index(token);
30✔
261
                Ok(RangeSpec {
30✔
262
                    start: Some(KeyEndpoint::Including(key.clone())),
30✔
263
                    end: Some(KeyEndpoint::Including(key)),
30✔
264
                    direction: SortDirection::Ascending, // doesn't matter
30✔
265
                })
30✔
266
            }
×
267
            Operator::MatchesAll | Operator::MatchesAny => {
268
                unimplemented!("matches all and matches any are not implemented")
×
269
            }
×
270
            other => panic!("operator {other:?} is not supported by full text index"),
×
271
        },
×
272
    }
273
}
4,175✔
274

×
275
fn build_sorted_inverted_comparision_key(
7,335✔
276
    eq_filters: &[(usize, Field)],
7,335✔
277
    range_query: Option<&SortedInvertedRangeQuery>,
7,335✔
278
    is_single_field_index: bool,
7,335✔
279
) -> Option<Vec<u8>> {
7,335✔
280
    let mut fields = vec![];
7,335✔
281
    eq_filters.iter().for_each(|filter| {
7,335✔
282
        fields.push(&filter.1);
5,761✔
283
    });
7,335✔
284
    if let Some(range_query) = range_query {
7,335✔
285
        if let Some((_, val)) = &range_query.operator_and_value {
6,814✔
286
            fields.push(val);
5,418✔
287
        }
5,418✔
288
    }
521✔
289
    if fields.is_empty() {
7,335✔
290
        None
434✔
291
    } else {
×
292
        Some(index::get_secondary_index(&fields, is_single_field_index))
6,901✔
293
    }
×
294
}
7,335✔
295

×
296
/// Here we use the invariant that `null` is greater than anything.
297
fn get_key_interval_from_range_query(
2,228✔
298
    comparison_key: Vec<u8>,
2,228✔
299
    null_key: Vec<u8>,
2,228✔
300
    operator: Operator,
2,228✔
301
    sort_direction: SortDirection,
2,228✔
302
) -> RangeSpec {
2,228✔
303
    match (operator, sort_direction) {
2,228✔
304
        (Operator::LT, SortDirection::Ascending) => RangeSpec {
148✔
305
            start: None,
148✔
306
            end: Some(KeyEndpoint::Excluding(comparison_key)),
148✔
307
            direction: SortDirection::Ascending,
148✔
308
        },
148✔
309
        (Operator::LT, SortDirection::Descending) => RangeSpec {
384✔
310
            start: Some(KeyEndpoint::Excluding(comparison_key)),
384✔
311
            end: None,
384✔
312
            direction: SortDirection::Descending,
384✔
313
        },
384✔
314
        (Operator::LTE, SortDirection::Ascending) => RangeSpec {
28✔
315
            start: None,
28✔
316
            end: Some(KeyEndpoint::Including(comparison_key)),
28✔
317
            direction: SortDirection::Ascending,
28✔
318
        },
28✔
319
        (Operator::LTE, SortDirection::Descending) => RangeSpec {
×
320
            start: Some(KeyEndpoint::Including(comparison_key)),
×
321
            end: None,
×
322
            direction: SortDirection::Descending,
×
323
        },
×
324
        (Operator::GT, SortDirection::Ascending) => RangeSpec {
176✔
325
            start: Some(KeyEndpoint::Excluding(comparison_key)),
176✔
326
            end: Some(KeyEndpoint::Excluding(null_key)),
176✔
327
            direction: SortDirection::Ascending,
176✔
328
        },
176✔
329
        (Operator::GT, SortDirection::Descending) => RangeSpec {
386✔
330
            start: Some(KeyEndpoint::Excluding(null_key)),
386✔
331
            end: Some(KeyEndpoint::Excluding(comparison_key)),
386✔
332
            direction: SortDirection::Descending,
386✔
333
        },
386✔
334
        (Operator::GTE, SortDirection::Ascending) => RangeSpec {
722✔
335
            start: Some(KeyEndpoint::Including(comparison_key)),
722✔
336
            end: Some(KeyEndpoint::Excluding(null_key)),
722✔
337
            direction: SortDirection::Ascending,
722✔
338
        },
722✔
339
        (Operator::GTE, SortDirection::Descending) => RangeSpec {
384✔
340
            start: Some(KeyEndpoint::Excluding(null_key)),
384✔
341
            end: Some(KeyEndpoint::Including(comparison_key)),
384✔
342
            direction: SortDirection::Descending,
384✔
343
        },
384✔
344
        (other, _) => {
×
345
            panic!("operator {other:?} is not supported by sorted inverted index range query")
×
346
        }
×
347
    }
348
}
2,228✔
349

×
350
fn map_index_database_entry_to_id((_, id): (&[u8], &[u8])) -> u64 {
2,377,419✔
351
    id_from_bytes(
2,377,419✔
352
        id.try_into()
2,377,419✔
353
            .expect("All values must be u64 ids in index database"),
2,377,419✔
354
    )
2,377,419✔
355
}
2,377,419✔
356

×
357
fn skip(iter: impl Iterator<Item = u64>, skip: Skip) -> impl Iterator<Item = u64> {
4,326✔
358
    match skip {
4,326✔
359
        Skip::Skip(n) => Either::Left(iter.skip(n)),
3,774✔
360
        Skip::After(after) => Either::Right(skip_after(iter, after)),
552✔
361
    }
×
362
}
4,326✔
363

×
364
/// Yields the ids that follow the first occurrence of `after`.
///
/// If `after` never appears, nothing is yielded.
fn skip_after(iter: impl Iterator<Item = u64>, after: u64) -> impl Iterator<Item = u64> {
    // Discard everything before the first `after`...
    let not_yet_reached = move |id: &u64| *id != after;
    // ...and `after` itself, which `skip_while` leaves at the front.
    iter.skip_while(not_yet_reached).skip(1)
}
552✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc