• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075746792

pending completion
4075746792

Pull #790

github

GitHub
Merge 75b5f293b into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

393 of 393 new or added lines in 12 files covered. (100.0%)

24721 of 36724 relevant lines covered (67.32%)

55788.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

5.0
/dozer-ingestion/src/errors.rs
1
#![allow(clippy::enum_variant_names)]
2

3
use dozer_types::errors::internal::BoxedError;
4
use dozer_types::errors::types::{SerializationError, TypeError};
5
use dozer_types::ingestion_types::IngestorError;
6
use dozer_types::thiserror::Error;
7
use dozer_types::{bincode, serde_json};
8
use dozer_types::{rust_decimal, thiserror};
9

10
use base64::DecodeError;
11
#[cfg(feature = "snowflake")]
12
use std::num::TryFromIntError;
13
use std::str::Utf8Error;
14

15
#[cfg(feature = "snowflake")]
16
use odbc::DiagnosticRecord;
17
use schema_registry_converter::error::SRCError;
18

19
#[derive(Error, Debug)]
×
20
pub enum ConnectorError {
21
    #[error("Table not found: {0}")]
22
    TableNotFound(String),
23

24
    #[error("Columns are expected in table_info")]
25
    ColumnsNotFound,
26

27
    #[error("Failed to initialize connector")]
28
    InitializationError,
29

30
    #[error("Failed to map configuration")]
31
    WrongConnectionConfiguration,
32

33
    #[error("This connector doesn't support this method: {0}")]
34
    UnsupportedConnectorMethod(String),
35

36
    #[error("Unexpected query message")]
37
    UnexpectedQueryMessageError,
38

39
    #[error("Schema Identifier is not present")]
40
    SchemaIdentifierNotFound,
41

42
    #[error(transparent)]
43
    PostgresConnectorError(#[from] PostgresConnectorError),
44

45
    #[cfg(feature = "snowflake")]
46
    #[error(transparent)]
47
    SnowflakeError(#[from] SnowflakeError),
48

49
    #[error(transparent)]
50
    DebeziumError(#[from] DebeziumError),
51

52
    #[error(transparent)]
53
    TypeError(#[from] TypeError),
54

55
    #[error(transparent)]
56
    InternalError(#[from] BoxedError),
57

58
    #[error("Failed to send message on channel")]
59
    IngestorError(#[source] IngestorError),
60

61
    #[error("Error in Eth Connection: {0}")]
62
    EthError(#[source] web3::Error),
63

64
    #[error("Failed fetching after {0} recursions")]
65
    EthTooManyRecurisions(usize),
66

67
    #[error("Received empty message in connector")]
68
    EmptyMessage,
69
}
70
impl ConnectorError {
71
    pub fn map_serialization_error(e: serde_json::Error) -> ConnectorError {
×
72
        ConnectorError::TypeError(TypeError::SerializationError(SerializationError::Json(e)))
×
73
    }
×
74

75
    pub fn map_bincode_serialization_error(e: bincode::Error) -> ConnectorError {
×
76
        ConnectorError::TypeError(TypeError::SerializationError(SerializationError::Bincode(
×
77
            e,
×
78
        )))
×
79
    }
×
80
}
81

82
#[derive(Error, Debug)]
×
83
pub enum PostgresConnectorError {
84
    #[error("Query failed in connector: {0}")]
85
    InvalidQueryError(#[source] tokio_postgres::Error),
86

87
    #[error("Failed to connect to postgres with the specified configuration. {0}")]
88
    ConnectionFailure(#[source] tokio_postgres::Error),
89

90
    #[error("Replication is not available for user")]
91
    ReplicationIsNotAvailableForUserError,
92

93
    #[error("WAL level should be 'logical'")]
94
    WALLevelIsNotCorrect(),
95

96
    #[error("Cannot find table: {:?}", .0.join(", "))]
97
    TableError(Vec<String>),
98

99
    #[error("Cannot find column {0} in {1}")]
100
    ColumnNotFound(String, String),
101

102
    #[error("Failed to create a replication slot : {0}")]
103
    CreateSlotError(String),
104

105
    #[error("Failed to create publication")]
106
    CreatePublicationError,
107

108
    #[error("Failed to drop publication")]
109
    DropPublicationError,
110

111
    #[error("Failed to begin txn for replication")]
112
    BeginReplication,
113

114
    #[error("Failed to begin txn for replication")]
115
    CommitReplication,
116

117
    #[error("fetch of replication slot info failed")]
118
    FetchReplicationSlot,
119

120
    #[error("No slots available or all available slots are used")]
121
    NoAvailableSlotsError,
122

123
    #[error("Slot {0} not found")]
124
    SlotNotExistError(String),
125

126
    #[error("Slot {0} is already used by another process")]
127
    SlotIsInUseError(String),
128

129
    #[error("Start lsn is before first available lsn - {0} < {1}")]
130
    StartLsnIsBeforeLastFlushedLsnError(String, String),
131

132
    #[error("fetch of replication slot info failed. Error: {0}")]
133
    SyncWithSnapshotError(String),
134

135
    #[error("Replication stream error. Error: {0}")]
136
    ReplicationStreamError(String),
137

138
    #[error("Received unexpected message in replication stream")]
139
    UnexpectedReplicationMessageError,
140

141
    #[error("Replication stream error")]
142
    ReplicationStreamEndError,
143

144
    #[error(transparent)]
145
    PostgresSchemaError(#[from] PostgresSchemaError),
146

147
    #[error("LSN not stored for replication slot")]
148
    LSNNotStoredError,
149

150
    #[error("LSN parse error. Given lsn: {0}")]
151
    LsnParseError(String),
152

153
    #[error("LSN not returned from replication slot creation query")]
154
    LsnNotReturnedFromReplicationSlot,
155

156
    #[error("Table name \"{0}\" not valid")]
157
    TableNameNotValid(String),
158

159
    #[error("Column name \"{0}\" not valid")]
160
    ColumnNameNotValid(String),
161

162
    #[error("Relation not found in replication: {0}")]
163
    RelationNotFound(#[source] std::io::Error),
164
}
165

166
#[derive(Error, Debug, Eq, PartialEq)]
×
167
pub enum PostgresSchemaError {
168
    #[error("Schema's '{0}' replication identity settings is not correct. It is either not set or NOTHING")]
169
    SchemaReplicationIdentityError(String),
170

171
    #[error("Column type {0} not supported")]
172
    ColumnTypeNotSupported(String),
173

174
    #[error("CustomTypeNotSupported")]
175
    CustomTypeNotSupported,
176

177
    #[error("ColumnTypeNotFound")]
178
    ColumnTypeNotFound,
179

180
    #[error("Invalid column type")]
181
    InvalidColumnType,
182

183
    #[error("Value conversion error: {0}")]
184
    ValueConversionError(String),
185

186
    #[error("Unsupported replication type - '{0}'")]
187
    UnsupportedReplicationType(String),
188
}
189

190
/// Errors raised by the Snowflake connector; only compiled with the
/// `snowflake` feature.
///
/// Both `QueryError` and `ConnectionError` wrap a `Box<DiagnosticRecord>`,
/// but only `ConnectionError` is tagged `#[from]`. Applying `?` to a
/// `Box<DiagnosticRecord>` therefore always yields `ConnectionError`;
/// `QueryError` has to be constructed explicitly.
#[cfg(feature = "snowflake")]
#[derive(Error, Debug)]
pub enum SnowflakeError {
    /// A query failed; the diagnostic record is exposed through `source()`.
    #[error("Snowflake query error")]
    QueryError(#[source] Box<DiagnosticRecord>),

    /// Connecting failed; the diagnostic record is exposed through `source()`.
    #[error("Snowflake connection error")]
    ConnectionError(#[from] Box<DiagnosticRecord>),

    /// Transparent wrapper; convertible from `SnowflakeSchemaError` via `?`.
    // No variant-level `cfg` is needed: the whole enum is already gated on
    // the `snowflake` feature.
    #[error(transparent)]
    SnowflakeSchemaError(#[from] SnowflakeSchemaError),

    /// Transparent wrapper; convertible from `SnowflakeStreamError` via `?`.
    #[error(transparent)]
    SnowflakeStreamError(#[from] SnowflakeStreamError),
}
206

207
/// Schema-level errors for the Snowflake connector; only compiled with the
/// `snowflake` feature.
#[cfg(feature = "snowflake")]
#[derive(Error, Debug)]
pub enum SnowflakeSchemaError {
    /// The payload is printed as the unsupported column type.
    #[error("Column type {0} not supported")]
    ColumnTypeNotSupported(String),

    /// Wraps a diagnostic record, exposed through `source()` only.
    #[error("Value conversion Error")]
    ValueConversionError(#[source] Box<DiagnosticRecord>),

    /// Wraps a `TryFromIntError`, which is printed and kept as the source.
    #[error("Schema conversion Error: {0}")]
    SchemaConversionError(#[source] TryFromIntError),

    /// Wraps a `rust_decimal::Error`, exposed through `source()` only.
    #[error("Decimal convert error")]
    DecimalConvertError(#[source] rust_decimal::Error),
}
222

223
#[derive(Error, Debug)]
×
224
pub enum SnowflakeStreamError {
225
    #[error("Time travel not available for table")]
226
    TimeTravelNotAvailableError,
227

228
    #[error("Unsupported \"{0}\" action in stream")]
229
    UnsupportedActionInStream(String),
230

231
    #[error("Cannot determine action")]
232
    CannotDetermineAction,
233

234
    #[error("Stream not found")]
235
    StreamNotFound,
×
236
}
237

238
#[derive(Error, Debug)]
×
239
pub enum DebeziumError {
240
    #[error(transparent)]
241
    DebeziumSchemaError(#[from] DebeziumSchemaError),
242

243
    #[error("Connection error")]
244
    DebeziumConnectionError(#[source] kafka::Error),
245

246
    #[error("JSON decode error")]
247
    JsonDecodeError(#[source] serde_json::Error),
248

249
    #[error("Bytes convert error")]
250
    BytesConvertError(#[source] Utf8Error),
251

252
    #[error(transparent)]
253
    DebeziumStreamError(#[from] DebeziumStreamError),
254

255
    #[error("Schema registry fetch failed")]
256
    SchemaRegistryFetchError(#[source] SRCError),
257

258
    #[error("Topic not defined")]
259
    TopicNotDefined,
×
260
}
261

262
#[derive(Error, Debug)]
×
263
pub enum DebeziumStreamError {
264
    #[error("Consume commit error")]
265
    ConsumeCommitError(#[source] kafka::Error),
266

267
    #[error("Message consume error")]
268
    MessageConsumeError(#[source] kafka::Error),
269

270
    #[error("Polling error")]
271
    PollingError(#[source] kafka::Error),
×
272
}
273

274
#[derive(Error, Debug, PartialEq)]
6✔
275
pub enum DebeziumSchemaError {
276
    #[error("Schema definition not found")]
277
    SchemaDefinitionNotFound,
278

279
    #[error("Unsupported \"{0}\" type")]
280
    TypeNotSupported(String),
281

282
    #[error("Field \"{0}\" not found")]
283
    FieldNotFound(String),
284

285
    #[error("Binary decode error")]
286
    BinaryDecodeError(#[source] DecodeError),
287

288
    #[error("Scale not found")]
289
    ScaleNotFound,
290

291
    #[error("Scale is invalid")]
292
    ScaleIsInvalid,
293

294
    #[error("Decimal convert error")]
295
    DecimalConvertError(#[source] rust_decimal::Error),
296
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc