• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075746792

pending completion
4075746792

Pull #790

github

GitHub
Merge 75b5f293b into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

393 of 393 new or added lines in 12 files covered. (100.0%)

24721 of 36724 relevant lines covered (67.32%)

55788.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.64
/dozer-core/src/dag/executor_utils.rs
1
#![allow(clippy::type_complexity)]
2
use crate::dag::dag::{Dag, Edge, Endpoint};
3
use crate::dag::dag_metadata::METADATA_DB_NAME;
4
use crate::dag::errors::ExecutionError;
5
use crate::dag::executor::ExecutorOperation;
6
use crate::dag::node::{NodeHandle, OutputPortDef, OutputPortType, PortHandle};
7
use crate::dag::record_store::{
8
    AutogenRowKeyLookupRecordReader, PrimaryKeyValueLookupRecordReader, RecordReader,
9
};
10
use crate::storage::common::Database;
11
use crate::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
12
use crossbeam::channel::{bounded, Receiver, Select, Sender};
13
use std::collections::hash_map::Entry;
14
use std::collections::HashMap;
15
use std::path::Path;
16

17
use super::hash_map_to_vec::insert_vec_element;
18

19
pub(crate) struct StorageMetadata {
20
    pub env: LmdbEnvironmentManager,
21
    pub meta_db: Database,
22
}
23

24
impl StorageMetadata {
×
25
    pub fn new(env: LmdbEnvironmentManager, meta_db: Database) -> Self {
676✔
26
        Self { env, meta_db }
676✔
27
    }
676✔
28
}
29

×
30
pub(crate) fn init_component<F>(
337✔
31
    node_handle: &NodeHandle,
337✔
32
    base_path: &Path,
337✔
33
    mut init_f: F,
337✔
34
) -> Result<StorageMetadata, ExecutionError>
337✔
35
where
337✔
36
    F: FnMut(&mut LmdbEnvironmentManager) -> Result<(), ExecutionError>,
337✔
37
{
337✔
38
    let mut env = LmdbEnvironmentManager::create(base_path, format!("{node_handle}").as_str())?;
337✔
39
    let db = env.open_database(METADATA_DB_NAME, false)?;
334✔
40
    init_f(&mut env)?;
334✔
41
    Ok(StorageMetadata::new(env, db))
334✔
42
}
337✔
43
#[inline]
×
44
pub(crate) fn init_select(receivers: &Vec<Receiver<ExecutorOperation>>) -> Select {
2,586✔
45
    let mut sel = Select::new();
2,586✔
46
    for r in receivers {
5,495✔
47
        sel.recv(r);
2,909✔
48
    }
2,909✔
49
    sel
2,586✔
50
}
2,586✔
51

×
52
pub(crate) fn index_edges<T: Clone>(
57✔
53
    dag: &Dag<T>,
57✔
54
    channel_buf_sz: usize,
57✔
55
) -> (
57✔
56
    HashMap<NodeHandle, HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>>,
57✔
57
    HashMap<NodeHandle, HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>>,
57✔
58
) {
57✔
59
    let mut senders = HashMap::new();
57✔
60
    let mut receivers = HashMap::new();
57✔
61

×
62
    for edge in dag.edges() {
193✔
63
        let (tx, rx) = bounded(channel_buf_sz);
193✔
64
        // let (tx, rx) = match dag.nodes.get(&edge.from.node).unwrap() {
193✔
65
        //     NodeType::Source(_) => bounded(1),
193✔
66
        //     _ => bounded(channel_buf_sz),
193✔
67
        // };
193✔
68

193✔
69
        insert_sender_or_receiver(&mut senders, edge.from.clone(), tx);
193✔
70
        insert_sender_or_receiver(&mut receivers, edge.to.clone(), rx);
193✔
71
    }
193✔
72

73
    (senders, receivers)
57✔
74
}
57✔
75

×
76
fn insert_sender_or_receiver<T>(
386✔
77
    map: &mut HashMap<NodeHandle, HashMap<PortHandle, Vec<T>>>,
386✔
78
    endpoint: Endpoint,
386✔
79
    value: T,
386✔
80
) {
386✔
81
    match map.entry(endpoint.node) {
386✔
82
        Entry::Occupied(mut entry) => {
34✔
83
            insert_vec_element(entry.get_mut(), endpoint.port, value);
34✔
84
        }
34✔
85
        Entry::Vacant(entry) => {
352✔
86
            let mut port_map = HashMap::new();
352✔
87
            port_map.insert(endpoint.port, vec![value]);
352✔
88
            entry.insert(port_map);
352✔
89
        }
352✔
90
    }
×
91
}
386✔
92

93
pub(crate) fn build_receivers_lists(
511✔
94
    receivers: HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>,
511✔
95
) -> (Vec<PortHandle>, Vec<Receiver<ExecutorOperation>>) {
511✔
96
    let mut handles_ls: Vec<PortHandle> = Vec::new();
511✔
97
    let mut receivers_ls: Vec<Receiver<ExecutorOperation>> = Vec::new();
511✔
98
    for e in receivers {
1,066✔
99
        for r in e.1 {
1,110✔
100
            receivers_ls.push(r);
555✔
101
            handles_ls.push(e.0);
555✔
102
        }
555✔
103
    }
×
104
    (handles_ls, receivers_ls)
511✔
105
}
511✔
106

×
107
fn get_inputs_for_output(edges: &[Edge], node: &NodeHandle, port: &PortHandle) -> Vec<Endpoint> {
204✔
108
    edges
204✔
109
        .iter()
204✔
110
        .filter(|e| e.from.node == *node && e.from.port == *port)
815✔
111
        .map(|e| e.to.clone())
208✔
112
        .collect()
204✔
113
}
204✔
114

×
115
const PORT_STATE_KEY: &str = "__PORT_STATE_";
×
116

×
117
#[derive(Debug)]
×
118
pub(crate) struct StateOptions {
×
119
    pub(crate) db: Database,
×
120
    pub(crate) meta_db: Database,
×
121
    pub(crate) typ: OutputPortType,
×
122
}
×
123

×
124
pub(crate) fn create_ports_databases_and_fill_downstream_record_readers(
517✔
125
    handle: &NodeHandle,
517✔
126
    edges: &[Edge],
517✔
127
    mut env: LmdbEnvironmentManager,
517✔
128
    output_ports: &[OutputPortDef],
517✔
129
    record_stores: &mut HashMap<NodeHandle, HashMap<PortHandle, Box<dyn RecordReader>>>,
517✔
130
) -> Result<(SharedTransaction, HashMap<PortHandle, StateOptions>), ExecutionError> {
517✔
131
    let mut port_databases: Vec<Option<StateOptions>> = Vec::new();
517✔
132
    for port in output_ports {
1,092✔
133
        let opt = match &port.typ {
575✔
134
            OutputPortType::Stateless => None,
371✔
135
            typ => {
204✔
136
                let db =
204✔
137
                    env.open_database(&format!("{}_{}", PORT_STATE_KEY, port.handle), false)?;
204✔
138
                let meta_db =
204✔
139
                    env.open_database(&format!("{}_{}_META", PORT_STATE_KEY, port.handle), false)?;
204✔
140
                Some(StateOptions {
204✔
141
                    db,
204✔
142
                    meta_db,
204✔
143
                    typ: typ.clone(),
204✔
144
                })
204✔
145
            }
146
        };
×
147
        port_databases.push(opt);
575✔
148
    }
149

×
150
    let master_tx = env.create_txn()?;
517✔
151

×
152
    for (state_options, port) in port_databases.iter().zip(output_ports.iter()) {
575✔
153
        if let Some(state_options) = state_options {
575✔
154
            for endpoint in get_inputs_for_output(edges, handle, &port.handle) {
208✔
155
                let record_reader: Box<dyn RecordReader> = match port.typ {
184✔
156
                    OutputPortType::AutogenRowKeyLookup => Box::new(
1✔
157
                        AutogenRowKeyLookupRecordReader::new(master_tx.clone(), state_options.db),
1✔
158
                    ),
1✔
159
                    OutputPortType::StatefulWithPrimaryKeyLookup { .. } => Box::new(
183✔
160
                        PrimaryKeyValueLookupRecordReader::new(master_tx.clone(), state_options.db),
183✔
161
                    ),
183✔
162
                    OutputPortType::Stateless => panic!("Internal error: Invalid port type"),
×
163
                };
×
164

×
165
                record_stores
184✔
166
                    .get_mut(&endpoint.node)
184✔
167
                    .expect("Record store HashMap must be created for every node upfront")
184✔
168
                    .insert(endpoint.port, record_reader);
184✔
169
            }
×
170
        }
371✔
171
    }
172

173
    let port_databases = output_ports
517✔
174
        .iter()
517✔
175
        .zip(port_databases.into_iter())
517✔
176
        .flat_map(|(output_port, state_option)| {
575✔
177
            state_option.map(|state_option| (output_port.handle, state_option))
575✔
178
        })
575✔
179
        .collect();
517✔
180

517✔
181
    Ok((master_tx, port_databases))
517✔
182
}
517✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc