• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4023928230

pending completion
4023928230

Pull #744

github

GitHub
Merge fe141bf50 into 70bd6e0ad
Pull Request #744: feat: Implement direct insert to cache pipeline

243 of 243 new or added lines in 16 files covered. (100.0%)

23197 of 34900 relevant lines covered (66.47%)

45617.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.22
/dozer-core/src/dag/forwarder.rs
1
#![allow(clippy::too_many_arguments)]
2
use crate::dag::channels::ProcessorChannelForwarder;
3
use crate::dag::dag_metadata::SOURCE_ID_IDENTIFIER;
4
use crate::dag::epoch::{Epoch, EpochManager};
5
use crate::dag::errors::ExecutionError;
6
use crate::dag::errors::ExecutionError::{InternalError, InvalidPortHandle};
7
use crate::dag::executor::ExecutorOperation;
8
use crate::dag::executor_utils::StateOptions;
9
use crate::dag::node::{NodeHandle, PortHandle};
10
use crate::dag::record_store::{RecordWriter, RecordWriterUtils};
11
use crate::storage::common::Database;
12

13
use crate::storage::lmdb_storage::SharedTransaction;
14
use crossbeam::channel::Sender;
15
use dozer_types::internal_err;
16
use dozer_types::log::debug;
17
use dozer_types::types::{Operation, Schema};
18
use std::collections::HashMap;
19
use std::sync::Arc;
20
use std::time::{Duration, Instant};
21

22
#[derive(Debug)]
×
23
pub(crate) struct StateWriter {
24
    meta_db: Database,
25
    record_writers: HashMap<PortHandle, Box<dyn RecordWriter>>,
26
    tx: SharedTransaction,
27
}
28

29
impl StateWriter {
30
    pub fn new(
796✔
31
        meta_db: Database,
796✔
32
        dbs: HashMap<PortHandle, StateOptions>,
796✔
33
        tx: SharedTransaction,
796✔
34
        output_schemas: HashMap<PortHandle, Schema>,
796✔
35
        retention_queue_size: usize,
796✔
36
    ) -> Result<Self, ExecutionError> {
796✔
37
        let mut record_writers = HashMap::<PortHandle, Box<dyn RecordWriter>>::new();
796✔
38
        for (port, options) in dbs {
992✔
39
            let schema = output_schemas
196✔
40
                .get(&port)
196✔
41
                .ok_or(ExecutionError::InvalidPortHandle(port))?
196✔
42
                .clone();
196✔
43

44
            let writer = RecordWriterUtils::create_writer(
196✔
45
                options.typ,
196✔
46
                options.db,
196✔
47
                options.meta_db,
196✔
48
                schema,
196✔
49
                retention_queue_size,
196✔
50
            )?;
196✔
51
            record_writers.insert(port, writer);
196✔
52
        }
53

54
        Ok(Self {
796✔
55
            meta_db,
796✔
56
            record_writers,
796✔
57
            tx,
796✔
58
        })
796✔
59
    }
796✔
60

61
    fn store_op(&mut self, op: Operation, port: &PortHandle) -> Result<Operation, ExecutionError> {
62
        if let Some(writer) = self.record_writers.get_mut(port) {
9,120,774✔
63
            writer.write(op, &self.tx)
575,451✔
64
        } else {
65
            Ok(op)
8,545,323✔
66
        }
67
    }
9,120,774✔
68

69
    pub fn store_commit_info(&mut self, epoch_details: &Epoch) -> Result<(), ExecutionError> {
14,295✔
70
        //
71
        for (source, (txid, seq_in_tx)) in &epoch_details.details {
37,515✔
72
            let mut full_key = vec![SOURCE_ID_IDENTIFIER];
23,220✔
73
            full_key.extend(source.to_bytes());
23,220✔
74

23,220✔
75
            let mut value: Vec<u8> = Vec::with_capacity(16);
23,220✔
76
            value.extend(txid.to_be_bytes());
23,220✔
77
            value.extend(seq_in_tx.to_be_bytes());
23,220✔
78

23,220✔
79
            self.tx
23,220✔
80
                .write()
23,220✔
81
                .put(self.meta_db, full_key.as_slice(), value.as_slice())?;
23,220✔
82
        }
83
        for record_writer in self.record_writers.values() {
14,295✔
84
            record_writer.commit()?;
2,869✔
85
        }
86
        self.tx.write().commit_and_renew()?;
14,295✔
87
        Ok(())
14,294✔
88
    }
14,294✔
89
}
90

91
#[derive(Debug)]
×
92
struct ChannelManager {
93
    owner: NodeHandle,
94
    senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
95
    state_writer: StateWriter,
96
    stateful: bool,
97
}
98

99
impl ChannelManager {
100
    #[inline]
101
    fn send_op(&mut self, mut op: Operation, port_id: PortHandle) -> Result<(), ExecutionError> {
9,122,647✔
102
        if self.stateful {
9,122,647✔
103
            op = self.state_writer.store_op(op, &port_id)?;
9,115,369✔
104
        }
7,278✔
105

106
        let senders = self
9,122,647✔
107
            .senders
9,122,647✔
108
            .get(&port_id)
9,122,647✔
109
            .ok_or(InvalidPortHandle(port_id))?;
9,122,647✔
110

111
        let exec_op = match op {
9,122,647✔
112
            Operation::Insert { new } => ExecutorOperation::Insert { new },
8,960,549✔
113
            Operation::Update { old, new } => ExecutorOperation::Update { old, new },
100,824✔
114
            Operation::Delete { old } => ExecutorOperation::Delete { old },
61,274✔
115
        };
116

117
        if let Some((last_sender, senders)) = senders.split_last() {
9,122,647✔
118
            for sender in senders {
9,206,987✔
119
                internal_err!(sender.send(exec_op.clone()))?;
84,340✔
120
            }
121
            internal_err!(last_sender.send(exec_op))?;
9,128,371✔
122
        }
×
123

124
        Ok(())
9,129,859✔
125
    }
9,129,869✔
126

127
    fn send_terminate(&self) -> Result<(), ExecutionError> {
603✔
128
        for senders in self.senders.values() {
617✔
129
            for sender in senders {
1,238✔
130
                internal_err!(sender.send(ExecutorOperation::Terminate))?;
621✔
131
            }
132
        }
133

134
        Ok(())
603✔
135
    }
603✔
136

137
    fn store_and_send_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
10,407✔
138
        debug!("[{}] Checkpointing - {}", self.owner, &epoch);
10,407✔
139
        self.state_writer.store_commit_info(epoch)?;
10,407✔
140

141
        for senders in &self.senders {
21,104✔
142
            for sender in senders.1 {
21,824✔
143
                internal_err!(sender.send(ExecutorOperation::Commit {
11,127✔
144
                    epoch: epoch.clone()
11,127✔
145
                }))?;
11,127✔
146
            }
147
        }
148

149
        Ok(())
10,407✔
150
    }
10,407✔
151
    fn new(
621✔
152
        owner: NodeHandle,
621✔
153
        senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
621✔
154
        state_writer: StateWriter,
621✔
155
        stateful: bool,
621✔
156
    ) -> Self {
621✔
157
        Self {
621✔
158
            owner,
621✔
159
            senders,
621✔
160
            state_writer,
621✔
161
            stateful,
621✔
162
        }
621✔
163
    }
621✔
164
}
165

166
#[derive(Debug)]
×
167
pub(crate) struct SourceChannelManager {
168
    source_handle: NodeHandle,
169
    manager: ChannelManager,
170
    curr_txid: u64,
171
    curr_seq_in_tx: u64,
172
    commit_sz: u32,
173
    num_uncommited_ops: u32,
174
    max_duration_between_commits: Duration,
175
    last_commit_instant: Instant,
176
    epoch_manager: Arc<EpochManager>,
177
}
178

179
impl SourceChannelManager {
180
    pub fn new(
181✔
181
        owner: NodeHandle,
181✔
182
        senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
181✔
183
        state_writer: StateWriter,
181✔
184
        stateful: bool,
181✔
185
        commit_sz: u32,
181✔
186
        max_duration_between_commits: Duration,
181✔
187
        epoch_manager: Arc<EpochManager>,
181✔
188
        start_seq: (u64, u64),
181✔
189
    ) -> Self {
181✔
190
        Self {
181✔
191
            manager: ChannelManager::new(owner.clone(), senders, state_writer, stateful),
181✔
192
            curr_txid: start_seq.0,
181✔
193
            curr_seq_in_tx: start_seq.1,
181✔
194
            source_handle: owner,
181✔
195
            commit_sz,
181✔
196
            num_uncommited_ops: 0,
181✔
197
            max_duration_between_commits,
181✔
198
            last_commit_instant: Instant::now(),
181✔
199
            epoch_manager,
181✔
200
        }
181✔
201
    }
181✔
202

203
    fn should_commit(&self) -> bool {
3,677,703✔
204
        self.num_uncommited_ops >= self.commit_sz
3,677,703✔
205
            || self.last_commit_instant.elapsed() >= self.max_duration_between_commits
3,677,694✔
206
    }
3,677,703✔
207

208
    pub fn trigger_commit_if_needed(
3,677,754✔
209
        &mut self,
3,677,754✔
210
        request_termination: bool,
3,677,754✔
211
    ) -> Result<bool, ExecutionError> {
3,677,754✔
212
        if request_termination || self.should_commit() {
3,677,754✔
213
            let epoch = self.epoch_manager.wait_for_epoch_close(
4,584✔
214
                self.source_handle.clone(),
4,584✔
215
                (self.curr_txid, self.curr_seq_in_tx),
4,584✔
216
                request_termination,
4,584✔
217
            );
4,584✔
218
            self.manager
4,584✔
219
                .store_and_send_commit(&Epoch::new(epoch.id, epoch.details))?;
4,584✔
220
            self.num_uncommited_ops = 0;
4,584✔
221
            self.last_commit_instant = Instant::now();
4,584✔
222
            Ok(epoch.terminating)
4,584✔
223
        } else {
224
            Ok(false)
3,673,207✔
225
        }
226
    }
3,677,791✔
227

228
    pub fn send_and_trigger_commit_if_needed(
3,676,673✔
229
        &mut self,
3,676,673✔
230
        txid: u64,
3,676,673✔
231
        seq_in_tx: u64,
3,676,673✔
232
        op: Operation,
3,676,673✔
233
        port: PortHandle,
3,676,673✔
234
        request_termination: bool,
3,676,673✔
235
    ) -> Result<bool, ExecutionError> {
3,676,673✔
236
        //
3,676,673✔
237
        self.curr_txid = txid;
3,676,673✔
238
        self.curr_seq_in_tx = seq_in_tx;
3,676,673✔
239
        self.manager.send_op(op, port)?;
3,676,673✔
240
        self.num_uncommited_ops += 1;
3,676,666✔
241
        self.trigger_commit_if_needed(request_termination)
3,676,666✔
242
    }
3,676,673✔
243

244
    pub fn terminate(&mut self) -> Result<(), ExecutionError> {
172✔
245
        self.manager.send_terminate()
172✔
246
    }
172✔
247
}
248

249
#[derive(Debug)]
×
250
pub(crate) struct ProcessorChannelManager {
251
    manager: ChannelManager,
252
}
253

254
impl ProcessorChannelManager {
255
    pub fn new(
440✔
256
        owner: NodeHandle,
440✔
257
        senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
440✔
258
        state_writer: StateWriter,
440✔
259
        stateful: bool,
440✔
260
    ) -> Self {
440✔
261
        Self {
440✔
262
            manager: ChannelManager::new(owner, senders, state_writer, stateful),
440✔
263
        }
440✔
264
    }
440✔
265

266
    pub fn store_and_send_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
5,823✔
267
        self.manager.store_and_send_commit(epoch)
5,823✔
268
    }
5,823✔
269

270
    pub fn send_terminate(&self) -> Result<(), ExecutionError> {
431✔
271
        self.manager.send_terminate()
431✔
272
    }
431✔
273
}
274

275
impl ProcessorChannelForwarder for ProcessorChannelManager {
276
    fn send(&mut self, op: Operation, port: PortHandle) -> Result<(), ExecutionError> {
5,452,519✔
277
        self.manager.send_op(op, port)
5,452,519✔
278
    }
5,452,519✔
279
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc