• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075835066

pending completion
4075835066

Pull #790

github

GitHub
Merge 39f3c7143 into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

396 of 396 new or added lines in 11 files covered. (100.0%)

24551 of 36528 relevant lines covered (67.21%)

54898.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.0
/dozer-core/src/dag/errors.rs
1
#![allow(clippy::enum_variant_names)]
2
use crate::dag::appsource::AppSourceId;
3
use crate::dag::node::{NodeHandle, PortHandle};
4
use crate::storage::errors::StorageError;
5
use dozer_types::errors::internal::BoxedError;
6
use dozer_types::errors::types::TypeError;
7
use dozer_types::thiserror;
8
use dozer_types::thiserror::Error;
9

10
#[derive(Error, Debug)]
4✔
11
pub enum ExecutionError {
12
    #[error("Adding this edge would have created a cycle")]
13
    WouldCycle,
14
    #[error("Invalid port handle: {0}")]
15
    InvalidPortHandle(PortHandle),
16
    #[error("Invalid node handle: {0}")]
17
    InvalidNodeHandle(NodeHandle),
18
    #[error("Missing input for node {node}")]
19
    MissingInput { node: NodeHandle },
20
    #[error("Invalid operation: {0}")]
21
    InvalidOperation(String),
22
    #[error("Schema not initialized")]
23
    SchemaNotInitialized,
24
    #[error("The node {0} does not have any input")]
25
    MissingNodeInput(NodeHandle),
26
    #[error("The node {0} does not have any output")]
27
    MissingNodeOutput(NodeHandle),
28
    #[error("The database is invalid")]
29
    InvalidDatabase,
30
    #[error("Field not found at position {0}")]
31
    FieldNotFound(String),
32
    #[error("Port not found in source for schema_id: {0}.")]
33
    PortNotFound(String),
34
    #[error("Replication type not found")]
35
    ReplicationTypeNotFound,
36
    #[error("Record not found")]
37
    RecordNotFound(),
38
    #[error("Already exists: {0}")]
39
    MetadataAlreadyExists(NodeHandle),
40
    #[error("Incompatible schemas")]
41
    IncompatibleSchemas(),
42
    #[error("Channel disconnected")]
43
    ChannelDisconnected,
44
    #[error("Cannot spawn worker thread: {0}")]
45
    CannotSpawnWorkerThread(#[from] std::io::Error),
46
    #[error("Internal thread panicked")]
47
    InternalThreadPanic,
48
    #[error("Invalid source identifier {0}")]
49
    InvalidSourceIdentifier(AppSourceId),
50
    #[error("Ambiguous source identifier {0}")]
51
    AmbiguousSourceIdentifier(AppSourceId),
52
    #[error("Inconsistent checkpointing data")]
53
    InconsistentCheckpointMetadata,
54
    #[error("Port not found for source: {0}")]
55
    PortNotFoundInSource(PortHandle),
56
    #[error("Failed to get output schema: {0}")]
57
    FailedToGetOutputSchema(String),
58
    #[error("Update operation not supported: {0}")]
59
    UnsupportedUpdateOperation(String),
60
    #[error("Delete operation not supported: {0}")]
61
    UnsupportedDeleteOperation(String),
62
    #[error("Invalid AppSource connection {0}. Already exists.")]
63
    AppSourceConnectionAlreadyExists(String),
64
    #[error("Failed to get primary key for `{0}`")]
65
    FailedToGetPrimaryKey(String),
66
    #[error("Got mismatching primary key for `{endpoint_name}`. Expected: `{expected:?}`, got: `{actual:?}`")]
67
    MismatchPrimaryKey {
68
        endpoint_name: String,
69
        expected: Vec<String>,
70
        actual: Vec<String>,
71
    },
72

73
    // Error forwarders
74
    #[error(transparent)]
75
    InternalTypeError(#[from] TypeError),
76
    #[error(transparent)]
77
    InternalDatabaseError(#[from] StorageError),
78
    #[error(transparent)]
79
    InternalError(#[from] BoxedError),
80
    #[error("{0}. Has dozer been initialized (`dozer init`)?")]
81
    SinkError(#[source] SinkError),
82

83
    #[error("Failed to initialize source: {0}")]
84
    ConnectorError(#[source] BoxedError),
85
    // to remove
86
    #[error("{0}")]
87
    InternalStringError(String),
88

89
    #[error("Channel returned empty message in sink. Might be an issue with the sender: {0}, {1}")]
90
    SinkReceiverError(usize, #[source] BoxedError),
91

92
    #[error(
93
        "Channel returned empty message in processor. Might be an issue with the sender: {0}, {1}"
94
    )]
95
    ProcessorReceiverError(usize, #[source] BoxedError),
96

97
    #[error(transparent)]
98
    JoinError(JoinError),
99

100
    #[error(transparent)]
101
    SourceError(SourceError),
102
}
×
103

104
impl<T> From<daggy::WouldCycle<T>> for ExecutionError {
105
    fn from(_: daggy::WouldCycle<T>) -> Self {
×
106
        ExecutionError::WouldCycle
×
107
    }
×
108
}
109

110
#[derive(Error, Debug)]
×
111
pub enum SinkError {
112
    #[error("Failed to initialize schema in Sink: {0}")]
113
    SchemaUpdateFailed(#[source] BoxedError),
114

115
    #[error("Failed to begin cache transaction: {0}")]
116
    CacheBeginTransactionFailed(#[source] BoxedError),
117

118
    #[error("Failed to insert record in Sink: {0}")]
119
    CacheInsertFailed(#[source] BoxedError),
120

121
    #[error("Failed to delete record in Sink: {0}")]
122
    CacheDeleteFailed(#[source] BoxedError),
123

124
    #[error("Failed to update record in Sink: {0}")]
125
    CacheUpdateFailed(#[source] BoxedError),
126

×
127
    #[error("Failed to commit cache transaction: {0}")]
128
    CacheCommitTransactionFailed(#[source] BoxedError),
129

130
    #[error("Failed to initialize schema in Sink: {0}")]
131
    CacheCountFailed(#[source] BoxedError),
132
}
133

134
#[derive(Error, Debug)]
×
135
pub enum JoinError {
136
    #[error("Failed to find table in Join during Insert: {0}")]
137
    InsertPortError(PortHandle),
138
    #[error("Failed to find table in Join during Delete: {0}")]
×
139
    DeletePortError(PortHandle),
140
    #[error("Failed to find table in Join during Update: {0}")]
141
    UpdatePortError(PortHandle),
142
    #[error("Join ports are not properly initialized")]
143
    PortNotConnected(PortHandle),
144
}
145

146
#[derive(Error, Debug)]
×
147
pub enum SourceError {
148
    #[error("Failed to find table in Source: {0:?}")]
149
    PortError(String),
150
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc