• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4309615408

pending completion
4309615408

push

github

GitHub
chore: Remove an unused `Arc<RwLock>` (#1106)

6 of 6 new or added lines in 2 files covered. (100.0%)

28466 of 40109 relevant lines covered (70.97%)

50957.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.91
/dozer-types/src/types/mod.rs
1
use ahash::AHasher;
2
use geo::{point, GeodesicDistance, Point};
3
use ordered_float::OrderedFloat;
4
use std::array::TryFromSliceError;
5
use std::cmp::Ordering;
6
use std::fmt::{Display, Formatter};
7
use std::hash::{Hash, Hasher};
8
use std::str::FromStr;
9

10
use crate::errors::types::TypeError;
11
use prettytable::{Cell, Row, Table};
12
use serde::{self, Deserialize, Serialize};
13

14
mod field;
15

16
use crate::errors::types::TypeError::InvalidFieldValue;
17
pub use field::{field_test_cases, Field, FieldBorrow, FieldType, DATE_FORMAT};
18

19
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd, Ord)]
1,284,796✔
20
pub enum SourceDefinition {
21
    Table { connection: String, name: String },
22
    Alias { name: String },
23
    Dynamic,
24
}
25
impl Default for SourceDefinition {
26
    fn default() -> Self {
180✔
27
        SourceDefinition::Dynamic
180✔
28
    }
180✔
29
}
30

31
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd, Ord)]
1,281,067✔
32
pub struct FieldDefinition {
33
    pub name: String,
34
    pub typ: FieldType,
35
    pub nullable: bool,
36
    #[serde(default)]
37
    pub source: SourceDefinition,
38
}
39

40
impl FieldDefinition {
41
    pub fn new(name: String, typ: FieldType, nullable: bool, source: SourceDefinition) -> Self {
34,720✔
42
        Self {
34,720✔
43
            name,
34,720✔
44
            typ,
34,720✔
45
            nullable,
34,720✔
46
            source,
34,720✔
47
        }
34,720✔
48
    }
34,720✔
49
}
50

51
#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
1,450,303✔
52
pub struct SchemaIdentifier {
53
    pub id: u32,
54
    pub version: u16,
55
}
56

57
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
74,735✔
58
pub struct Schema {
59
    /// Unique identifier and version for this schema. This value is required only if the schema
60
    /// is represented by a valid entry in the schema registry. For nested schemas, this field
61
    /// is not applicable
62
    pub identifier: Option<SchemaIdentifier>,
63

64
    /// fields contains a list of FieldDefinition for all the fields that appear in a record.
65
    /// Not necessarily all these fields will end up in the final object structure stored in
66
    /// the cache. Some fields might only be used for indexing purposes only.
67
    pub fields: Vec<FieldDefinition>,
68

69
    /// Indexes of the fields forming the primary key for this schema. If the value is empty
70
    /// only Insert Operation are supported. Updates and Deletes are not supported without a
71
    /// primary key definition
72
    #[serde(default)]
73
    pub primary_index: Vec<usize>,
74
}
75

76
#[derive(Clone, Serialize, Deserialize, Debug)]
65✔
77
pub enum ReplicationChangesTrackingType {
78
    FullChanges,
79
    OnlyPK,
80
    Nothing,
81
}
82

83
impl Default for ReplicationChangesTrackingType {
84
    fn default() -> Self {
90✔
85
        ReplicationChangesTrackingType::Nothing
90✔
86
    }
90✔
87
}
88

89
#[derive(Clone, Serialize, Deserialize, Debug)]
225✔
90
pub struct SourceSchema {
91
    pub name: String,
92
    pub schema: Schema,
93
    #[serde(default)]
94
    pub replication_type: ReplicationChangesTrackingType,
95
}
96

97
impl SourceSchema {
98
    pub fn new(
20✔
99
        name: String,
20✔
100
        schema: Schema,
20✔
101
        replication_type: ReplicationChangesTrackingType,
20✔
102
    ) -> Self {
20✔
103
        Self {
20✔
104
            name,
20✔
105
            schema,
20✔
106
            replication_type,
20✔
107
        }
20✔
108
    }
20✔
109
}
110

111
impl Schema {
112
    pub fn empty() -> Schema {
10,000✔
113
        Self {
10,000✔
114
            identifier: None,
10,000✔
115
            fields: Vec::new(),
10,000✔
116
            primary_index: Vec::new(),
10,000✔
117
        }
10,000✔
118
    }
10,000✔
119

120
    pub fn field(&mut self, f: FieldDefinition, pk: bool) -> &mut Self {
26,990✔
121
        self.fields.push(f);
26,990✔
122
        if pk {
26,990✔
123
            self.primary_index.push(&self.fields.len() - 1)
2,380✔
124
        }
24,610✔
125
        self
26,990✔
126
    }
26,990✔
127

128
    pub fn get_field_index(&self, name: &str) -> Result<(usize, &FieldDefinition), TypeError> {
×
129
        let r = self
×
130
            .fields
×
131
            .iter()
×
132
            .enumerate()
×
133
            .find(|f| f.1.name.as_str() == name);
×
134
        match r {
×
135
            Some(v) => Ok(v),
×
136
            _ => Err(TypeError::InvalidFieldName(name.to_string())),
×
137
        }
138
    }
×
139

140
    pub fn print(&self) -> Table {
80✔
141
        let mut table = Table::new();
80✔
142
        table.add_row(row!["Field", "Type", "Nullable"]);
80✔
143
        for f in &self.fields {
280✔
144
            table.add_row(row![f.name, format!("{:?}", f.typ), f.nullable]);
200✔
145
        }
200✔
146
        table
80✔
147
    }
80✔
148

149
    pub fn set_identifier(
×
150
        &mut self,
×
151
        identifier: Option<SchemaIdentifier>,
×
152
    ) -> Result<(), TypeError> {
×
153
        self.identifier = identifier;
×
154
        Ok(())
×
155
    }
×
156
}
157

158
impl Display for Schema {
159
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
160
        let table = self.print();
×
161
        table.fmt(f)
×
162
    }
×
163
}
164

165
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
1,376✔
166
pub enum IndexDefinition {
167
    /// The sorted inverted index, supporting `Eq` filter on multiple fields and `LT`, `LTE`, `GT`, `GTE` filter on at most one field.
168
    SortedInverted(Vec<usize>),
169
    /// Full text index, supporting `Contains`, `MatchesAny` and `MatchesAll` filter on exactly one field.
170
    FullText(usize),
171
}
172

173
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2,352,764✔
174
pub struct Record {
175
    /// Schema implemented by this Record
176
    pub schema_id: Option<SchemaIdentifier>,
177
    /// List of values, following the definitions of `fields` of the associated schema
178
    pub values: Vec<Field>,
179
    /// Records with same primary key will have increasing version.
180
    pub version: Option<u32>,
181
}
182

183
impl Record {
184
    pub fn new(
44,223,520✔
185
        schema_id: Option<SchemaIdentifier>,
44,223,520✔
186
        values: Vec<Field>,
44,223,520✔
187
        version: Option<u32>,
44,223,520✔
188
    ) -> Record {
44,223,520✔
189
        Record {
44,223,520✔
190
            schema_id,
44,223,520✔
191
            values,
44,223,520✔
192
            version,
44,223,520✔
193
        }
44,223,520✔
194
    }
44,223,520✔
195

196
    pub fn from_schema(schema: &Schema) -> Record {
56,470✔
197
        Record {
56,470✔
198
            schema_id: schema.identifier,
56,470✔
199
            values: vec![Field::Null; schema.fields.len()],
56,470✔
200
            version: None,
56,470✔
201
        }
56,470✔
202
    }
56,470✔
203

204
    pub fn nulls(schema_id: Option<SchemaIdentifier>, size: usize, version: Option<u32>) -> Record {
×
205
        Record {
×
206
            schema_id,
×
207
            values: vec![Field::Null; size],
×
208
            version,
×
209
        }
×
210
    }
×
211

212
    pub fn iter(&self) -> core::slice::Iter<'_, Field> {
×
213
        self.values.iter()
×
214
    }
×
215

216
    pub fn set_value(&mut self, idx: usize, value: Field) {
×
217
        self.values[idx] = value;
×
218
    }
×
219

220
    pub fn push_value(&mut self, value: Field) {
×
221
        self.values.push(value);
×
222
    }
×
223

224
    pub fn get_value(&self, idx: usize) -> Result<&Field, TypeError> {
2,195,370✔
225
        match self.values.get(idx) {
2,195,370✔
226
            Some(f) => Ok(f),
2,195,370✔
227
            _ => Err(TypeError::InvalidFieldIndex(idx)),
×
228
        }
229
    }
2,195,370✔
230

231
    pub fn get_key(&self, indexes: &Vec<usize>) -> Vec<u8> {
5,616,900✔
232
        debug_assert!(!indexes.is_empty(), "Primary key indexes cannot be empty");
5,616,900✔
233

234
        let mut tot_size = 0_usize;
5,616,820✔
235
        let mut buffers = Vec::<Vec<u8>>::with_capacity(indexes.len());
5,616,820✔
236
        for i in indexes {
11,233,830✔
237
            let bytes = self.values[*i].encode();
5,617,010✔
238
            tot_size += bytes.len();
5,617,010✔
239
            buffers.push(bytes);
5,617,010✔
240
        }
5,617,010✔
241

242
        let mut res_buffer = Vec::<u8>::with_capacity(tot_size);
5,616,820✔
243
        for i in buffers {
11,233,510✔
244
            res_buffer.extend(i);
5,616,690✔
245
        }
5,616,690✔
246
        res_buffer
5,616,820✔
247
    }
5,616,820✔
248

249
    pub fn get_values_hash(&self) -> u64 {
56,730✔
250
        let mut hasher = AHasher::default();
56,730✔
251

252
        for (index, field) in self.values.iter().enumerate() {
56,730✔
253
            hasher.write_i32(index as i32);
56,730✔
254
            match field {
56,730✔
255
                Field::UInt(i) => {
×
256
                    hasher.write_u8(1);
×
257
                    hasher.write_u64(*i);
×
258
                }
×
259
                Field::Int(i) => {
56,730✔
260
                    hasher.write_u8(2);
56,730✔
261
                    hasher.write_i64(*i);
56,730✔
262
                }
56,730✔
263
                Field::Float(f) => {
×
264
                    hasher.write_u8(3);
×
265
                    hasher.write(&((*f).to_ne_bytes()));
×
266
                }
×
267
                Field::Boolean(b) => {
×
268
                    hasher.write_u8(4);
×
269
                    hasher.write_u8(if *b { 1_u8 } else { 0_u8 });
×
270
                }
271
                Field::String(s) => {
×
272
                    hasher.write_u8(5);
×
273
                    hasher.write(s.as_str().as_bytes());
×
274
                }
×
275
                Field::Text(t) => {
×
276
                    hasher.write_u8(6);
×
277
                    hasher.write(t.as_str().as_bytes());
×
278
                }
×
279
                Field::Binary(b) => {
×
280
                    hasher.write_u8(7);
×
281
                    hasher.write(b.as_ref());
×
282
                }
×
283
                Field::Decimal(d) => {
×
284
                    hasher.write_u8(8);
×
285
                    hasher.write(&d.serialize());
×
286
                }
×
287
                Field::Timestamp(t) => {
×
288
                    hasher.write_u8(9);
×
289
                    hasher.write_i64(t.timestamp())
×
290
                }
291
                Field::Date(d) => {
×
292
                    hasher.write_u8(10);
×
293
                    hasher.write(d.to_string().as_bytes());
×
294
                }
×
295
                Field::Bson(b) => {
×
296
                    hasher.write_u8(11);
×
297
                    hasher.write(b.as_ref());
×
298
                }
×
299
                Field::Point(p) => {
×
300
                    hasher.write_u8(12);
×
301
                    hasher.write(p.to_bytes().as_slice());
×
302
                }
×
303
                Field::Null => {
×
304
                    hasher.write_u8(0);
×
305
                }
×
306
            }
307
        }
308
        hasher.finish()
56,730✔
309
    }
56,730✔
310
}
311

312
impl Display for Record {
313
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
314
        let v = self
×
315
            .values
×
316
            .iter()
×
317
            .map(|f| Cell::new(&f.to_string().unwrap_or("".to_string())))
×
318
            .collect::<Vec<Cell>>();
×
319

×
320
        let mut table = Table::new();
×
321
        table.add_row(Row::new(v));
×
322
        table.fmt(f)
×
323
    }
×
324
}
325

326
#[derive(Clone, Debug, PartialEq, Eq)]
129,999✔
327
pub enum Operation {
328
    Delete { old: Record },
329
    Insert { new: Record },
330
    Update { old: Record, new: Record },
331
}
332

333
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
63✔
334
pub struct DozerPoint(pub Point<OrderedFloat<f64>>);
×
335

×
336
impl GeodesicDistance<OrderedFloat<f64>> for DozerPoint {
337
    fn geodesic_distance(&self, rhs: &Self) -> OrderedFloat<f64> {
20✔
338
        let f = point! { x: self.0.x().0, y: self.0.y().0 };
20✔
339
        let t = point! { x: rhs.0.x().0, y: rhs.0.y().0 };
20✔
340
        OrderedFloat(f.geodesic_distance(&t))
20✔
341
    }
20✔
342
}
343

344
impl Ord for DozerPoint {
345
    fn cmp(&self, other: &Self) -> Ordering {
×
346
        if self.0.x() == other.0.x() && self.0.y() == other.0.y() {
×
347
            Ordering::Equal
×
348
        } else if self.0.x() > other.0.x()
×
349
            || (self.0.x() == other.0.x() && self.0.y() > other.0.y())
×
350
        {
×
351
            Ordering::Greater
×
352
        } else {
×
353
            Ordering::Less
×
354
        }
×
355
    }
×
356
}
357

358
impl PartialOrd for DozerPoint {
×
359
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
×
360
        Some(self.cmp(other))
×
361
    }
×
362
}
×
363

364
impl FromStr for DozerPoint {
×
365
    type Err = TypeError;
366

×
367
    fn from_str(str: &str) -> Result<Self, Self::Err> {
13✔
368
        let error = || InvalidFieldValue {
13✔
369
            field_type: FieldType::Point,
2✔
370
            nullable: false,
2✔
371
            value: str.to_string(),
2✔
372
        };
2✔
373

×
374
        let s = str.replace('(', "");
13✔
375
        let s = s.replace(')', "");
13✔
376
        let mut cs = s.split(',');
13✔
377
        let x = cs
13✔
378
            .next()
13✔
379
            .ok_or_else(error)?
13✔
380
            .parse::<f64>()
13✔
381
            .map_err(|_| error())?;
13✔
382
        let y = cs
11✔
383
            .next()
11✔
384
            .ok_or_else(error)?
11✔
385
            .parse::<f64>()
11✔
386
            .map_err(|_| error())?;
11✔
387
        Ok(Self(Point::from((OrderedFloat(x), OrderedFloat(y)))))
11✔
388
    }
13✔
389
}
×
390

×
391
impl From<(f64, f64)> for DozerPoint {
×
392
    fn from((x, y): (f64, f64)) -> Self {
70✔
393
        Self(point! {x: OrderedFloat(x), y: OrderedFloat(y)})
70✔
394
    }
70✔
395
}
×
396

397
impl Display for DozerPoint {
398
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
399
        f.write_str(&format!("{:?}", self.0.x_y()))
×
400
    }
×
401
}
×
402

403
impl DozerPoint {
404
    pub fn to_bytes(&self) -> [u8; 16] {
×
405
        let mut result = [0_u8; 16];
×
406
        result[0..8].copy_from_slice(&self.0.x().to_be_bytes());
×
407
        result[8..16].copy_from_slice(&self.0.y().to_be_bytes());
×
408
        result
×
409
    }
×
410

411
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, TryFromSliceError> {
×
412
        let x = f64::from_be_bytes(bytes[0..8].try_into()?);
×
413
        let y = f64::from_be_bytes(bytes[8..16].try_into()?);
×
414

×
415
        Ok(DozerPoint::from((x, y)))
×
416
    }
×
417
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc