• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4309615408

pending completion
4309615408

push

github

GitHub
chore: Remove an unused `Arc<RwLock>` (#1106)

6 of 6 new or added lines in 2 files covered. (100.0%)

28466 of 40109 relevant lines covered (70.97%)

50957.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.11
/dozer-core/src/executor/source_node.rs
1
use std::{
2
    sync::{
3
        atomic::{AtomicBool, Ordering},
4
        Arc,
5
    },
6
    time::Duration,
7
};
8

9
use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender};
10
use dozer_types::ingestion_types::IngestionMessage;
11
use dozer_types::{
12
    log::debug,
13
    node::{NodeHandle, OpIdentifier},
14
};
15

16
use crate::{
17
    builder_dag::NodeKind,
18
    channels::SourceChannelForwarder,
19
    errors::ExecutionError,
20
    forwarder::{SourceChannelManager, StateWriter},
21
    node::{PortHandle, Source},
22
};
23

24
use super::{execution_dag::ExecutionDag, node::Node, ExecutorOptions};
25

26
impl SourceChannelForwarder for InternalChannelSourceForwarder {
27
    fn send(&mut self, message: IngestionMessage, port: PortHandle) -> Result<(), ExecutionError> {
4,498,269✔
28
        Ok(self.sender.send((port, message))?)
4,498,269✔
29
    }
4,498,269✔
30
}
×
31

×
32
/// The sender half of a source in the execution DAG.
×
33
#[derive(Debug)]
×
34
pub struct SourceSenderNode {
×
35
    /// Node handle in description DAG.
×
36
    node_handle: NodeHandle,
37
    /// The source.
38
    source: Box<dyn Source>,
39
    /// Last checkpointed output data sequence number.
×
40
    last_checkpoint: Option<OpIdentifier>,
41
    /// The forwarder that will be passed to the source for outputting data.
42
    forwarder: InternalChannelSourceForwarder,
43
}
44

45
impl SourceSenderNode {
46
    pub fn handle(&self) -> &NodeHandle {
342✔
47
        &self.node_handle
342✔
48
    }
342✔
49
}
50

51
impl Node for SourceSenderNode {
52
    fn run(mut self) -> Result<(), ExecutionError> {
342✔
53
        let result = self.source.start(
342✔
54
            &mut self.forwarder,
342✔
55
            self.last_checkpoint
342✔
56
                .map(|op_id| (op_id.txid, op_id.seq_in_tx)),
342✔
57
        );
342✔
58
        debug!("[{}-sender] Quit", self.node_handle);
342✔
59
        result
336✔
60
    }
336✔
61
}
×
62

×
63
/// The listener part of a source in the execution DAG.
×
64
#[derive(Debug)]
×
65
pub struct SourceListenerNode {
×
66
    /// Node handle in description DAG.
×
67
    node_handle: NodeHandle,
68
    /// Output from corresponding source sender.
69
    receiver: Receiver<(PortHandle, IngestionMessage)>,
70
    /// Receiving timeout.
×
71
    timeout: Duration,
72
    /// If the execution DAG should be running. Used for determining if a `terminate` message should be sent.
73
    running: Arc<AtomicBool>,
74
    /// This node's output channel manager, for communicating to other sources to coordinate terminate and commit, forwarding data, writing metadata and writing port state.
75
    channel_manager: SourceChannelManager,
76
}
77

78
#[derive(Debug, Clone, PartialEq)]
4,378,743✔
79
enum DataKind {
80
    Data((PortHandle, IngestionMessage)),
81
    NoDataBecauseOfTimeout,
82
    NoDataBecauseOfChannelDisconnection,
83
}
84

×
85
impl SourceListenerNode {
86
    /// Returns if the node should terminate.
87
    fn send_and_trigger_commit_if_needed(
4,382,609✔
88
        &mut self,
4,382,609✔
89
        data: DataKind,
4,382,609✔
90
    ) -> Result<bool, ExecutionError> {
4,382,609✔
91
        // If termination was requested the or source quit, we try to terminate.
92
        let terminating = data == DataKind::NoDataBecauseOfChannelDisconnection
4,382,609✔
93
            || !self.running.load(Ordering::SeqCst);
4,382,256✔
94
        // If this commit was not requested with termination at the start, we shouldn't terminate either.
×
95
        let terminating = match data {
4,382,609✔
96
            DataKind::Data((port, message)) => self
4,379,370✔
97
                .channel_manager
4,379,370✔
98
                .send_and_trigger_commit_if_needed(message, port, terminating)?,
4,379,370✔
99
            DataKind::NoDataBecauseOfTimeout | DataKind::NoDataBecauseOfChannelDisconnection => {
×
100
                self.channel_manager.trigger_commit_if_needed(terminating)?
3,239✔
101
            }
×
102
        };
×
103
        if terminating {
4,382,604✔
104
            self.channel_manager.terminate()?;
6,719✔
105
            debug!("[{}-listener] Quitting", &self.node_handle);
6,719✔
106
        }
4,375,885✔
107
        Ok(terminating)
4,376,222✔
108
    }
4,376,227✔
109
}
×
110

×
111
impl Node for SourceListenerNode {
×
112
    fn run(mut self) -> Result<(), ExecutionError> {
342✔
113
        loop {
×
114
            let terminating = match self.receiver.recv_timeout(self.timeout) {
4,381,962✔
115
                Ok(data) => self.send_and_trigger_commit_if_needed(DataKind::Data(data))?,
4,378,723✔
116
                Err(RecvTimeoutError::Timeout) => {
117
                    self.send_and_trigger_commit_if_needed(DataKind::NoDataBecauseOfTimeout)?
2,886✔
118
                }
×
119
                Err(RecvTimeoutError::Disconnected) => self.send_and_trigger_commit_if_needed(
353✔
120
                    DataKind::NoDataBecauseOfChannelDisconnection,
353✔
121
                )?,
353✔
122
            };
123
            if terminating {
4,381,957✔
124
                return Ok(());
337✔
125
            }
4,381,620✔
126
        }
×
127
    }
342✔
128
}
129

×
130
#[derive(Debug)]
×
131
struct InternalChannelSourceForwarder {
×
132
    sender: Sender<(PortHandle, IngestionMessage)>,
133
}
×
134

135
impl InternalChannelSourceForwarder {
136
    pub fn new(sender: Sender<(PortHandle, IngestionMessage)>) -> Self {
342✔
137
        Self { sender }
342✔
138
    }
342✔
139
}
140

141
pub fn create_source_nodes(
342✔
142
    dag: &mut ExecutionDag,
342✔
143
    node_index: daggy::NodeIndex,
342✔
144
    options: &ExecutorOptions,
342✔
145
    running: Arc<AtomicBool>,
342✔
146
) -> (SourceSenderNode, SourceListenerNode) {
342✔
147
    // Get the source node.
342✔
148
    let node = dag.node_weight_mut(node_index);
342✔
149
    let node_handle = node.handle.clone();
342✔
150
    let node_storage = node.storage.clone();
342✔
151
    let Some(NodeKind::Source(source, last_checkpoint)) = node.kind.take() else {
342✔
152
        panic!("Must pass in a source node");
×
153
    };
×
154

×
155
    // Create channel between source sender and source listener.
×
156
    let (source_sender, source_receiver) = bounded(options.channel_buffer_sz);
342✔
157
    // let (source_sender, source_receiver) = bounded(1);
342✔
158

342✔
159
    // Create source listener.
342✔
160
    let forwarder = InternalChannelSourceForwarder::new(source_sender);
342✔
161
    let source_sender_node = SourceSenderNode {
342✔
162
        node_handle: node_handle.clone(),
342✔
163
        source,
342✔
164
        last_checkpoint,
342✔
165
        forwarder,
342✔
166
    };
342✔
167

342✔
168
    // Create source sender node.
342✔
169
    let (senders, record_writers) = dag.collect_senders_and_record_writers(node_index);
342✔
170
    let state_writer = StateWriter::new(
342✔
171
        node_storage.meta_db,
342✔
172
        record_writers,
342✔
173
        node_storage.master_txn,
342✔
174
    );
342✔
175
    let channel_manager = SourceChannelManager::new(
342✔
176
        node_handle.clone(),
342✔
177
        senders,
342✔
178
        state_writer,
342✔
179
        true,
342✔
180
        options.commit_sz,
342✔
181
        options.commit_time_threshold,
342✔
182
        dag.epoch_manager().clone(),
342✔
183
    );
342✔
184
    let source_listener_node = SourceListenerNode {
342✔
185
        node_handle,
342✔
186
        receiver: source_receiver,
342✔
187
        timeout: options.commit_time_threshold,
342✔
188
        running,
342✔
189
        channel_manager,
342✔
190
    };
342✔
191

342✔
192
    (source_sender_node, source_listener_node)
342✔
193
}
342✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc