• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4698406303

pending completion
4698406303

Pull #1426

github

GitHub
Merge daefffe87 into b6889464a
Pull Request #1426: feat: implement python log bindings

1 of 1 new or added line in 1 file covered. (100.0%)

34863 of 45840 relevant lines covered (76.05%)

10764.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.94
/dozer-ingestion/src/connectors/grpc/connector.rs
1
use std::fmt::Debug;
2
use std::path::Path;
3

4
use super::adapter::{GrpcIngestor, IngestAdapter};
5
use super::ingest::IngestorServiceImpl;
6
use crate::connectors::{table_name, SourceSchema, SourceSchemaResult, TableIdentifier};
7
use crate::{
8
    connectors::{Connector, TableInfo},
9
    errors::ConnectorError,
10
    ingestion::Ingestor,
11
};
12
use dozer_types::grpc_types::ingest::ingest_service_server::IngestServiceServer;
13
use dozer_types::ingestion_types::GrpcConfig;
14
use dozer_types::log::{info, warn};
15
use dozer_types::tracing::Level;
16
use tonic::async_trait;
17
use tonic::transport::Server;
18
use tower_http::trace::{self, TraceLayer};
19

20
/// gRPC ingestion connector, generic over the `IngestAdapter` implementation `T`.
///
/// Holds an id, a name and the `GrpcConfig` whose `host`, `port` and `schemas`
/// values are read by the methods of this type.
pub struct GrpcConnector<T>
21
where
22
    T: IngestAdapter,
23
{
24
    pub id: u64,
25
    pub name: String,
26
    pub config: GrpcConfig,
27
    // Marker that ties the struct to `T`; no value of type `T` is stored.
    _phantom: std::marker::PhantomData<T>,
28
}
29
/// Hand-written `Debug` implementation for `GrpcConnector`.
///
/// NOTE(review): presumably written by hand rather than derived so that `T` does not
/// have to implement `Debug` — confirm.
impl<T> Debug for GrpcConnector<T>
30
where
31
    T: IngestAdapter,
32
{
33
    /// Formats the connector as a `GrpcConnector` struct showing `id`, `name` and
    /// `config`; the `_phantom` marker field is left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
34
        f.debug_struct("GrpcConnector")
×
35
            .field("id", &self.id)
×
36
            .field("name", &self.name)
×
37
            .field("config", &self.config)
×
38
            .finish()
×
39
    }
×
40
}
41

42
impl<T> GrpcConnector<T>
43
where
44
    T: IngestAdapter,
45
{
46
    /// Creates a connector from the given `id`, `name` and `config`.
    ///
    /// The arguments are stored as-is; no validation is performed here, so this body
    /// always returns `Ok` even though the signature allows a `ConnectorError`.
    pub fn new(id: u64, name: String, config: GrpcConfig) -> Result<Self, ConnectorError> {
62✔
47
        Ok(Self {
62✔
48
            id,
62✔
49
            name,
62✔
50
            config,
62✔
51
            _phantom: std::marker::PhantomData,
62✔
52
        })
62✔
53
    }
62✔
54

55
    /// Returns the schema definition text referenced by `config.schemas`.
    ///
    /// * `GrpcConfigSchemas::Inline` — the inline string is cloned and returned.
    /// * `GrpcConfigSchemas::Path` — the file at that path is read and its contents returned.
    ///
    /// # Errors
    /// Returns `ConnectorError::InitializationError` with the message
    /// `"schemas not found"` when `config.schemas` is `None`, or with the I/O error's
    /// text when the schema file cannot be read.
    pub fn parse_config(config: &GrpcConfig) -> Result<String, ConnectorError>
78✔
56
    where
78✔
57
        T: IngestAdapter,
78✔
58
    {
78✔
59
        // Turn the optional `schemas` setting into a `Result` so a missing value can be
        // propagated with `?`.
        let schemas = config.schemas.as_ref().map_or_else(
78✔
60
            || {
78✔
61
                Err(ConnectorError::InitializationError(
×
62
                    "schemas not found".to_string(),
×
63
                ))
×
64
            },
78✔
65
            Ok,
78✔
66
        )?;
78✔
67
        let schemas_str = match schemas {
78✔
68
            dozer_types::ingestion_types::GrpcConfigSchemas::Inline(schemas_str) => {
78✔
69
                schemas_str.clone()
78✔
70
            }
71
            dozer_types::ingestion_types::GrpcConfigSchemas::Path(path) => {
×
72
                let path = Path::new(path);
×
73
                // Blocking file read; the I/O error is flattened to its string form.
                std::fs::read_to_string(path)
×
74
                    .map_err(|e| ConnectorError::InitializationError(e.to_string()))?
×
75
            }
76
        };
77

78
        Ok(schemas_str)
78✔
79
    }
78✔
80

81
    /// Builds and runs the gRPC ingest server for this connector.
    ///
    /// The listen address is formed from `config.host` and `config.port`. Two services
    /// are registered: the ingest service (wrapped with `tonic_web`, all origins
    /// allowed) and a reflection service built from the ingest file descriptor set.
    /// The method returns once the awaited `serve(addr)` call finishes.
    ///
    /// # Errors
    /// Returns `ConnectorError::InitializationError` when the address cannot be parsed
    /// or when the server call returns an error, and propagates any error from
    /// `parse_config` or `GrpcIngestor::new`.
    ///
    /// # Panics
    /// Panics if building the reflection service fails, because the result is unwrapped.
    pub async fn serve(&self, ingestor: &Ingestor) -> Result<(), ConnectorError> {
14✔
82
        let host = &self.config.host;
14✔
83
        let port = self.config.port;
14✔
84

85
        // The target type of `parse()` is inferred from how `addr` is used below.
        let addr = format!("{host:}:{port:}").parse().map_err(|e| {
14✔
86
            ConnectorError::InitializationError(format!("Failed to parse address: {e}"))
×
87
        })?;
14✔
88

89
        let schemas_str = Self::parse_config(&self.config)?;
14✔
90
        let adapter = GrpcIngestor::<T>::new(schemas_str)?;
14✔
91

92
        // Ingestor will live as long as the server
93
        // Refactor to use Arc
94
        // NOTE(review): the transmute below extends the borrow to `'static` without any
        // compiler-checked guarantee. It is only sound if the caller keeps `ingestor`
        // alive for as long as the service holds the reference — confirm at call sites.
        let ingestor = unsafe { std::mem::transmute::<&'_ Ingestor, &'static Ingestor>(ingestor) };
14✔
95

14✔
96
        let ingest_service = IngestorServiceImpl::new(adapter, ingestor);
14✔
97
        let ingest_service = tonic_web::config()
14✔
98
            .allow_all_origins()
14✔
99
            .enable(IngestServiceServer::new(ingest_service));
14✔
100

14✔
101
        let reflection_service = tonic_reflection::server::Builder::configure()
14✔
102
            .register_encoded_file_descriptor_set(
14✔
103
                dozer_types::grpc_types::ingest::FILE_DESCRIPTOR_SET,
14✔
104
            )
14✔
105
            .build()
14✔
106
            // NOTE(review): a build failure panics here instead of becoming a `ConnectorError`.
            .unwrap();
14✔
107
        info!("Starting Dozer GRPC Ingestor  on http://{}:{} ", host, port,);
×
108
        Server::builder()
14✔
109
            .layer(
14✔
110
                // Span creation and responses are traced at INFO level.
                TraceLayer::new_for_http()
14✔
111
                    .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
14✔
112
                    .on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
14✔
113
            )
14✔
114
            // NOTE(review): HTTP/1 is presumably enabled for the tonic-web wrapped service — confirm.
            .accept_http1(true)
14✔
115
            .add_service(ingest_service)
14✔
116
            .add_service(reflection_service)
14✔
117
            .serve(addr)
14✔
118
            .await
14✔
119
            .map_err(|e| ConnectorError::InitializationError(e.to_string()))
×
120
    }
×
121
}
122

123
// Private helpers shared by the `Connector` implementation.
impl<T: IngestAdapter> GrpcConnector<T> {
124
    /// Loads every schema described by the connector's configuration.
    ///
    /// Reads the schema text with `parse_config`, builds a `GrpcIngestor<T>` from it
    /// and returns that adapter's `get_schemas()` result. Callers in this file treat
    /// the `String` of each pair as the table name.
    ///
    /// # Errors
    /// Propagates errors from `parse_config`, `GrpcIngestor::new` and `get_schemas`.
    fn get_all_schemas(&self) -> Result<Vec<(String, SourceSchema)>, ConnectorError> {
34✔
125
        let schemas_str = Self::parse_config(&self.config)?;
34✔
126
        let adapter = GrpcIngestor::<T>::new(schemas_str)?;
34✔
127
        adapter.get_schemas()
34✔
128
    }
34✔
129
}
130

131
#[async_trait]
132
impl<T> Connector for GrpcConnector<T>
133
where
134
    T: IngestAdapter,
135
{
136
    /// Not implemented for the gRPC connector.
    ///
    /// # Panics
    /// Always panics via `todo!()` when called.
    fn types_mapping() -> Vec<(String, Option<dozer_types::types::FieldType>)>
×
137
    where
×
138
        Self: Sized,
×
139
    {
×
140
        todo!()
×
141
    }
×
142

143
    /// Validates the connector by loading its schemas.
    ///
    /// Succeeds when `get_all_schemas` succeeds; the loaded schemas are discarded.
    /// No network connection is attempted by this body.
    async fn validate_connection(&self) -> Result<(), ConnectorError> {
×
144
        self.get_all_schemas().map(|_| ())
×
145
    }
×
146

147
    /// Lists one `TableIdentifier` per configured schema.
    ///
    /// Each identifier is built from the schema's name with
    /// `TableIdentifier::from_table_name`; the schema itself is dropped.
    ///
    /// # Errors
    /// Propagates any error from `get_all_schemas`.
    async fn list_tables(&self) -> Result<Vec<TableIdentifier>, ConnectorError> {
32✔
148
        Ok(self
32✔
149
            .get_all_schemas()?
32✔
150
            .into_iter()
32✔
151
            .map(|(name, _)| TableIdentifier::from_table_name(name))
50✔
152
            .collect())
32✔
153
    }
64✔
154

155
    /// Checks that every requested table exists among the configured schemas.
    ///
    /// A table matches only when its `name` equals a schema name and its `schema`
    /// qualifier is `None`; a table with a schema qualifier never matches.
    ///
    /// # Errors
    /// Returns `ConnectorError::TableNotFound` for the first table that does not match,
    /// and propagates any error from `get_all_schemas`.
    async fn validate_tables(&self, tables: &[TableIdentifier]) -> Result<(), ConnectorError> {
×
156
        let schemas = self.get_all_schemas()?;
×
157
        for table in tables {
×
158
            if !schemas
×
159
                .iter()
×
160
                .any(|(name, _)| name == &table.name && table.schema.is_none())
×
161
            {
162
                return Err(ConnectorError::TableNotFound(table_name(
×
163
                    table.schema.as_deref(),
×
164
                    &table.name,
×
165
                )));
×
166
            }
×
167
        }
168
        Ok(())
×
169
    }
×
170

171
    /// Resolves the column names of each requested table.
    ///
    /// For every table whose `name` equals a configured schema name and whose `schema`
    /// qualifier is `None`, a `TableInfo` is produced with the names of all fields of
    /// that schema, in the order they appear in `schema.schema.fields`.
    ///
    /// # Errors
    /// Returns `ConnectorError::TableNotFound` for the first table without a match
    /// (results gathered so far are dropped), and propagates any error from
    /// `get_all_schemas`.
    async fn list_columns(
2✔
172
        &self,
2✔
173
        tables: Vec<TableIdentifier>,
2✔
174
    ) -> Result<Vec<TableInfo>, ConnectorError> {
2✔
175
        let schemas = self.get_all_schemas()?;
2✔
176
        let mut result = vec![];
2✔
177
        for table in tables {
4✔
178
            if let Some((_, schema)) = schemas
2✔
179
                .iter()
2✔
180
                .find(|(name, _)| name == &table.name && table.schema.is_none())
2✔
181
            {
182
                let column_names = schema
2✔
183
                    .schema
2✔
184
                    .fields
2✔
185
                    .iter()
2✔
186
                    .map(|field| field.name.clone())
4✔
187
                    .collect();
2✔
188
                // `table` is owned by the loop, so its fields are moved into the result.
                result.push(TableInfo {
2✔
189
                    schema: table.schema,
2✔
190
                    name: table.name,
2✔
191
                    column_names,
2✔
192
                })
2✔
193
            } else {
194
                return Err(ConnectorError::TableNotFound(table_name(
×
195
                    table.schema.as_deref(),
×
196
                    &table.name,
×
197
                )));
×
198
            }
199
        }
200
        Ok(result)
2✔
201
    }
4✔
202

203
    /// Returns one schema result per entry of `table_infos`, in the same order.
    ///
    /// A table matches when its `name` equals a configured schema name and its `schema`
    /// qualifier is `None`. A matching table yields `Ok` with a clone of the whole
    /// schema; `table.column_names` is not read, so no column filtering happens (a
    /// warning is logged for each matched table). A table without a match yields an
    /// `Err(ConnectorError::TableNotFound)` entry instead of failing the whole call.
    ///
    /// # Errors
    /// The outer `Result` is an error only when `parse_config`, `GrpcIngestor::new` or
    /// the adapter's `get_schemas` fails.
    async fn get_schemas(
30✔
204
        &self,
30✔
205
        table_infos: &[TableInfo],
30✔
206
    ) -> Result<Vec<SourceSchemaResult>, ConnectorError> {
30✔
207
        // Same three steps as the private `get_all_schemas` helper, written out inline.
        let schemas_str = Self::parse_config(&self.config)?;
30✔
208
        let adapter = GrpcIngestor::<T>::new(schemas_str)?;
30✔
209

210
        let schemas = adapter.get_schemas()?;
30✔
211

212
        let mut result = vec![];
30✔
213
        for table in table_infos {
78✔
214
            if let Some((_, schema)) = schemas
48✔
215
                .iter()
48✔
216
                .find(|(name, _)| name == &table.name && table.schema.is_none())
66✔
217
            {
218
                warn!("TODO: filter columns");
48✔
219
                result.push(Ok(schema.clone()));
48✔
220
            } else {
×
221
                result.push(Err(ConnectorError::TableNotFound(table_name(
×
222
                    table.schema.as_deref(),
×
223
                    &table.name,
×
224
                ))));
×
225
            }
×
226
        }
227

228
        Ok(result)
30✔
229
    }
60✔
230

231
    /// Starts ingestion by running the gRPC server via `serve`.
    ///
    /// `_table_names` is ignored by this body. The returned future resolves with
    /// whatever `serve` returns.
    async fn start(
14✔
232
        &self,
14✔
233
        ingestor: &Ingestor,
14✔
234
        _table_names: Vec<TableInfo>,
14✔
235
    ) -> Result<(), ConnectorError> {
14✔
236
        self.serve(ingestor).await
14✔
237
    }
14✔
238
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc