• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

divviup / divviup-api / 16946563839

13 Aug 2025 06:54PM UTC coverage: 55.907% (+0.03%) from 55.879%
16946563839

push

github

web-flow
use correct request path for agg job metrics (#1857)

Now it matches the path served by Janus ([1])

[1]: https://github.com/divviup/janus/blob/1fc4ac830/aggregator_api/src/lib.rs#L103

0 of 1 new or added line in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

3885 of 6949 relevant lines covered (55.91%)

60.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.5
/src/clients/aggregator_client.rs
1
use crate::{
2
    clients::{ClientConnExt, ClientError},
3
    entity::{task::ProvisionableTask, Aggregator},
4
    handler::Error,
5
};
6
use api_types::TaskAggregationJobMetrics;
7
use janus_messages::Time as JanusTime;
8
use serde::{de::DeserializeOwned, Serialize};
9
use trillium_client::{Client, KnownHeaderName};
10
use url::Url;
11
pub mod api_types;
12
pub use api_types::{
13
    AggregatorApiConfig, TaskCreate, TaskIds, TaskPatch, TaskResponse, TaskUploadMetrics,
14
};
15

16
/// Media type string this module sends as the `Accept` header on its requests
/// and as the `Content-Type` header on requests that carry a JSON body.
const CONTENT_TYPE: &str = "application/vnd.janus.aggregator+json;version=0.1";
17

18
/// Client for one aggregator's HTTP API: a preconfigured [`Client`] paired
/// with the [`Aggregator`] entity it was built for.
#[derive(Debug, Clone)]
19
pub struct AggregatorClient {
20
    /// HTTP client; `new` sets its base URL and default headers.
    client: Client,
21
    /// Aggregator record; read by `create_task` when building the request body.
    aggregator: Aggregator,
22
}
23

24
/// Lets callers borrow the underlying [`Client`] held by an
/// [`AggregatorClient`].
impl AsRef<Client> for AggregatorClient {
25
    /// Returns a reference to the wrapped client.
    fn as_ref(&self) -> &Client {
×
26
        &self.client
×
27
    }
×
28
}
29

30
impl AggregatorClient {
31
    /// Builds an `AggregatorClient` for `aggregator`.
    ///
    /// The supplied `client` is reconfigured with `aggregator.api_url` as its
    /// base and with two default headers: `Authorization: Bearer <bearer_token>`
    /// and `Accept: CONTENT_TYPE`.
    pub fn new(client: Client, aggregator: Aggregator, bearer_token: &str) -> Self {
45✔
32
        let client = client
45✔
33
            .with_base(aggregator.api_url.clone())
45✔
34
            .with_default_header(
45✔
35
                KnownHeaderName::Authorization,
45✔
36
                format!("Bearer {bearer_token}"),
45✔
37
            )
38
            .with_default_header(KnownHeaderName::Accept, CONTENT_TYPE);
45✔
39

40
        Self { client, aggregator }
45✔
41
    }
45✔
42

43
    /// Fetches an [`AggregatorApiConfig`] by sending a GET request to `base_url`.
    ///
    /// This is an associated function (no `self`): the bearer `token` and the
    /// `Accept` header are attached to this one request rather than coming
    /// from a configured `AggregatorClient`.
    ///
    /// # Errors
    ///
    /// Returns a [`ClientError`] if `success_or_client_error` reports a failure
    /// (presumably a non-success response — confirm against `ClientConnExt`),
    /// or if the response body cannot be deserialized as JSON.
    pub async fn get_config(
25✔
44
        client: Client,
25✔
45
        base_url: Url,
25✔
46
        token: &str,
25✔
47
    ) -> Result<AggregatorApiConfig, ClientError> {
25✔
48
        client
25✔
49
            .get(base_url)
25✔
50
            .with_request_header(KnownHeaderName::Authorization, format!("Bearer {token}"))
25✔
51
            .with_request_header(KnownHeaderName::Accept, CONTENT_TYPE)
25✔
52
            .success_or_client_error()
25✔
53
            .await?
25✔
54
            .response_json()
22✔
55
            .await
22✔
56
            .map_err(Into::into)
22✔
57
    }
25✔
58

59
    /// Collects every task id by requesting `task_ids` repeatedly.
    ///
    /// Each response's `task_ids` are appended to the result. While a response
    /// carries a `pagination_token`, the next request is made to
    /// `task_ids?pagination_token=<token>`; the first response without a token
    /// ends the loop and the accumulated ids are returned.
    ///
    /// NOTE(review): the token is interpolated into the query string without
    /// percent-encoding — assumes tokens are URL-safe; confirm against the
    /// aggregator API.
    ///
    /// # Errors
    ///
    /// Returns the first [`ClientError`] produced by any page request; ids
    /// gathered from earlier pages are discarded in that case.
    pub async fn get_task_ids(&self) -> Result<Vec<String>, ClientError> {
2✔
60
        let mut ids = vec![];
2✔
61
        let mut path = String::from("task_ids");
2✔
62
        loop {
63
            let TaskIds {
64
                task_ids,
6✔
65
                pagination_token,
6✔
66
            } = self.get(&path).await?;
6✔
67

68
            ids.extend(task_ids);
6✔
69

70
            match pagination_token {
6✔
71
                Some(pagination_token) => {
4✔
72
                    path = format!("task_ids?pagination_token={pagination_token}");
4✔
73
                }
4✔
74
                None => break Ok(ids),
2✔
75
            }
76
        }
77
    }
2✔
78

79
    /// Fetches a single task by sending a GET request to `tasks/{task_id}` and
    /// deserializing the response as a [`TaskResponse`].
    pub async fn get_task(&self, task_id: &str) -> Result<TaskResponse, ClientError> {
×
80
        self.get(&format!("tasks/{task_id}")).await
×
81
    }
×
82

83
    /// Fetches upload metrics for a task by sending a GET request to
    /// `tasks/{task_id}/metrics/uploads` and deserializing the response as
    /// [`TaskUploadMetrics`].
    pub async fn get_task_upload_metrics(
3✔
84
        &self,
3✔
85
        task_id: &str,
3✔
86
    ) -> Result<TaskUploadMetrics, ClientError> {
3✔
87
        self.get(&format!("tasks/{task_id}/metrics/uploads")).await
3✔
88
    }
3✔
89

90
    /// Fetches aggregation job metrics for a task by sending a GET request to
    /// `tasks/{task_id}/metrics/aggregations` and deserializing the response as
    /// [`TaskAggregationJobMetrics`].
    pub async fn get_task_aggregation_job_metrics(
×
91
        &self,
×
92
        task_id: &str,
×
93
    ) -> Result<TaskAggregationJobMetrics, ClientError> {
×
NEW
94
        self.get(&format!("tasks/{task_id}/metrics/aggregations"))
×
95
            .await
×
96
    }
×
97

98
    /// Creates a task on the aggregator.
    ///
    /// Builds a [`TaskCreate`] from this client's aggregator and `task`, then
    /// POSTs it to `tasks` and returns the deserialized [`TaskResponse`].
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if `TaskCreate::build` fails, or if the POST fails
    /// (the underlying `ClientError` is converted into `Error`).
    pub async fn create_task(&self, task: &ProvisionableTask) -> Result<TaskResponse, Error> {
14✔
99
        let task_create = TaskCreate::build(&self.aggregator, task)?;
14✔
100
        self.post("tasks", &task_create).await.map_err(Into::into)
14✔
101
    }
14✔
102

103
    /// Sends a PATCH request to `tasks/{task_id}` whose body is a [`TaskPatch`]
    /// with `task_expiration` set to `expiration`, and returns the
    /// deserialized [`TaskResponse`].
    ///
    /// `expiration` is passed through unchanged, including `None`.
    ///
    /// # Errors
    ///
    /// Any `ClientError` from the request is converted into [`Error`].
    pub async fn update_task_expiration(
26✔
104
        &self,
26✔
105
        task_id: &str,
26✔
106
        expiration: Option<JanusTime>,
26✔
107
    ) -> Result<TaskResponse, Error> {
26✔
108
        self.patch(
26✔
109
            &format!("tasks/{task_id}"),
26✔
110
            &TaskPatch {
26✔
111
                task_expiration: expiration,
26✔
112
            },
26✔
113
        )
26✔
114
        .await
26✔
115
        .map_err(Into::into)
26✔
116
    }
26✔
117

118
    // private below here
119

120
    /// Sends a GET request to `path` through the configured client and
    /// deserializes the JSON response body into `T`.
    ///
    /// Errors from `success_or_client_error` are propagated with `?`; a JSON
    /// deserialization error is converted via `ClientError::from`.
    async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, ClientError> {
9✔
121
        self.client
9✔
122
            .get(path)
9✔
123
            .success_or_client_error()
9✔
124
            .await?
9✔
125
            .response_json()
9✔
126
            .await
9✔
127
            .map_err(ClientError::from)
9✔
128
    }
9✔
129

130
    /// Sends a POST request to `path` with `body` serialized as JSON and
    /// deserializes the JSON response body into `T`.
    ///
    /// The `Content-Type` header is set to `CONTENT_TYPE` after the JSON body
    /// is attached. Failure to serialize `body`, a failure reported by
    /// `success_or_client_error`, or a response deserialization failure each
    /// yield a [`ClientError`].
    async fn post<T: DeserializeOwned>(
14✔
131
        &self,
14✔
132
        path: &str,
14✔
133
        body: &impl Serialize,
14✔
134
    ) -> Result<T, ClientError> {
14✔
135
        self.client
14✔
136
            .post(path)
14✔
137
            .with_json_body(body)?
14✔
138
            .with_request_header(KnownHeaderName::ContentType, CONTENT_TYPE)
14✔
139
            .success_or_client_error()
14✔
140
            .await?
14✔
141
            .response_json()
14✔
142
            .await
14✔
143
            .map_err(ClientError::from)
14✔
144
    }
14✔
145

146
    /// Sends a PATCH request to `path` with `body` serialized as JSON and
    /// deserializes the JSON response body into `T`.
    ///
    /// The `Content-Type` header is set to `CONTENT_TYPE` after the JSON body
    /// is attached. Failure to serialize `body`, a failure reported by
    /// `success_or_client_error`, or a response deserialization failure each
    /// yield a [`ClientError`].
    async fn patch<T: DeserializeOwned>(
26✔
147
        &self,
26✔
148
        path: &str,
26✔
149
        body: &impl Serialize,
26✔
150
    ) -> Result<T, ClientError> {
26✔
151
        self.client
26✔
152
            .patch(path)
26✔
153
            .with_json_body(body)?
26✔
154
            .with_request_header(KnownHeaderName::ContentType, CONTENT_TYPE)
26✔
155
            .success_or_client_error()
26✔
156
            .await?
26✔
157
            .response_json()
24✔
158
            .await
24✔
159
            .map_err(ClientError::from)
24✔
160
    }
26✔
161
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc