• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

divviup / divviup-api / 15912404487

26 Jun 2025 09:01PM UTC coverage: 57.019% (-0.07%) from 57.086%
15912404487

Pull #1779

github

web-flow
Merge 0d4094ab5 into 18a3faee7
Pull Request #1779: Fix Clippy lint

3871 of 6789 relevant lines covered (57.02%)

64.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.68
/src/queue.rs
1
mod job;
2
pub use crate::entity::queue::JobStatus;
3
pub use job::*;
4

5
use crate::{
6
    entity::queue::{ActiveModel, Column, Entity, Model},
7
    Config, Db, DivviupApi,
8
};
9
use sea_orm::{
10
    sea_query::{all, Expr},
11
    ActiveModelTrait, ActiveValue, ColumnTrait, DbErr, EntityTrait, IntoActiveModel,
12
    PaginatorTrait, QueryFilter, TransactionTrait,
13
};
14
use std::{ops::Range, sync::Arc, time::Duration};
15
use time::OffsetDateTime;
16
use tokio::{
17
    task::{JoinHandle, JoinSet},
18
    time::sleep,
19
};
20
use trillium_tokio::{CloneCounterObserver, Stopper};
21

22
#[derive(Clone, Debug)]
23
pub struct Queue {
24
    observer: CloneCounterObserver,
25
    stopper: Stopper,
26
    db: Db,
27
    job_state: Arc<SharedJobState>,
28
}
29
// Tuning knobs for the queue. These configuration variables may eventually be
// useful to put on Config.

/// Failure count at which a retryable job is no longer rescheduled.
const MAX_RETRY: i32 = 5;
/// Range, in milliseconds, from which a worker draws its random sleep.
const QUEUE_CHECK_INTERVAL: Range<u64> = 60_000..120_000;
/// Range, in milliseconds, of the random jitter added to each retry delay.
const SCHEDULE_RANDOMNESS: Range<u64> = 0..15_000;
/// Number of worker tasks the supervisor starts.
const QUEUE_WORKER_COUNT: u8 = 2;
36

37
fn reschedule_based_on_failure_count(failure_count: i32) -> Option<OffsetDateTime> {
×
38
    if failure_count >= MAX_RETRY {
×
39
        None
×
40
    } else {
41
        let duration = Duration::from_millis(
×
42
            1000 * 4_u64.pow(failure_count.try_into().unwrap())
×
43
                + fastrand::u64(SCHEDULE_RANDOMNESS),
×
44
        );
45
        Some(OffsetDateTime::now_utc() + duration)
×
46
    }
47
}
×
48

49
impl From<&DivviupApi> for Queue {
50
    fn from(app: &DivviupApi) -> Self {
1✔
51
        Self::new(app.db(), app.config())
1✔
52
    }
1✔
53
}
54

55
impl Queue {
56
    pub fn new(db: &Db, config: &Config) -> Self {
2✔
57
        Self {
2✔
58
            observer: Default::default(),
2✔
59
            db: db.clone(),
2✔
60
            stopper: Default::default(),
2✔
61
            job_state: Arc::new(config.into()),
2✔
62
        }
2✔
63
    }
2✔
64

65
    pub fn with_observer(mut self, observer: CloneCounterObserver) -> Self {
×
66
        self.observer = observer;
×
67
        self
×
68
    }
×
69

70
    pub fn with_stopper(mut self, stopper: Stopper) -> Self {
×
71
        self.stopper = stopper;
×
72
        self
×
73
    }
×
74

75
    pub async fn schedule_recurring_tasks_if_needed(&self) -> Result<(), DbErr> {
×
76
        let tx = self.db.begin().await?;
×
77

78
        let session_cleanup_jobs = Entity::find()
×
79
            .filter(all![
×
80
                Expr::cust_with_expr("job->>'type' = $1", "SessionCleanup"),
×
81
                Column::ScheduledAt.gt(OffsetDateTime::now_utc()),
×
82
            ])
×
83
            .count(&tx)
×
84
            .await?;
×
85

86
        if session_cleanup_jobs == 0 {
×
87
            Job::from(SessionCleanup).insert(&tx).await?;
×
88
        }
×
89
        tx.commit().await?;
×
90

91
        let tx = self.db.begin().await?;
×
92
        let queue_cleanup_jobs = Entity::find()
×
93
            .filter(all![
×
94
                Expr::cust_with_expr("job->>'type' = $1", "QueueCleanup"),
×
95
                Column::ScheduledAt.gt(OffsetDateTime::now_utc()),
×
96
            ])
×
97
            .count(&tx)
×
98
            .await?;
×
99

100
        if queue_cleanup_jobs == 0 {
×
101
            Job::from(QueueCleanup).insert(&tx).await?;
×
102
        }
×
103
        tx.commit().await?;
×
104

105
        Ok(())
×
106
    }
×
107

108
    pub async fn perform_one_queue_job(&self) -> Result<Option<Model>, DbErr> {
5✔
109
        let tx = self.db.begin().await?;
5✔
110
        let model = if let Some(queue_item) = Entity::next(&tx).await? {
5✔
111
            let _counter = self.observer.counter();
4✔
112
            let mut queue_item = queue_item.into_active_model();
4✔
113

114
            let mut job = queue_item.job.take().ok_or_else(|| {
4✔
115
                DbErr::Custom(String::from(
×
116
                    r#"Queue item found without a job.
×
117
                       We believe this to be unreachable"#,
×
118
                ))
×
119
            })?;
×
120

121
            let result = job.perform(&self.job_state, &tx).await;
4✔
122
            queue_item.job = ActiveValue::Set(job);
4✔
123

124
            match result {
4✔
125
                Ok(Some(next_job)) => {
3✔
126
                    queue_item.status = ActiveValue::Set(JobStatus::Success);
3✔
127
                    queue_item.scheduled_at = ActiveValue::Set(None);
3✔
128

129
                    let mut next_job = ActiveModel::from(next_job);
3✔
130
                    next_job.parent_id = ActiveValue::Set(Some(*queue_item.id.as_ref()));
3✔
131
                    let next_job = next_job.insert(&tx).await?;
3✔
132
                    queue_item.child_id = ActiveValue::Set(Some(next_job.id));
3✔
133
                }
134

135
                Ok(None) => {
1✔
136
                    queue_item.scheduled_at = ActiveValue::Set(None);
1✔
137
                    queue_item.status = ActiveValue::Set(JobStatus::Success);
1✔
138
                }
1✔
139

140
                Err(e) if e.is_retryable() => {
×
141
                    queue_item.failure_count =
×
142
                        ActiveValue::Set(queue_item.failure_count.as_ref() + 1);
×
143
                    let reschedule =
×
144
                        reschedule_based_on_failure_count(*queue_item.failure_count.as_ref());
×
145
                    queue_item.status = ActiveValue::Set(
146
                        reschedule.map_or(JobStatus::Failed, |_| JobStatus::Pending),
×
147
                    );
148
                    queue_item.scheduled_at = ActiveValue::Set(reschedule);
×
149
                    queue_item.error_message = ActiveValue::Set(Some(e.into()));
×
150
                }
151

152
                Err(e) => {
×
153
                    queue_item.failure_count =
×
154
                        ActiveValue::Set(queue_item.failure_count.as_ref() + 1);
×
155
                    queue_item.scheduled_at = ActiveValue::Set(None);
×
156
                    queue_item.status = ActiveValue::Set(JobStatus::Failed);
×
157
                    queue_item.error_message = ActiveValue::Set(Some(e.into()));
×
158
                }
×
159
            }
160

161
            queue_item.updated_at = ActiveValue::Set(OffsetDateTime::now_utc());
4✔
162
            Some(queue_item.update(&tx).await?)
4✔
163
        } else {
164
            None
1✔
165
        };
166
        tx.commit().await?;
5✔
167
        Ok(model)
5✔
168
    }
5✔
169

170
    fn spawn_worker(self, join_set: &mut JoinSet<()>) {
×
171
        join_set.spawn(async move {
×
172
            loop {
173
                if self.stopper.is_stopped() {
×
174
                    break;
×
175
                }
×
176

177
                match self.perform_one_queue_job().await {
×
178
                    Err(e) => {
×
179
                        tracing::error!("job error {e}");
×
180
                    }
181

182
                    Ok(Some(_)) => {}
×
183

184
                    Ok(None) => {
185
                        let sleep_future =
×
186
                            sleep(Duration::from_millis(fastrand::u64(QUEUE_CHECK_INTERVAL)));
×
187
                        self.stopper.stop_future(sleep_future).await;
×
188
                    }
189
                }
190
            }
191
        });
×
192
    }
×
193

194
    async fn supervise_workers(self) {
×
195
        self.schedule_recurring_tasks_if_needed().await.unwrap();
×
196
        let mut join_set = JoinSet::new();
×
197
        for _ in 0..QUEUE_WORKER_COUNT {
×
198
            self.clone().spawn_worker(&mut join_set);
×
199
        }
×
200

201
        while join_set.join_next().await.is_some() {
×
202
            if !self.stopper.is_stopped() {
×
203
                tracing::error!("Worker task shut down. Restarting.");
×
204
                self.clone().spawn_worker(&mut join_set);
×
205
            }
×
206
        }
207
    }
×
208

209
    pub fn spawn_workers(self) -> JoinHandle<()> {
×
210
        tokio::task::spawn(self.supervise_workers())
×
211
    }
×
212
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc