• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4698406303

pending completion
4698406303

Pull #1426

github

GitHub
Merge daefffe87 into b6889464a
Pull Request #1426: feat: implement python log bindings

1 of 1 new or added line in 1 file covered. (100.0%)

34863 of 45840 relevant lines covered (76.05%)

10764.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.71
/dozer-api/src/grpc/internal/internal_pipeline_server.rs
1
use crossbeam::channel::{Receiver, Sender};
2
use dozer_types::grpc_types::internal::{StatusUpdate, StatusUpdateRequest};
3
use dozer_types::{crossbeam, log::info, models::app_config::Config, tracing::warn};
4
use dozer_types::{
5
    grpc_types::{
6
        internal::{
7
            internal_pipeline_service_server::{self, InternalPipelineService},
8
            AliasEventsRequest, AliasRedirected, OperationsRequest,
9
        },
10
        types::Operation,
11
    },
12
    log::debug,
13
};
14
use std::{fmt::Debug, net::ToSocketAddrs, pin::Pin, thread};
15
use tokio::sync::broadcast;
16
use tokio_stream::wrappers::ReceiverStream;
17
use tonic::{codegen::futures_core::Stream, transport::Server, Response, Status};
18

19
pub type PipelineEventSenders = (
20
    Sender<AliasRedirected>,
21
    Sender<Operation>,
22
    Sender<StatusUpdate>,
23
);
24
pub type PipelineEventReceivers = (
25
    Receiver<AliasRedirected>,
26
    Receiver<Operation>,
27
    Receiver<StatusUpdate>,
28
);
29

30
pub struct InternalPipelineServer {
31
    alias_redirected_receiver: broadcast::Receiver<AliasRedirected>,
32
    operation_receiver: broadcast::Receiver<Operation>,
33
    status_updates_receiver: broadcast::Receiver<StatusUpdate>,
34
}
35
impl InternalPipelineServer {
36
    pub fn new(pipeline_event_receivers: PipelineEventReceivers) -> Self {
15✔
37
        let alias_redirected_receiver =
15✔
38
            crossbeam_mpsc_receiver_to_tokio_broadcast_receiver(pipeline_event_receivers.0);
15✔
39
        let operation_receiver =
15✔
40
            crossbeam_mpsc_receiver_to_tokio_broadcast_receiver(pipeline_event_receivers.1);
15✔
41
        let status_updates_receiver =
15✔
42
            crossbeam_mpsc_receiver_to_tokio_broadcast_receiver(pipeline_event_receivers.2);
15✔
43
        Self {
15✔
44
            alias_redirected_receiver,
15✔
45
            operation_receiver,
15✔
46
            status_updates_receiver,
15✔
47
        }
15✔
48
    }
15✔
49
}
50

51
fn crossbeam_mpsc_receiver_to_tokio_broadcast_receiver<T: Clone + Debug + Send + 'static>(
45✔
52
    crossbeam_receiver: Receiver<T>,
45✔
53
) -> broadcast::Receiver<T> {
45✔
54
    let (broadcast_sender, broadcast_receiver) = broadcast::channel(16);
45✔
55
    thread::Builder::new().name("crossbeam_mpsc_receiver_to_tokio_broadcast_receiver".to_string()).spawn(move || loop {
45✔
56
        let message = crossbeam_receiver.recv();
80✔
57
        match message {
80✔
58
            Ok(message) => {
80✔
59
                let result = broadcast_sender.send(message);
80✔
60
                if let Err(e) = result {
80✔
61
                    warn!("Internal Pipeline server - Error sending message to broadcast channel: {:?}", e);
45✔
62
                }
35✔
63
            }
64
            Err(err) => {
×
65
                debug!(
×
66
                    "Error receiving: {:?}. Exiting crossbeam_mpsc_receiver_to_tokio_broadcast_receiver thread",
×
67
                    err
68
                );
69
                break;
×
70
            }
×
71
        }
×
72
    }).expect("Failed to spawn crossbeam_mpsc_receiver_to_tokio_broadcast_receiver thread");
45✔
73
    broadcast_receiver
45✔
74
}
45✔
75

76
type OperationsStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
77
type AliasEventsStream = Pin<Box<dyn Stream<Item = Result<AliasRedirected, Status>> + Send>>;
78
type StatusUpdateStream = Pin<Box<dyn Stream<Item = Result<StatusUpdate, Status>> + Send>>;
79

80
#[tonic::async_trait]
81
impl InternalPipelineService for InternalPipelineServer {
82
    type StreamOperationsStream = OperationsStream;
83
    async fn stream_operations(
×
84
        &self,
×
85
        _request: tonic::Request<OperationsRequest>,
×
86
    ) -> Result<Response<OperationsStream>, Status> {
×
87
        let (operation_sender, operation_receiver) = tokio::sync::mpsc::channel(1000);
×
88
        let mut receiver = self.operation_receiver.resubscribe();
×
89
        tokio::spawn(async move {
×
90
            loop {
91
                let result = receiver.try_recv();
×
92
                match result {
×
93
                    Ok(operation) => {
×
94
                        let result = operation_sender.send(Ok(operation)).await;
×
95
                        if let Err(e) = result {
×
96
                            warn!("Error sending message to mpsc channel: {:?}", e);
×
97
                            break;
×
98
                        }
×
99
                    }
100
                    Err(err) => {
×
101
                        if err == broadcast::error::TryRecvError::Closed {
×
102
                            break;
×
103
                        }
×
104
                    }
105
                }
106
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
×
107
            }
108
        });
×
109
        let output_stream = ReceiverStream::new(operation_receiver);
×
110
        Ok(Response::new(Box::pin(output_stream)))
×
111
    }
×
112

113
    type StreamAliasEventsStream = AliasEventsStream;
114

115
    async fn stream_alias_events(
×
116
        &self,
×
117
        _request: tonic::Request<AliasEventsRequest>,
×
118
    ) -> Result<Response<Self::StreamAliasEventsStream>, Status> {
×
119
        let (alias_redirected_sender, alias_redirected_receiver) = tokio::sync::mpsc::channel(1000);
×
120
        let mut receiver = self.alias_redirected_receiver.resubscribe();
×
121
        tokio::spawn(async move {
×
122
            loop {
123
                let result = receiver.try_recv();
×
124
                match result {
×
125
                    Ok(alias_redirected) => {
×
126
                        let result = alias_redirected_sender.send(Ok(alias_redirected)).await;
×
127
                        if let Err(e) = result {
×
128
                            warn!("Error sending message to mpsc channel: {:?}", e);
×
129
                            break;
×
130
                        }
×
131
                    }
132
                    Err(err) => {
×
133
                        if err == broadcast::error::TryRecvError::Closed {
×
134
                            break;
×
135
                        }
×
136
                    }
137
                }
138
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
×
139
            }
140
        });
×
141
        let output_stream = ReceiverStream::new(alias_redirected_receiver);
×
142
        Ok(Response::new(Box::pin(output_stream)))
×
143
    }
×
144

145
    type StreamStatusUpdatesStream = StatusUpdateStream;
146

147
    async fn stream_status_updates(
×
148
        &self,
×
149
        _request: tonic::Request<StatusUpdateRequest>,
×
150
    ) -> Result<Response<Self::StreamStatusUpdatesStream>, Status> {
×
151
        let (status_updates_sender, status_updates_receiver) = tokio::sync::mpsc::channel(1000);
×
152
        let mut receiver = self.status_updates_receiver.resubscribe();
×
153
        tokio::spawn(async move {
×
154
            loop {
155
                let result = receiver.try_recv();
×
156
                match result {
×
157
                    Ok(status_update) => {
×
158
                        let result = status_updates_sender.send(Ok(status_update)).await;
×
159
                        if let Err(e) = result {
×
160
                            warn!("Error sending message to mpsc channel: {:?}", e);
×
161
                            break;
×
162
                        }
×
163
                    }
164
                    Err(err) => {
×
165
                        if err == broadcast::error::TryRecvError::Closed {
×
166
                            break;
×
167
                        }
×
168
                    }
169
                }
170
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
×
171
            }
172
        });
×
173
        let output_stream = ReceiverStream::new(status_updates_receiver);
×
174
        Ok(Response::new(Box::pin(output_stream)))
×
175
    }
×
176
}
177

178
pub async fn start_internal_pipeline_server(
15✔
179
    app_config: Config,
15✔
180
    receivers: PipelineEventReceivers,
15✔
181
) -> Result<(), tonic::transport::Error> {
15✔
182
    let server = InternalPipelineServer::new(receivers);
12✔
183

12✔
184
    let internal_config = app_config
12✔
185
        .api
12✔
186
        .unwrap_or_default()
12✔
187
        .app_grpc
12✔
188
        .unwrap_or_default();
12✔
189

190
    info!(
191
        "Starting Internal Server on http://{}:{}",
×
192
        internal_config.host, internal_config.port,
193
    );
194
    let mut addr = format!("{}:{}", internal_config.host, internal_config.port)
12✔
195
        .to_socket_addrs()
12✔
196
        .unwrap();
12✔
197
    Server::builder()
12✔
198
        .add_service(internal_pipeline_service_server::InternalPipelineServiceServer::new(server))
12✔
199
        .serve(addr.next().unwrap())
12✔
200
        .await
×
201
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc