• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4309615408

pending completion
4309615408

push

github

GitHub
chore: Remove an unused `Arc<RwLock>` (#1106)

6 of 6 new or added lines in 2 files covered. (100.0%)

28466 of 40109 relevant lines covered (70.97%)

50957.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

16.0
/dozer-types/src/ingestion_types.rs
1
use prettytable::Table as PrettyTable;
2
use std::fmt::Debug;
3

4
use serde::{Deserialize, Serialize};
5
use thiserror::Error;
6

7
use crate::{errors::internal::BoxedError, node::OpIdentifier, types::Operation};
8

9
#[derive(Debug, Clone, PartialEq)]
×
10
pub struct IngestionMessage {
11
    pub identifier: OpIdentifier,
12
    pub kind: IngestionMessageKind,
×
13
}
14

15
impl IngestionMessage {
16
    pub fn new_op(txn: u64, seq_no: u64, op: Operation) -> Self {
42,248,910✔
17
        Self {
42,248,910✔
18
            identifier: OpIdentifier::new(txn, seq_no),
42,248,910✔
19
            kind: IngestionMessageKind::OperationEvent(op),
42,248,910✔
20
        }
42,248,910✔
21
    }
42,248,910✔
22

23
    pub fn new_snapshotting_done(txn: u64, seq_no: u64) -> Self {
10✔
24
        Self {
10✔
25
            identifier: OpIdentifier::new(txn, seq_no),
10✔
26
            kind: IngestionMessageKind::SnapshottingDone,
10✔
27
        }
10✔
28
    }
10✔
29
}
30

×
31
#[derive(Clone, Debug, PartialEq)]
2✔
32
pub enum IngestionMessageKind {
33
    OperationEvent(Operation),
34
    SnapshottingDone,
35
}
36

37
#[derive(Error, Debug)]
×
38
pub enum IngestorError {
39
    #[error("Failed to send message on channel")]
40
    ChannelError(#[from] BoxedError),
41
}
42

43
pub trait IngestorForwarder: Send + Sync + Debug {
44
    fn forward(&self, msg: IngestionMessage) -> Result<(), IngestorError>;
45
}
×
46

47
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
10✔
48
pub struct EthFilter {
49
    // Starting block
50
    #[prost(uint64, optional, tag = "1")]
51
    pub from_block: Option<u64>,
52
    #[prost(uint64, optional, tag = "2")]
53
    pub to_block: Option<u64>,
54
    #[prost(string, repeated, tag = "3")]
55
    #[serde(default)]
56
    pub addresses: Vec<String>,
57
    #[prost(string, repeated, tag = "4")]
×
58
    #[serde(default)]
×
59
    pub topics: Vec<String>,
×
60
}
61

×
62
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
218✔
63
pub struct GrpcConfig {
×
64
    #[prost(string, tag = "1", default = "0.0.0.0")]
65
    #[serde(default = "default_ingest_host")]
×
66
    pub host: String,
67
    #[prost(uint32, tag = "2", default = "8085")]
68
    #[serde(default = "default_ingest_port")]
69
    pub port: u32,
70
    #[prost(oneof = "GrpcConfigSchemas", tags = "3,4")]
71
    pub schemas: Option<GrpcConfigSchemas>,
72
}
73

×
74
fn default_ingest_host() -> String {
10✔
75
    "0.0.0.0".to_owned()
10✔
76
}
10✔
77

78
fn default_ingest_port() -> u32 {
10✔
79
    8085
10✔
80
}
10✔
81

×
82
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Oneof, Hash)]
210✔
83
pub enum GrpcConfigSchemas {
84
    #[prost(string, tag = "3")]
85
    Inline(String),
×
86
    #[prost(string, tag = "4")]
87
    Path(String),
88
}
89

90
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
6✔
91
pub struct EthConfig {
92
    #[prost(oneof = "EthProviderConfig", tags = "2,3")]
93
    pub provider: Option<EthProviderConfig>,
×
94
}
95

96
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Oneof, Hash)]
4✔
97
pub enum EthProviderConfig {
98
    #[prost(message, tag = "2")]
99
    Log(EthLogConfig),
100
    #[prost(message, tag = "3")]
101
    Trace(EthTraceConfig),
102
}
103

104
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
12✔
105
pub struct EthLogConfig {
106
    #[prost(string, tag = "1")]
107
    pub wss_url: String,
108
    #[prost(message, optional, tag = "2")]
109
    pub filter: Option<EthFilter>,
110
    #[prost(message, repeated, tag = "3")]
111
    #[serde(default)]
112
    pub contracts: Vec<EthContract>,
113
}
114

115
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
×
116
pub struct EthTraceConfig {
117
    #[prost(string, tag = "1")]
118
    pub https_url: String,
×
119
    // Starting block
×
120
    #[prost(uint64, tag = "2")]
×
121
    pub from_block: u64,
122
    #[prost(uint64, optional, tag = "3")]
123
    pub to_block: Option<u64>,
×
124
    #[prost(uint64, tag = "4", default = "3")]
×
125
    #[serde(default = "default_batch_size")]
126
    pub batch_size: u64,
×
127
}
×
128

×
129
fn default_batch_size() -> u64 {
×
130
    3
×
131
}
×
132

×
133
impl EthConfig {
×
134
    pub fn convert_to_table(&self) -> PrettyTable {
×
135
        let mut table = table!();
×
136

×
137
        let provider = self.provider.as_ref().expect("Must provide provider");
×
138
        match provider {
×
139
            EthProviderConfig::Log(log) => {
×
140
                table.add_row(row!["provider", "logs"]);
×
141
                table.add_row(row!["wss_url", format!("{:?}", log.wss_url)]);
×
142
                if let Some(filter) = &log.filter {
×
143
                    table.add_row(row!["filter", format!("{filter:?}")]);
×
144
                }
×
145
                if !log.contracts.is_empty() {
×
146
                    table.add_row(row!["contracts", format!("{:?}", log.contracts)]);
×
147
                }
×
148
            }
149
            EthProviderConfig::Trace(trace) => {
×
150
                table.add_row(row!["https_url", format!("{:?}", trace.https_url)]);
×
151
                table.add_row(row!["provider", "traces"]);
×
152
                table.add_row(row!("trace", format!("{trace:?}")));
×
153
            }
×
154
        }
155
        table
×
156
    }
×
157
}
158

159
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
×
160
pub struct EthContract {
161
    #[prost(string, tag = "1")]
162
    pub name: String,
163
    #[prost(string, tag = "2")]
164
    pub address: String,
165
    #[prost(string, tag = "3")]
166
    pub abi: String,
167
}
168

×
169
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
×
170
pub struct KafkaConfig {
×
171
    #[prost(string, tag = "1")]
×
172
    pub broker: String,
×
173
    #[prost(string, optional, tag = "3")]
×
174
    pub schema_registry_url: Option<String>,
×
175
}
×
176

×
177
impl KafkaConfig {
×
178
    pub fn convert_to_table(&self) -> PrettyTable {
×
179
        table!(
×
180
            ["broker", self.broker],
×
181
            [
×
182
                "schema registry url",
×
183
                self.schema_registry_url
×
184
                    .as_ref()
×
185
                    .map_or("--------", |url| url)
×
186
            ]
×
187
        )
×
188
    }
×
189
}
190

191
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
×
192
pub struct SnowflakeConfig {
193
    #[prost(string, tag = "1")]
194
    pub server: String,
195
    #[prost(string, tag = "2")]
196
    pub port: String,
197
    #[prost(string, tag = "3")]
198
    pub user: String,
199
    #[prost(string, tag = "4")]
200
    pub password: String,
201
    #[prost(string, tag = "5")]
202
    pub database: String,
×
203
    #[prost(string, tag = "6")]
×
204
    pub schema: String,
×
205
    #[prost(string, tag = "7")]
×
206
    pub warehouse: String,
×
207
    #[prost(string, optional, tag = "8")]
×
208
    pub driver: Option<String>,
×
209
}
×
210

×
211
impl SnowflakeConfig {
×
212
    pub fn convert_to_table(&self) -> PrettyTable {
×
213
        table!(
×
214
            ["server", self.server],
×
215
            ["port", self.port],
×
216
            ["user", self.user],
×
217
            ["password", "************"],
×
218
            ["database", self.database],
×
219
            ["schema", self.schema],
×
220
            ["warehouse", self.warehouse],
×
221
            ["driver", self.driver.as_ref().map_or("default", |d| d)]
×
222
        )
×
223
    }
×
224
}
225

226
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
×
227
pub struct DataFusionConfig {
228
    #[prost(string, tag = "1")]
229
    pub access_key_id: String,
×
230
    #[prost(string, tag = "2")]
×
231
    pub secret_access_key: String,
×
232
    #[prost(string, tag = "3")]
×
233
    pub region: String,
×
234
    #[prost(string, tag = "4")]
×
235
    pub bucket_name: String,
×
236
}
×
237

238
impl DataFusionConfig {
239
    pub fn convert_to_table(&self) -> PrettyTable {
×
240
        table!(
×
241
            ["access_key_id", self.access_key_id],
×
242
            ["secret_access_key", self.secret_access_key],
×
243
            ["region", self.region],
×
244
            ["bucket_name", self.bucket_name]
×
245
        )
×
246
    }
×
247
}
248

249
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
26✔
250
pub struct Table {
251
    #[prost(string, tag = "1")]
×
252
    pub name: String,
253
    #[prost(string, tag = "2")]
254
    pub prefix: String,
255
    #[prost(string, tag = "3")]
256
    pub file_type: String,
257
    #[prost(string, tag = "4")]
258
    pub extension: String,
259
}
260

261
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
×
262
pub struct S3Details {
263
    #[prost(string, tag = "1")]
×
264
    pub access_key_id: String,
265
    #[prost(string, tag = "2")]
266
    pub secret_access_key: String,
267
    #[prost(string, tag = "3")]
268
    pub region: String,
269
    #[prost(string, tag = "4")]
270
    pub bucket_name: String,
271
}
272

×
273
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
×
274
pub struct S3Storage {
×
275
    #[prost(message, optional, tag = "1")]
×
276
    pub details: Option<S3Details>,
×
277
    #[prost(message, repeated, tag = "2")]
×
278
    pub tables: Vec<Table>,
×
279
}
×
280

×
281
impl S3Storage {
×
282
    pub fn convert_to_table(&self) -> PrettyTable {
×
283
        self.details.as_ref().map_or_else(
×
284
            || table!(),
×
285
            |details| {
×
286
                table!(
×
287
                    ["access_key_id", details.access_key_id],
×
288
                    ["secret_access_key", details.secret_access_key],
×
289
                    ["region", details.region],
×
290
                    ["bucket_name", details.bucket_name]
×
291
                )
×
292
            },
×
293
        )
×
294
    }
×
295
}
296

297
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
25✔
298
pub struct LocalDetails {
299
    #[prost(string, tag = "1")]
300
    pub path: String,
301
}
302

×
303
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, ::prost::Message, Hash)]
5✔
304
pub struct LocalStorage {
×
305
    #[prost(message, optional, tag = "1")]
×
306
    pub details: Option<LocalDetails>,
×
307
    #[prost(message, repeated, tag = "2")]
308
    pub tables: Vec<Table>,
309
}
310

311
impl LocalStorage {
312
    pub fn convert_to_table(&self) -> PrettyTable {
×
313
        self.details
×
314
            .as_ref()
×
315
            .map_or_else(|| table!(), |details| table!(["path", details.path]))
×
316
    }
×
317
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc