• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4829268814

pending completion
4829268814

Pull #1516

github

GitHub
Merge 845b68ec1 into f2ab0e6ce
Pull Request #1516: Prepare v0.1.19

35090 of 44737 relevant lines covered (78.44%)

11496.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.4
/dozer-storage/src/lmdb_multimap.rs
1
use std::ops::Bound;
2

3
use dozer_types::borrow::Cow;
4
use lmdb::{Cursor, Database, DatabaseFlags, RoCursor, RwTransaction, Transaction, WriteFlags};
5
use lmdb_sys::{MDB_LAST_DUP, MDB_SET};
6

7
use crate::{
8
    errors::StorageError,
9
    lmdb_map::database_key_flag,
10
    lmdb_storage::{LmdbEnvironment, RwLmdbEnvironment},
11
    Encode, Iterator, LmdbKey, LmdbKeyType,
12
};
13

14
#[derive(Debug)]
×
15
pub struct LmdbMultimap<K, V> {
16
    db: Database,
17
    _key: std::marker::PhantomData<*const K>,
18
    _value: std::marker::PhantomData<*const V>,
19
}
20

21
impl<K, V> Clone for LmdbMultimap<K, V> {
22
    fn clone(&self) -> Self {
×
23
        Self {
×
24
            db: self.db,
×
25
            _key: std::marker::PhantomData,
×
26
            _value: std::marker::PhantomData,
×
27
        }
×
28
    }
×
29
}
30

31
// Implemented without any bound on `K` or `V`: the handle is copyable
// regardless of whether the key and value types are.
impl<K, V> Copy for LmdbMultimap<K, V> {}
32

33
// Safety: `Database` is `Send` and `Sync`.
34
unsafe impl<K, V> Send for LmdbMultimap<K, V> {}
35
unsafe impl<K, V> Sync for LmdbMultimap<K, V> {}
36

37
impl<K: LmdbKey, V: LmdbKey> LmdbMultimap<K, V> {
38
    pub fn create(env: &mut RwLmdbEnvironment, name: Option<&str>) -> Result<Self, StorageError> {
566✔
39
        let db = env.create_database(name, database_flag::<K, V>())?;
566✔
40

41
        Ok(Self {
566✔
42
            db,
566✔
43
            _key: std::marker::PhantomData,
566✔
44
            _value: std::marker::PhantomData,
566✔
45
        })
566✔
46
    }
566✔
47

48
    pub fn open<E: LmdbEnvironment>(env: &E, name: Option<&str>) -> Result<Self, StorageError> {
5✔
49
        let db = env.open_database(name)?;
5✔
50

51
        Ok(Self {
5✔
52
            db,
5✔
53
            _key: std::marker::PhantomData,
5✔
54
            _value: std::marker::PhantomData,
5✔
55
        })
5✔
56
    }
5✔
57

58
    pub fn database(&self) -> Database {
529✔
59
        self.db
529✔
60
    }
529✔
61

62
    pub fn count_data<T: Transaction>(&self, txn: &T) -> Result<usize, StorageError> {
38✔
63
        Ok(txn.stat(self.db)?.entries())
38✔
64
    }
38✔
65

66
    pub fn get_first<'a, T: Transaction>(
34✔
67
        &self,
34✔
68
        txn: &'a T,
34✔
69
        key: K::Encode<'_>,
34✔
70
    ) -> Result<Option<Cow<'a, V>>, StorageError> {
34✔
71
        self.get(txn, key, true)
34✔
72
    }
34✔
73

74
    pub fn get_last<'a, T: Transaction>(
8✔
75
        &self,
8✔
76
        txn: &'a T,
8✔
77
        key: K::Encode<'_>,
8✔
78
    ) -> Result<Option<Cow<'a, V>>, StorageError> {
8✔
79
        self.get(txn, key, false)
8✔
80
    }
8✔
81

82
    fn get<'a, T: Transaction>(
42✔
83
        &self,
42✔
84
        txn: &'a T,
42✔
85
        key: K::Encode<'_>,
42✔
86
        first: bool,
42✔
87
    ) -> Result<Option<Cow<'a, V>>, StorageError> {
42✔
88
        let key = key.encode()?;
42✔
89
        let cursor = txn.open_ro_cursor(self.db)?;
42✔
90

91
        match cursor.get(Some(key.as_ref()), None, MDB_SET) {
42✔
92
            Ok((_, value)) => {
12✔
93
                if first {
12✔
94
                    Ok(Some(V::decode(value)?))
6✔
95
                } else {
96
                    let (_, value) = cursor.get(None, None, MDB_LAST_DUP)?;
6✔
97
                    Ok(Some(V::decode(value)?))
6✔
98
                }
99
            }
100
            Err(lmdb::Error::NotFound) => Ok(None),
30✔
101
            Err(err) => Err(err.into()),
×
102
        }
103
    }
42✔
104

105
    /// Returns if the key-value pair was actually inserted.
106
    pub fn insert(
18,572✔
107
        &self,
18,572✔
108
        txn: &mut RwTransaction,
18,572✔
109
        key: K::Encode<'_>,
18,572✔
110
        value: V::Encode<'_>,
18,572✔
111
    ) -> Result<bool, StorageError> {
18,572✔
112
        let key = key.encode()?;
18,572✔
113
        let value = value.encode()?;
18,572✔
114
        match txn.put(self.db, &key, &value, WriteFlags::NO_DUP_DATA) {
18,572✔
115
            Ok(()) => Ok(true),
19,039✔
116
            Err(lmdb::Error::KeyExist) => Ok(false),
1✔
117
            Err(err) => Err(err.into()),
×
118
        }
119
    }
19,040✔
120

121
    /// Returns if the key-value pair was actually removed.
122
    pub fn remove(
40✔
123
        &self,
40✔
124
        txn: &mut RwTransaction,
40✔
125
        key: K::Encode<'_>,
40✔
126
        value: V::Encode<'_>,
40✔
127
    ) -> Result<bool, StorageError> {
40✔
128
        let key = key.encode()?;
40✔
129
        let value = value.encode()?;
40✔
130
        match txn.del(self.db, &key, Some(value.as_ref())) {
40✔
131
            Ok(()) => Ok(true),
39✔
132
            Err(lmdb::Error::NotFound) => Ok(false),
1✔
133
            Err(err) => Err(err.into()),
×
134
        }
135
    }
40✔
136

137
    pub fn iter<'txn, T: Transaction>(
×
138
        &self,
×
139
        txn: &'txn T,
×
140
    ) -> Result<Iterator<'txn, RoCursor<'txn>, K, V>, StorageError> {
×
141
        let cursor = txn.open_ro_cursor(self.db)?;
×
142
        Iterator::new(cursor, Bound::Unbounded, true)
×
143
    }
×
144

145
    pub fn range<'txn, T: Transaction>(
95✔
146
        &self,
95✔
147
        txn: &'txn T,
95✔
148
        starting_key: Bound<K::Encode<'_>>,
95✔
149
        ascending: bool,
95✔
150
    ) -> Result<Iterator<'txn, RoCursor<'txn>, K, V>, StorageError> {
95✔
151
        let cursor = txn.open_ro_cursor(self.db)?;
95✔
152
        Iterator::new(cursor, starting_key, ascending)
95✔
153
    }
95✔
154
}
155

156
fn database_flag<K: LmdbKey, V: LmdbKey>() -> DatabaseFlags {
566✔
157
    let mut flags = database_key_flag::<K>();
566✔
158
    flags |= DatabaseFlags::DUP_SORT;
566✔
159
    match V::TYPE {
566✔
160
        LmdbKeyType::U32 => flags |= DatabaseFlags::DUP_FIXED | DatabaseFlags::INTEGER_DUP,
×
161
        #[cfg(target_pointer_width = "64")]
162
        LmdbKeyType::U64 => flags |= DatabaseFlags::DUP_FIXED | DatabaseFlags::INTEGER_DUP,
448✔
163
        LmdbKeyType::FixedSizeOtherThanU32OrUsize => flags |= DatabaseFlags::DUP_FIXED,
118✔
164
        LmdbKeyType::VariableSize => (),
×
165
    };
166
    flags
566✔
167
}
566✔
168

169
#[cfg(test)]
mod tests {
    use dozer_types::borrow::IntoOwned;
    use tempdir::TempDir;

    use crate::lmdb_storage::{LmdbEnvironmentManager, LmdbEnvironmentOptions};

    use super::*;

    /// Exercises lookup, insertion and removal on a `u64 -> u64` multimap.
    #[test]
    fn test_lmdb_multimap() {
        let dir = TempDir::new("test_lmdb_map").unwrap();
        let options = LmdbEnvironmentOptions::default();
        let mut env = LmdbEnvironmentManager::create_rw(dir.path(), "env", options).unwrap();
        let multimap = LmdbMultimap::<u64, u64>::create(&mut env, None).unwrap();

        let txn = env.txn_mut().unwrap();

        // Nothing is stored yet, so lookups find nothing.
        assert!(multimap.get_first(txn, &0).unwrap().is_none());
        assert!(multimap.get_last(txn, &0).unwrap().is_none());

        // Inserting the same pair twice only succeeds once; a second value
        // under the same key is accepted.
        assert!(multimap.insert(txn, &1u64, &2u64).unwrap());
        assert!(!multimap.insert(txn, &1u64, &2u64).unwrap());
        assert!(multimap.insert(txn, &1u64, &3u64).unwrap());

        // The untouched key is still absent.
        assert!(multimap.get_first(txn, &0).unwrap().is_none());
        assert!(multimap.get_last(txn, &0).unwrap().is_none());

        // The populated key exposes its first and last values.
        assert_eq!(multimap.get_first(txn, &1).unwrap().unwrap().into_owned(), 2);
        assert_eq!(multimap.get_last(txn, &1).unwrap().unwrap().into_owned(), 3);

        // Removing a pair succeeds once and reports `false` afterwards.
        assert!(multimap.remove(txn, &1u64, &2u64).unwrap());
        assert!(!multimap.remove(txn, &1u64, &2u64).unwrap());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc