• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3974044832

pending completion
3974044832

Pull #703

github

GitHub
Merge 6f30ce3b1 into 9fa836726
Pull Request #703: fix: forbid duplicated cte names

27 of 27 new or added lines in 1 file covered. (100.0%)

22298 of 33673 relevant lines covered (66.22%)

35756.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/dozer-core/src/dag/epoch.rs
1
use crate::dag::node::NodeHandle;
2
use dozer_types::parking_lot::Mutex;
3
use std::collections::HashMap;
4
use std::fmt::{Display, Formatter};
5
use std::sync::{Arc, Barrier};
6
use std::thread::sleep;
7
use std::time::Duration;
8

9
#[derive(Clone, Debug, PartialEq, Eq)]
23,004✔
10
pub struct Epoch {
11
    pub id: u64,
12
    pub details: HashMap<NodeHandle, (u64, u64)>,
13
}
14

15
impl Epoch {
16
    pub fn new(id: u64, details: HashMap<NodeHandle, (u64, u64)>) -> Self {
37,079✔
17
        Self { id, details }
37,079✔
18
    }
37,079✔
19

20
    pub fn from(id: u64, node_handle: NodeHandle, txid: u64, seq_in_tx: u64) -> Self {
×
21
        Self {
×
22
            id,
×
23
            details: [(node_handle, (txid, seq_in_tx))].into_iter().collect(),
×
24
        }
×
25
    }
×
26
}
27

28
impl Display for Epoch {
29
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
30
        let details_str = self
×
31
            .details
×
32
            .iter()
×
33
            .map(|e| format!("{} -> {}:{}", e.0, e.1 .0, e.1 .1))
×
34
            .fold(String::new(), |a, b| a + ", " + b.as_str());
×
35
        f.write_str(format!("epoch: {}, details: {}", self.id, details_str).as_str())
×
36
    }
×
37
}
38

39
#[derive(Debug, Clone)]
8,232✔
40
pub struct ClosingEpoch {
41
    pub id: u64,
42
    pub details: HashMap<NodeHandle, (u64, u64)>,
43
    pub terminating: bool,
44
}
45

46
impl ClosingEpoch {
47
    pub fn new(id: u64, details: HashMap<NodeHandle, (u64, u64)>, terminating: bool) -> Self {
6,833✔
48
        Self {
6,833✔
49
            id,
6,833✔
50
            details,
6,833✔
51
            terminating,
6,833✔
52
        }
6,833✔
53
    }
6,833✔
54
}
55

56
#[derive(Debug)]
×
57
enum EpochManagerState {
58
    Closing {
59
        epoch: Epoch,
60
        should_terminate: bool,
61
        barrier: Arc<Barrier>,
62
    },
63
    Closed {
64
        epoch: ClosingEpoch,
65
        num_participant_confirmations: usize,
66
    },
67
}
68

69
#[derive(Debug)]
×
70
pub(crate) struct EpochManager {
71
    num_participants: usize,
72
    state: Mutex<EpochManagerState>,
73
}
74

75
impl EpochManager {
76
    pub fn new(num_participants: usize) -> Self {
127✔
77
        Self {
127✔
78
            num_participants,
127✔
79
            state: Mutex::new(EpochManagerState::Closing {
127✔
80
                epoch: Epoch::new(0, HashMap::new()),
127✔
81
                should_terminate: true,
127✔
82
                barrier: Arc::new(Barrier::new(num_participants)),
127✔
83
            }),
127✔
84
        }
127✔
85
    }
127✔
86

87
    pub fn wait_for_epoch_close(
8,232✔
88
        &self,
8,232✔
89
        participant: NodeHandle,
8,232✔
90
        txn_id_and_seq_number: (u64, u64),
8,232✔
91
        request_termination: bool,
8,232✔
92
    ) -> ClosingEpoch {
8,232✔
93
        let barrier = loop {
8,232✔
94
            let mut state = self.state.lock();
8,232✔
95
            match &mut *state {
8,232✔
96
                EpochManagerState::Closing {
97
                    epoch,
8,232✔
98
                    should_terminate,
8,232✔
99
                    barrier,
8,232✔
100
                } => {
8,232✔
101
                    epoch.details.insert(participant, txn_id_and_seq_number);
8,232✔
102
                    // If anyone doesn't want to terminate, we don't terminate.
8,232✔
103
                    *should_terminate = *should_terminate && request_termination;
8,232✔
104
                    break barrier.clone();
8,232✔
105
                }
106
                EpochManagerState::Closed { .. } => {
×
107
                    // This thread wants to close a new epoch while some other thread hasn't got confirmation of last epoch closing.
×
108
                    // Just release the lock and put this thread to sleep.
×
109
                    drop(state);
×
110
                    sleep(Duration::from_millis(1));
×
111
                }
×
112
            }
113
        };
114

115
        barrier.wait();
8,232✔
116

8,232✔
117
        let mut state = self.state.lock();
8,232✔
118
        if let EpochManagerState::Closing {
119
            epoch,
6,835✔
120
            should_terminate,
6,835✔
121
            ..
122
        } = &mut *state
8,232✔
123
        {
124
            // This thread is the first one in this critical area.
125
            debug_assert!(epoch.details.len() == self.num_participants);
6,834✔
126
            let closing_epoch =
6,834✔
127
                ClosingEpoch::new(epoch.id, epoch.details.clone(), *should_terminate);
6,834✔
128
            *state = EpochManagerState::Closed {
6,834✔
129
                epoch: closing_epoch,
6,834✔
130
                num_participant_confirmations: 0,
6,834✔
131
            };
6,834✔
132
        }
1,397✔
133

134
        match &mut *state {
8,231✔
135
            EpochManagerState::Closed {
136
                epoch,
8,231✔
137
                num_participant_confirmations,
8,231✔
138
            } => {
8,231✔
139
                *num_participant_confirmations += 1;
8,231✔
140
                let closing_epoch = epoch.clone();
8,231✔
141
                if *num_participant_confirmations == self.num_participants {
8,231✔
142
                    // This thread is the last one in this critical area.
6,835✔
143
                    *state = EpochManagerState::Closing {
6,835✔
144
                        epoch: Epoch::new(closing_epoch.id + 1, HashMap::new()),
6,835✔
145
                        should_terminate: true,
6,835✔
146
                        barrier: Arc::new(Barrier::new(self.num_participants)),
6,835✔
147
                    };
6,835✔
148
                }
6,835✔
149
                closing_epoch
8,231✔
150
            }
151
            EpochManagerState::Closing { .. } => {
152
                unreachable!("We just modified `EpochManagerstate` to `Closed`")
×
153
            }
154
        }
155
    }
8,231✔
156
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc