• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075835066

pending completion
4075835066

Pull #790

github

GitHub
Merge 39f3c7143 into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

396 of 396 new or added lines in 11 files covered. (100.0%)

24551 of 36528 relevant lines covered (67.21%)

54898.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.12
/dozer-core/src/dag/epoch.rs
1
use crate::dag::node::NodeHandle;
2
use dozer_types::parking_lot::Mutex;
3
use std::collections::HashMap;
4
use std::fmt::{Display, Formatter};
5
use std::sync::{Arc, Barrier};
6
use std::thread::sleep;
7
use std::time::Duration;
8

9
/// Identifies an operation by a `(txid, seq_in_tx)` pair.
///
/// The derived ordering compares `txid` first and `seq_in_tx` second, because
/// that is the order in which the fields are declared.
// NOTE(review): presumably `txid` is a transaction id and `seq_in_tx` the
// position of the operation inside that transaction — confirm with callers.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct OpIdentifier {
    /// First component of the identifier; most significant for ordering.
    pub txid: u64,
    /// Second component of the identifier; breaks ties between equal `txid`s.
    pub seq_in_tx: u64,
}

impl OpIdentifier {
    /// Creates an identifier from its two components.
    pub fn new(txid: u64, seq_in_tx: u64) -> Self {
        OpIdentifier { txid, seq_in_tx }
    }
}
20

21
/// Maps a [`NodeHandle`] to the [`OpIdentifier`] recorded for it.
// NOTE(review): presumably the key is a source node and the value is the last
// operation seen from that source — confirm against the code that fills it.
pub type SourceStates = HashMap<NodeHandle, OpIdentifier>;
22

23
#[derive(Clone, Debug, PartialEq, Eq)]
3,664✔
24
pub struct Epoch {
25
    pub id: u64,
26
    pub details: SourceStates,
27
}
28

29
impl Epoch {
30
    pub fn new(id: u64, details: SourceStates) -> Self {
3,735✔
31
        Self { id, details }
3,735✔
32
    }
3,735✔
33

34
    pub fn from(id: u64, node_handle: NodeHandle, txid: u64, seq_in_tx: u64) -> Self {
1,497✔
35
        Self {
1,497✔
36
            id,
1,497✔
37
            details: [(node_handle, OpIdentifier::new(txid, seq_in_tx))]
1,497✔
38
                .into_iter()
1,497✔
39
                .collect(),
1,497✔
40
        }
1,497✔
41
    }
1,497✔
42
}
43

44
impl Display for Epoch {
45
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
46
        let details_str = self
×
47
            .details
×
48
            .iter()
×
49
            .map(|e| format!("{} -> {}:{}", e.0, e.1.txid, e.1.seq_in_tx))
×
50
            .fold(String::new(), |a, b| a + ", " + b.as_str());
×
51
        f.write_str(format!("epoch: {}, details: {}", self.id, details_str).as_str())
×
52
    }
×
53
}
54

55
/// The two phases an epoch passes through inside `EpochManager`.
#[derive(Debug)]
enum EpochManagerState {
    /// Sources are still registering their requests and waiting on the barrier.
    Closing {
        /// Id of the epoch currently being closed.
        epoch_id: u64,
        /// Whether sources will be told to terminate once this epoch closes.
        should_terminate: bool,
        /// Whether sources will be told to commit once this epoch closes.
        should_commit: bool,
        /// Barrier the sources wait on to synchronize the close of this epoch.
        barrier: Arc<Barrier>,
    },
    /// The barrier has been passed; sources are collecting the frozen decision.
    Closed {
        /// Whether the sources should terminate.
        terminating: bool,
        /// Whether the sources should commit.
        committing: bool,
        /// Id of the epoch that was closed.
        epoch_id: u64,
        /// How many sources have confirmed the close so far.
        num_source_confirmations: usize,
    },
}
78

79
#[derive(Debug)]
×
80
pub(crate) struct EpochManager {
81
    num_sources: usize,
82
    state: Mutex<EpochManagerState>,
83
}
84

85
impl EpochManager {
86
    pub fn new(num_sources: usize) -> Self {
160✔
87
        debug_assert!(num_sources > 0);
160✔
88
        Self {
160✔
89
            num_sources,
160✔
90
            state: Mutex::new(EpochManagerState::Closing {
160✔
91
                epoch_id: 0,
160✔
92
                should_terminate: true,
160✔
93
                should_commit: false,
160✔
94
                barrier: Arc::new(Barrier::new(num_sources)),
160✔
95
            }),
160✔
96
        }
160✔
97
    }
160✔
98

99
    /// Waits for the epoch to close until all sources do so.
100
    ///
101
    /// Returns whether the participant should terminate and the epoch id if the source should commit.
102
    ///
103
    /// # Arguments
104
    ///
105
    /// - `request_termination`: Whether the source wants to terminate. The `EpochManager` checks if all sources want to terminate and returns `true` if so.
106
    /// - `request_commit`: Whether the source wants to commit. The `EpochManager` checks if any source wants to commit and returns `true` if so.
107
    pub fn wait_for_epoch_close(
2,837✔
108
        &self,
2,837✔
109
        request_termination: bool,
2,837✔
110
        request_commit: bool,
2,837✔
111
    ) -> (bool, Option<u64>) {
2,837✔
112
        let barrier = loop {
2,837✔
113
            let mut state = self.state.lock();
2,837✔
114
            match &mut *state {
2,837✔
115
                EpochManagerState::Closing {
116
                    should_terminate,
2,837✔
117
                    should_commit,
2,837✔
118
                    barrier,
2,837✔
119
                    ..
2,837✔
120
                } => {
2,837✔
121
                    // If anyone doesn't want to terminate, we don't terminate.
2,837✔
122
                    *should_terminate = *should_terminate && request_termination;
2,837✔
123
                    // If anyone wants to commit, we commit.
124
                    *should_commit = *should_commit || request_commit;
2,837✔
125
                    break barrier.clone();
2,837✔
126
                }
127
                EpochManagerState::Closed { .. } => {
×
128
                    // This thread wants to close a new epoch while some other thread hasn't got confirmation of last epoch closing.
×
129
                    // Just release the lock and put this thread to sleep.
×
130
                    drop(state);
×
131
                    sleep(Duration::from_millis(1));
×
132
                }
×
133
            }
134
        };
135

136
        barrier.wait();
2,837✔
137

2,837✔
138
        let mut state = self.state.lock();
2,837✔
139
        if let EpochManagerState::Closing {
140
            epoch_id,
2,460✔
141
            should_terminate,
2,460✔
142
            should_commit,
2,460✔
143
            ..
144
        } = &mut *state
2,837✔
145
        {
2,460✔
146
            *state = EpochManagerState::Closed {
2,460✔
147
                terminating: *should_terminate,
2,460✔
148
                committing: *should_commit,
2,460✔
149
                epoch_id: *epoch_id,
2,460✔
150
                num_source_confirmations: 0,
2,460✔
151
            };
2,460✔
152
        }
2,460✔
153

154
        match &mut *state {
2,837✔
155
            EpochManagerState::Closed {
156
                terminating,
2,837✔
157
                committing,
2,837✔
158
                epoch_id,
2,837✔
159
                num_source_confirmations,
2,837✔
160
            } => {
161
                let result = (
2,837✔
162
                    *terminating,
2,837✔
163
                    if *committing { Some(*epoch_id) } else { None },
2,837✔
164
                );
165

166
                *num_source_confirmations += 1;
2,837✔
167
                if *num_source_confirmations == self.num_sources {
2,837✔
168
                    // This thread is the last one in this critical area.
2,460✔
169
                    *state = EpochManagerState::Closing {
2,460✔
170
                        epoch_id: if *committing {
2,460✔
171
                            *epoch_id + 1
1,173✔
172
                        } else {
173
                            *epoch_id
1,287✔
174
                        },
175
                        should_terminate: true,
176
                        should_commit: false,
177
                        barrier: Arc::new(Barrier::new(self.num_sources)),
2,460✔
178
                    };
179
                }
377✔
180

181
                result
2,837✔
182
            }
183
            EpochManagerState::Closing { .. } => {
184
                unreachable!("We just modified `EpochManagerState` to `Closed`")
×
185
            }
186
        }
187
    }
2,837✔
188
}
189

190
#[cfg(test)]
mod tests {
    use std::thread::scope;

    use super::*;

    /// Number of simulated sources taking part in each epoch close.
    const NUM_THREADS: u16 = 10;

    /// Runs one epoch close with `NUM_THREADS` threads and returns the result
    /// they all agreed on.
    ///
    /// Thread `index` passes `termination_gen(index)` and `commit_gen(index)`
    /// to `wait_for_epoch_close`. Panics if the threads return different results.
    fn run_epoch_manager(
        termination_gen: &(impl Fn(u16) -> bool + Sync),
        commit_gen: &(impl Fn(u16) -> bool + Sync),
    ) -> (bool, Option<u64>) {
        let manager = EpochManager::new(usize::from(NUM_THREADS));
        let manager_ref = &manager;
        scope(|s| {
            let mut workers = Vec::with_capacity(usize::from(NUM_THREADS));
            for index in 0..NUM_THREADS {
                workers.push(s.spawn(move || {
                    let wants_termination = termination_gen(index);
                    let wants_commit = commit_gen(index);
                    manager_ref.wait_for_epoch_close(wants_termination, wants_commit)
                }));
            }

            let outcomes: Vec<(bool, Option<u64>)> = workers
                .into_iter()
                .map(|worker| worker.join().unwrap())
                .collect();

            // Every participant must observe the same decision.
            let expected = outcomes.first().unwrap();
            for outcome in &outcomes {
                assert_eq!(outcome, expected);
            }
            outcomes[0]
        })
    }

    #[test]
    fn test_epoch_manager() {
        // All sources have no new data, epoch should not be closed.
        let (_, epoch) = run_epoch_manager(&|_| false, &|_| false);
        assert_eq!(epoch, None);

        // One source has new data, epoch should be closed.
        let (_, epoch) = run_epoch_manager(&|_| false, &|index| index == 0);
        assert_eq!(epoch, Some(0));

        // All but one source request termination, should not terminate.
        let (terminating, _) = run_epoch_manager(&|index| index != 0, &|_| false);
        assert!(!terminating);

        // All sources request termination, should terminate.
        let (terminating, _) = run_epoch_manager(&|_| true, &|_| false);
        assert!(terminating);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc