• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4698125137

pending completion
4698125137

Pull #1432

github

GitHub
Merge ef21858a9 into ca6ea5f58
Pull Request #1432: chore: Add deb generation to release process

34321 of 44357 relevant lines covered (77.37%)

16844.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.4
/dozer-storage/src/lmdb_multimap.rs
1
use std::ops::Bound;
2

3
use dozer_types::borrow::Cow;
4
use lmdb::{Cursor, Database, DatabaseFlags, RoCursor, RwTransaction, Transaction, WriteFlags};
5
use lmdb_sys::{MDB_LAST_DUP, MDB_SET};
6

7
use crate::{
8
    errors::StorageError,
9
    lmdb_map::database_key_flag,
10
    lmdb_storage::{LmdbEnvironment, RwLmdbEnvironment},
11
    Encode, Iterator, LmdbKey, LmdbKeyType,
12
};
13

14
#[derive(Debug)]
×
15
pub struct LmdbMultimap<K, V> {
16
    db: Database,
17
    _key: std::marker::PhantomData<*const K>,
18
    _value: std::marker::PhantomData<*const V>,
19
}
20

21
impl<K, V> Clone for LmdbMultimap<K, V> {
22
    fn clone(&self) -> Self {
×
23
        Self {
×
24
            db: self.db,
×
25
            _key: std::marker::PhantomData,
×
26
            _value: std::marker::PhantomData,
×
27
        }
×
28
    }
×
29
}
30

31
impl<K, V> Copy for LmdbMultimap<K, V> {}
32

33
// Safety: `Database` is `Send` and `Sync`.
34
unsafe impl<K, V> Send for LmdbMultimap<K, V> {}
35
unsafe impl<K, V> Sync for LmdbMultimap<K, V> {}
36

37
impl<K: LmdbKey, V: LmdbKey> LmdbMultimap<K, V> {
38
    pub fn create(env: &mut RwLmdbEnvironment, name: Option<&str>) -> Result<Self, StorageError> {
880✔
39
        let db = env.create_database(name, database_flag::<K, V>())?;
880✔
40

41
        Ok(Self {
880✔
42
            db,
880✔
43
            _key: std::marker::PhantomData,
880✔
44
            _value: std::marker::PhantomData,
880✔
45
        })
880✔
46
    }
880✔
47

48
    pub fn open<E: LmdbEnvironment>(env: &E, name: Option<&str>) -> Result<Self, StorageError> {
5✔
49
        let db = env.open_database(name)?;
5✔
50

51
        Ok(Self {
5✔
52
            db,
5✔
53
            _key: std::marker::PhantomData,
5✔
54
            _value: std::marker::PhantomData,
5✔
55
        })
5✔
56
    }
5✔
57

58
    pub fn database(&self) -> Database {
4,861✔
59
        self.db
4,861✔
60
    }
4,861✔
61

62
    pub fn count_data<T: Transaction>(&self, txn: &T) -> Result<usize, StorageError> {
50✔
63
        Ok(txn.stat(self.db)?.entries())
50✔
64
    }
50✔
65

66
    pub fn get_first<'a, T: Transaction>(
48✔
67
        &self,
48✔
68
        txn: &'a T,
48✔
69
        key: K::Encode<'_>,
48✔
70
    ) -> Result<Option<Cow<'a, V>>, StorageError> {
48✔
71
        self.get(txn, key, true)
48✔
72
    }
48✔
73

74
    pub fn get_last<'a, T: Transaction>(
10✔
75
        &self,
10✔
76
        txn: &'a T,
10✔
77
        key: K::Encode<'_>,
10✔
78
    ) -> Result<Option<Cow<'a, V>>, StorageError> {
10✔
79
        self.get(txn, key, false)
10✔
80
    }
10✔
81

82
    fn get<'a, T: Transaction>(
58✔
83
        &self,
58✔
84
        txn: &'a T,
58✔
85
        key: K::Encode<'_>,
58✔
86
        first: bool,
58✔
87
    ) -> Result<Option<Cow<'a, V>>, StorageError> {
58✔
88
        let key = key.encode()?;
58✔
89
        let cursor = txn.open_ro_cursor(self.db)?;
58✔
90

91
        match cursor.get(Some(key.as_ref()), None, MDB_SET) {
58✔
92
            Ok((_, value)) => {
16✔
93
                if first {
16✔
94
                    Ok(Some(V::decode(value)?))
8✔
95
                } else {
96
                    let (_, value) = cursor.get(None, None, MDB_LAST_DUP)?;
8✔
97
                    Ok(Some(V::decode(value)?))
8✔
98
                }
99
            }
100
            Err(lmdb::Error::NotFound) => Ok(None),
42✔
101
            Err(err) => Err(err.into()),
×
102
        }
103
    }
58✔
104

105
    /// Returns if the key-value pair was actually inserted.
106
    pub fn insert(
96,876✔
107
        &self,
96,876✔
108
        txn: &mut RwTransaction,
96,876✔
109
        key: K::Encode<'_>,
96,876✔
110
        value: V::Encode<'_>,
96,876✔
111
    ) -> Result<bool, StorageError> {
96,876✔
112
        let key = key.encode()?;
96,876✔
113
        let value = value.encode()?;
96,876✔
114
        match txn.put(self.db, &key, &value, WriteFlags::NO_DUP_DATA) {
96,876✔
115
            Ok(()) => Ok(true),
97,265✔
116
            Err(lmdb::Error::KeyExist) => Ok(false),
1✔
117
            Err(err) => Err(err.into()),
×
118
        }
119
    }
97,266✔
120

121
    /// Returns if the key-value pair was actually removed.
122
    pub fn remove(
46✔
123
        &self,
46✔
124
        txn: &mut RwTransaction,
46✔
125
        key: K::Encode<'_>,
46✔
126
        value: V::Encode<'_>,
46✔
127
    ) -> Result<bool, StorageError> {
46✔
128
        let key = key.encode()?;
46✔
129
        let value = value.encode()?;
46✔
130
        match txn.del(self.db, &key, Some(value.as_ref())) {
46✔
131
            Ok(()) => Ok(true),
45✔
132
            Err(lmdb::Error::NotFound) => Ok(false),
1✔
133
            Err(err) => Err(err.into()),
×
134
        }
135
    }
46✔
136

137
    pub fn iter<'txn, T: Transaction>(
×
138
        &self,
×
139
        txn: &'txn T,
×
140
    ) -> Result<Iterator<'txn, RoCursor<'txn>, K, V>, StorageError> {
×
141
        let cursor = txn.open_ro_cursor(self.db)?;
×
142
        Iterator::new(cursor, Bound::Unbounded, true)
×
143
    }
×
144

145
    pub fn range<'txn, T: Transaction>(
4,175✔
146
        &self,
4,175✔
147
        txn: &'txn T,
4,175✔
148
        starting_key: Bound<K::Encode<'_>>,
4,175✔
149
        ascending: bool,
4,175✔
150
    ) -> Result<Iterator<'txn, RoCursor<'txn>, K, V>, StorageError> {
4,175✔
151
        let cursor = txn.open_ro_cursor(self.db)?;
4,175✔
152
        Iterator::new(cursor, starting_key, ascending)
4,175✔
153
    }
4,175✔
154
}
155

156
fn database_flag<K: LmdbKey, V: LmdbKey>() -> DatabaseFlags {
880✔
157
    let mut flags = database_key_flag::<K>();
880✔
158
    flags |= DatabaseFlags::DUP_SORT;
880✔
159
    match V::TYPE {
880✔
160
        LmdbKeyType::U32 => flags |= DatabaseFlags::DUP_FIXED | DatabaseFlags::INTEGER_DUP,
×
161
        #[cfg(target_pointer_width = "64")]
162
        LmdbKeyType::U64 => flags |= DatabaseFlags::DUP_FIXED | DatabaseFlags::INTEGER_DUP,
712✔
163
        LmdbKeyType::FixedSizeOtherThanU32OrUsize => flags |= DatabaseFlags::DUP_FIXED,
168✔
164
        LmdbKeyType::VariableSize => (),
×
165
    };
166
    flags
880✔
167
}
880✔
168

169
#[cfg(test)]
170
mod tests {
171
    use dozer_types::borrow::IntoOwned;
172
    use tempdir::TempDir;
173

174
    use crate::lmdb_storage::{LmdbEnvironmentManager, LmdbEnvironmentOptions};
175

176
    use super::*;
177

178
    #[test]
1✔
179
    fn test_lmdb_multimap() {
1✔
180
        let temp_dir = TempDir::new("test_lmdb_map").unwrap();
1✔
181
        let mut env = LmdbEnvironmentManager::create_rw(
1✔
182
            temp_dir.path(),
1✔
183
            "env",
1✔
184
            LmdbEnvironmentOptions::default(),
1✔
185
        )
1✔
186
        .unwrap();
1✔
187
        let map = LmdbMultimap::<u64, u64>::create(&mut env, None).unwrap();
1✔
188

1✔
189
        let txn = env.txn_mut().unwrap();
1✔
190
        assert!(map.get_first(txn, &0).unwrap().is_none());
1✔
191
        assert!(map.get_last(txn, &0).unwrap().is_none());
1✔
192
        assert!(map.insert(txn, &1u64, &2u64).unwrap());
1✔
193
        assert!(!map.insert(txn, &1u64, &2u64).unwrap());
1✔
194
        assert!(map.insert(txn, &1u64, &3u64).unwrap());
1✔
195
        assert!(map.get_first(txn, &0).unwrap().is_none());
1✔
196
        assert!(map.get_last(txn, &0).unwrap().is_none());
1✔
197
        assert_eq!(map.get_first(txn, &1).unwrap().unwrap().into_owned(), 2);
1✔
198
        assert_eq!(map.get_last(txn, &1).unwrap().unwrap().into_owned(), 3);
1✔
199
        assert!(map.remove(txn, &1u64, &2u64).unwrap());
1✔
200
        assert!(!map.remove(txn, &1u64, &2u64).unwrap());
1✔
201
    }
1✔
202
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc