• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5963889733

24 Aug 2023 01:14PM UTC coverage: 75.612%. First build
5963889733

Pull #1881

github

chubei
feat: `on_event` can subscribe to multiple endpoints at once
Pull Request #1881: feat: `on_event` can subscribe to multiple endpoints at once

78 of 78 new or added lines in 7 files covered. (100.0%)

47016 of 62181 relevant lines covered (75.61%)

89102.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.84
/dozer-api/src/grpc/common/service.rs
1
use std::collections::HashMap;
2
use std::sync::Arc;
3

4
use crate::auth::Access;
5

6
use crate::grpc::shared_impl::{self, EndpointFilter};
7
use crate::grpc::types_helper::map_record;
8
use crate::CacheEndpoint;
9
use dozer_types::grpc_types::common::common_grpc_service_server::CommonGrpcService;
10
use dozer_types::grpc_types::conversions::field_definition_to_grpc;
11
use dozer_types::indexmap::IndexMap;
12
use tokio_stream::wrappers::ReceiverStream;
13
use tonic::{Request, Response, Status};
14

15
use dozer_types::grpc_types::common::{
16
    CountResponse, GetEndpointsRequest, GetEndpointsResponse, GetFieldsRequest, GetFieldsResponse,
17
    OnEventRequest, QueryRequest, QueryResponse,
18
};
19
use dozer_types::grpc_types::types::Operation;
20

21
type EventResult<T> = Result<Response<T>, Status>;
22
type ResponseStream = ReceiverStream<Result<Operation, tonic::Status>>;
23

24
// #[derive(Clone)]
25
pub struct CommonService {
26
    /// For look up endpoint from its name. `key == value.endpoint.name`. Using index map to keep endpoint order.
27
    pub endpoint_map: IndexMap<String, Arc<CacheEndpoint>>,
28
    pub event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
29
}
30

31
impl CommonService {
×
32
    pub fn new(
28✔
33
        endpoints: Vec<Arc<CacheEndpoint>>,
28✔
34
        event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
28✔
35
    ) -> Self {
28✔
36
        let endpoint_map = endpoints
28✔
37
            .into_iter()
28✔
38
            .map(|endpoint| (endpoint.endpoint.name.clone(), endpoint))
28✔
39
            .collect();
28✔
40
        Self {
28✔
41
            endpoint_map,
28✔
42
            event_notifier,
28✔
43
        }
28✔
44
    }
28✔
45

×
46
    fn parse_request(
20✔
47
        &self,
20✔
48
        request: Request<QueryRequest>,
20✔
49
    ) -> Result<(&CacheEndpoint, QueryRequest, Option<Access>), Status> {
20✔
50
        let parts = request.into_parts();
20✔
51
        let mut extensions = parts.1;
20✔
52
        let query_request = parts.2;
20✔
53
        let access = extensions.remove::<Access>();
20✔
54
        let endpoint = &query_request.endpoint;
20✔
55
        let cache_endpoint = self
20✔
56
            .endpoint_map
20✔
57
            .get(endpoint)
20✔
58
            .map_or(Err(Status::invalid_argument(endpoint)), Ok)?;
20✔
59
        Ok((cache_endpoint, query_request, access))
20✔
60
    }
20✔
61
}
62

63
#[tonic::async_trait]
64
impl CommonGrpcService for CommonService {
×
65
    async fn count(
6✔
66
        &self,
6✔
67
        request: Request<QueryRequest>,
6✔
68
    ) -> Result<Response<CountResponse>, Status> {
6✔
69
        let (cache_endpoint, query_request, access) = self.parse_request(request)?;
6✔
70

×
71
        let count = shared_impl::count(
6✔
72
            &cache_endpoint.cache_reader(),
6✔
73
            query_request.query.as_deref(),
6✔
74
            &cache_endpoint.endpoint.name,
6✔
75
            access,
6✔
76
        )?;
6✔
77

×
78
        let reply = CountResponse {
6✔
79
            count: count as u64,
6✔
80
        };
6✔
81
        Ok(Response::new(reply))
6✔
82
    }
12✔
83

×
84
    async fn query(
14✔
85
        &self,
14✔
86
        request: Request<QueryRequest>,
14✔
87
    ) -> Result<Response<QueryResponse>, Status> {
14✔
88
        let (cache_endpoint, query_request, access) = self.parse_request(request)?;
14✔
89

×
90
        let cache_reader = cache_endpoint.cache_reader();
14✔
91
        let records = shared_impl::query(
14✔
92
            &cache_reader,
14✔
93
            query_request.query.as_deref(),
14✔
94
            &cache_endpoint.endpoint.name,
14✔
95
            access,
14✔
96
        )?;
14✔
97
        let schema = &cache_reader.get_schema().0;
14✔
98

14✔
99
        let fields = field_definition_to_grpc(schema.fields.clone());
14✔
100
        let records = records.into_iter().map(map_record).collect();
14✔
101
        let reply = QueryResponse { fields, records };
14✔
102

14✔
103
        Ok(Response::new(reply))
14✔
104
    }
28✔
105

106
    type OnEventStream = ResponseStream;
107

×
108
    async fn on_event(&self, request: Request<OnEventRequest>) -> EventResult<Self::OnEventStream> {
1✔
109
        let parts = request.into_parts();
1✔
110
        let extensions = parts.1;
1✔
111
        let query_request = parts.2;
1✔
112
        let access = extensions.get::<Access>();
1✔
113

1✔
114
        let mut endpoints = HashMap::new();
1✔
115
        for (endpoint, filter) in query_request.endpoints {
2✔
116
            let cache_endpoint = self
1✔
117
                .endpoint_map
1✔
118
                .get(&endpoint)
1✔
119
                .ok_or_else(|| Status::invalid_argument(&endpoint))?;
1✔
120
            let schema = cache_endpoint.cache_reader().get_schema().0.clone();
1✔
121
            endpoints.insert(
1✔
122
                endpoint,
1✔
123
                EndpointFilter::new(schema, filter.filter.as_deref())?,
1✔
124
            );
×
125
        }
×
126

127
        shared_impl::on_event(
1✔
128
            endpoints,
1✔
129
            self.event_notifier.as_ref().map(|r| r.resubscribe()),
1✔
130
            access.cloned(),
1✔
131
            Ok,
1✔
132
        )
1✔
133
    }
2✔
134

×
135
    async fn get_endpoints(
1✔
136
        &self,
1✔
137
        _: Request<GetEndpointsRequest>,
1✔
138
    ) -> Result<Response<GetEndpointsResponse>, Status> {
1✔
139
        let endpoints = self.endpoint_map.keys().cloned().collect();
1✔
140
        Ok(Response::new(GetEndpointsResponse { endpoints }))
1✔
141
    }
1✔
142

×
143
    async fn get_fields(
1✔
144
        &self,
1✔
145
        request: Request<GetFieldsRequest>,
1✔
146
    ) -> Result<Response<GetFieldsResponse>, Status> {
1✔
147
        let request = request.into_inner();
1✔
148
        let endpoint = request.endpoint;
1✔
149
        let cache_endpoint = self
1✔
150
            .endpoint_map
1✔
151
            .get(&endpoint)
1✔
152
            .map_or(Err(Status::invalid_argument(&endpoint)), Ok)?;
1✔
153

×
154
        let cache_reader = cache_endpoint.cache_reader();
1✔
155
        let schema = &cache_reader.get_schema().0;
1✔
156

1✔
157
        let fields = field_definition_to_grpc(schema.fields.clone());
1✔
158

1✔
159
        let primary_index = schema.primary_index.iter().map(|f| *f as i32).collect();
1✔
160
        Ok(Response::new(GetFieldsResponse {
1✔
161
            primary_index,
1✔
162
            fields,
1✔
163
        }))
1✔
164
    }
2✔
165
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc