• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8684992748

15 Apr 2024 07:16AM UTC coverage: 67.831% (+5.4%) from 62.388%
8684992748

push

github

web-flow
Merge pull request #12877 from id/0415-sync-release-56

sync release 56

29 of 40 new or added lines in 7 files covered. (72.5%)

129 existing lines in 17 files now uncovered.

37939 of 55932 relevant lines covered (67.83%)

7734.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.24
/apps/emqx_mongodb/src/emqx_mongodb.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16
-module(emqx_mongodb).
17

18
-include_lib("emqx_connector/include/emqx_connector.hrl").
19
-include_lib("typerefl/include/types.hrl").
20
-include_lib("hocon/include/hoconsc.hrl").
21
-include_lib("emqx/include/logger.hrl").
22
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
23

24
-behaviour(emqx_resource).
25
-behaviour(hocon_schema).
26

27
%% callbacks of behaviour emqx_resource
28
-export([
29
    callback_mode/0,
30
    on_start/2,
31
    on_stop/2,
32
    on_query/3,
33
    on_get_status/2,
34
    namespace/0
35
]).
36

37
%% ecpool callback
38
-export([connect/1]).
39

40
-export([roots/0, fields/1, desc/1]).
41

42
-export([mongo_query/5, mongo_insert/3, check_worker_health/1]).
43

44
%% for testing
45
-export([maybe_resolve_srv_and_txt_records/1]).
46

47
-define(HEALTH_CHECK_TIMEOUT, 30000).
48

49
%% mongo servers don't need parse
50
-define(MONGO_HOST_OPTIONS, #{
51
    default_port => ?MONGO_DEFAULT_PORT
52
}).
53

54
%%=====================================================================
55

56
%% Returns the namespace string used for this schema module.
namespace() ->
    "mongo".
57

58
%% Schema roots: a single `config' root whose type is the union of the
%% `single', `rs' and `sharded' structs defined by fields/1.
roots() ->
    StructNames = [single, rs, sharded],
    ConfigType = hoconsc:union([hoconsc:ref(?MODULE, Name) || Name <- StructNames]),
    [{config, #{type => ConfigType}}].
70

71
%% Field lists for each struct of the MongoDB connector schema.
%%
%% * "connector_rs" / "connector_sharded" / "connector_single": the fields
%%   specific to one deployment type.
%% * rs / single / sharded: the matching "connector_*" fields followed by
%%   the common `mongodb' fields.
%% * mongodb: fields shared by all deployment types, followed by the SSL
%%   fields from emqx_connector_schema_lib.
%% * topology: optional topology settings.
fields("connector_rs") ->
    [
        mongo_type_field(rs, "rs_mongo_type"),
        {servers, servers()},
        {w_mode, fun w_mode/1},
        {r_mode, fun r_mode/1},
        {replica_set_name, fun replica_set_name/1}
    ];
fields("connector_sharded") ->
    [
        mongo_type_field(sharded, "sharded_mongo_type"),
        {servers, servers()},
        {w_mode, fun w_mode/1}
    ];
fields("connector_single") ->
    [
        mongo_type_field(single, "single_mongo_type"),
        {server, server()},
        {w_mode, fun w_mode/1}
    ];
fields(Type) when Type =:= rs; Type =:= single; Type =:= sharded ->
    ConnectorStruct = "connector_" ++ atom_to_list(Type),
    fields(ConnectorStruct) ++ fields(mongodb);
fields(mongodb) ->
    CommonFields = [
        {srv_record, fun srv_record/1},
        {pool_size, fun emqx_connector_schema_lib:pool_size/1},
        {username, fun emqx_connector_schema_lib:username/1},
        {password, emqx_connector_schema_lib:password_field()},
        {use_legacy_protocol,
            hoconsc:mk(hoconsc:enum([auto, true, false]), #{
                default => auto,
                desc => ?DESC("use_legacy_protocol")
            })},
        {auth_source, #{
            type => binary(),
            required => false,
            desc => ?DESC("auth_source")
        }},
        {database, fun emqx_connector_schema_lib:database/1},
        {topology, #{type => hoconsc:ref(?MODULE, topology), required => false}}
    ],
    CommonFields ++ emqx_connector_schema_lib:ssl_fields();
fields(topology) ->
    HiddenPoolSize = hoconsc:mk(
        pos_integer(),
        #{importance => ?IMPORTANCE_HIDDEN, default => 10}
    ),
    HeartbeatFrequency = hoconsc:mk(
        emqx_schema:timeout_duration_ms(),
        #{default => <<"200s">>, desc => ?DESC("heartbeat_period")}
    ),
    [
        {pool_size, HiddenPoolSize},
        {max_overflow, fun max_overflow/1},
        {overflow_ttl, duration("overflow_ttl")},
        {overflow_check_period, duration("overflow_check_period")},
        {local_threshold_ms, duration("local_threshold")},
        {connect_timeout_ms, duration("connect_timeout")},
        {socket_timeout_ms, duration("socket_timeout")},
        {server_selection_timeout_ms, duration("server_selection_timeout")},
        {wait_queue_timeout_ms, duration("wait_queue_timeout")},
        {heartbeat_frequency_ms, HeartbeatFrequency},
        {min_heartbeat_frequency_ms, duration("min_heartbeat_period")}
    ].

%% The required `mongo_type' discriminator field: its type and its default
%% are both the given deployment type atom.
mongo_type_field(Type, DescId) ->
    {mongo_type, #{
        required => true,
        type => Type,
        default => Type,
        desc => ?DESC(DescId)
    }}.
156

157
%% Description reference for a struct name. A deployment-type atom and its
%% "connector_*" string counterpart share the same description; unknown
%% struct names have none.
desc(Struct) when Struct =:= "connector_single"; Struct =:= single ->
    ?DESC("desc_single");
desc(Struct) when Struct =:= "connector_rs"; Struct =:= rs ->
    ?DESC("desc_rs");
desc(Struct) when Struct =:= "connector_sharded"; Struct =:= sharded ->
    ?DESC("desc_sharded");
desc(topology) ->
    ?DESC("desc_topology");
desc(_) ->
    undefined.
173

174
%% ===================================================================
175

176
%% emqx_resource callback: this connector always uses `always_sync'.
callback_mode() ->
    always_sync.
177

178
%% emqx_resource callback: starts the connection pool for one connector.
%%
%% Logs the (redacted) config, resolves the configured server(s) into a
%% `hosts' list (optionally via SRV/TXT records), builds the pool options
%% and starts the pool under the name InstId.
%%
%% Returns `{ok, State}' where State carries the pool name, the deployment
%% type and the collection (`<<"mqtt">>' when the config has none), or
%% `{error, Reason}' when the pool fails to start.
on_start(
    InstId,
    #{mongo_type := Type, pool_size := PoolSize, ssl := SSL} = Config
) ->
    Msg = start_log_msg(Type),
    ?SLOG(info, #{msg => Msg, connector => InstId, config => emqx_utils:redact(Config)}),
    #{hosts := Hosts} = NConfig = maybe_resolve_srv_and_txt_records(Config),
    SslOpts = client_ssl_opts(SSL),
    Topology = maps:get(topology, NConfig, #{}),
    PoolOpts = [
        {mongo_type, init_type(NConfig)},
        {hosts, Hosts},
        {pool_size, PoolSize},
        {options, init_topology_options(maps:to_list(Topology), [])},
        {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}
    ],
    Collection = maps:get(collection, Config, <<"mqtt">>),
    case emqx_resource_pool:start(InstId, ?MODULE, PoolOpts) of
        ok ->
            {ok, #{pool_name => InstId, type => Type, collection => Collection}};
        {error, _Reason} = Error ->
            Error
    end.

%% Log message announcing the start of a connector of the given type.
start_log_msg(Type) ->
    case Type of
        single -> "starting_mongodb_single_connector";
        rs -> "starting_mongodb_replica_set_connector";
        sharded -> "starting_mongodb_sharded_connector"
    end.

%% Worker SSL options derived from the `ssl' config map.
client_ssl_opts(SSL) ->
    case maps:get(enable, SSL) of
        true ->
            [{ssl, true}, {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}];
        false ->
            [{ssl, false}]
    end.
223

224
%% emqx_resource callback: logs the shutdown and stops the pool that was
%% started under the name InstId.
on_stop(InstId, _State) ->
    ?SLOG(info, #{msg => "stopping_mongodb_connector", connector => InstId}),
    emqx_resource_pool:stop(InstId).
230

231
%% emqx_resource callback: runs one request on a pooled connection.
%%
%% Two request shapes are accepted:
%%
%% * `{ChannelId, Document}': inserts Document into the collection kept in
%%   the resource state. Returns `ok' on success.
%% * `{Action, Collection, Filter, Projector}': runs mongo_query/5. When the
%%   query yields `{ok, Cursor}' with a pid cursor, the documents are folded
%%   into a list by prepending each one (1000 is passed to mc_cursor:foldl/4,
%%   presumably as the maximum number of documents to read - TODO confirm).
%%   Any other non-error result is returned as `{ok, Result}'.
%%
%% Errors are returned as `{error, Reason}'; an empty pool is reported as
%% `{error, {recoverable_error, ecpool_empty}}'.
on_query(
    InstId,
    {_ChannelId, Document},
    #{pool_name := PoolName, collection := Collection} = State
) ->
    Request = {insert, Collection, Document},
    ?TRACE(
        "QUERY",
        "mongodb_connector_received",
        #{request => Request, connector => InstId, state => State}
    ),
    case
        ecpool:pick_and_do(
            PoolName,
            {?MODULE, mongo_insert, [Collection, Document]},
            no_handover
        )
    of
        {{true, _Info}, _Document} ->
            ok;
        {{false, Reason}, _Document} ->
            ?SLOG(error, #{
                msg => "mongodb_connector_do_query_failed",
                request => Request,
                reason => Reason,
                connector => InstId
            }),
            {error, Reason};
        {error, ecpool_empty} ->
            {error, {recoverable_error, ecpool_empty}};
        {error, Reason} ->
            %% Any other pool-level error (e.g. the picked worker has no
            %% client) is reported the same way as in the query clause below,
            %% instead of crashing with case_clause.
            ?SLOG(error, #{
                msg => "mongodb_connector_do_query_failed",
                request => Request,
                reason => Reason,
                connector => InstId
            }),
            {error, Reason}
    end;
on_query(
    InstId,
    {Action, Collection, Filter, Projector},
    #{pool_name := PoolName} = State
) ->
    Request = {Action, Collection, Filter, Projector},
    ?TRACE(
        "QUERY",
        "mongodb_connector_received",
        #{request => Request, connector => InstId, state => State}
    ),
    case
        ecpool:pick_and_do(
            PoolName,
            {?MODULE, mongo_query, [Action, Collection, Filter, Projector]},
            no_handover
        )
    of
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "mongodb_connector_do_query_failed",
                request => Request,
                reason => Reason,
                connector => InstId
            }),
            case Reason of
                ecpool_empty ->
                    {error, {recoverable_error, Reason}};
                _ ->
                    {error, Reason}
            end;
        {ok, Cursor} when is_pid(Cursor) ->
            {ok, mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)};
        Result ->
            {ok, Result}
    end.
298

299
%% emqx_resource callback: reports `connected' when health_check/1 returns
%% `ok', otherwise `{disconnected, State, Reason}'. A trace point is emitted
%% in both cases.
on_get_status(InstId, #{pool_name := PoolName} = State) ->
    HealthResult = health_check(PoolName),
    case HealthResult of
        {error, Reason} ->
            ?tp(warning, emqx_connector_mongo_health_check, #{
                instance_id => InstId,
                reason => Reason,
                status => failed
            }),
            {disconnected, State, Reason};
        ok ->
            ?tp(debug, emqx_connector_mongo_health_check, #{
                instance_id => InstId,
                status => ok
            }),
            connected
    end.
315

316
%% Runs check_worker_health/1 on the workers of the pool and condenses the
%% per-worker results into `ok | {error, Reason}'.
%%
%% The pool-level timeout is one second longer than the timeout handed to
%% the per-worker test query. An empty result list is reported as
%% `worker_processes_dead'; otherwise the first non-`ok' value decides the
%% error reason.
health_check(PoolName) ->
    Timeout = ?HEALTH_CHECK_TIMEOUT + timer:seconds(1),
    WorkerResults = emqx_resource_pool:health_check_workers(
        PoolName,
        fun ?MODULE:check_worker_health/1,
        Timeout,
        #{return_values => true}
    ),
    case WorkerResults of
        {error, Reason} ->
            {error, Reason};
        {ok, []} ->
            {error, worker_processes_dead};
        {ok, Values} ->
            case [V || V <- Values, V =/= ok] of
                [] ->
                    ok;
                [{error, Reason} | _] ->
                    {error, Reason};
                [Unexpected | _] ->
                    {error, Unexpected}
            end
    end.
339

340
%% ===================================================================
341

342
%% Health probe for a single pooled connection.
%%
%% Returns `ok' unless the test query returns `{error, Reason}' or raises,
%% in which case a warning is logged and `{error, Reason}' or
%% `{error, {Class, Error}}' is returned.
check_worker_health(Conn) ->
    %% The query result itself is irrelevant; we only want to test that the
    %% connection can serve a query.
    try do_test_query(Conn) of
        {error, Reason} ->
            ?SLOG(warning, #{
                msg => "mongo_connection_get_status_error",
                reason => Reason
            }),
            {error, Reason};
        _AnyOtherResult ->
            ok
    catch
        Class:Error ->
            ?SLOG(warning, #{
                msg => "mongo_connection_get_status_exception",
                class => Class,
                error => Error
            }),
            {error, {Class, Error}}
    end.
362

363
%% Issues a `find_one' with an empty selector on the <<"foo">> collection
%% through mongoc:transaction_query/4, bounded by ?HEALTH_CHECK_TIMEOUT.
do_test_query(Conn) ->
    Probe = fun(#{pool := Worker} = Conf) ->
        Query = mongoc:find_one_query(Conf, <<"foo">>, #{}, #{}, 0),
        mc_worker_api:find_one(Worker, Query)
    end,
    mongoc:transaction_query(Conn, Probe, #{}, ?HEALTH_CHECK_TIMEOUT).
373

374
%% ecpool callback: opens one MongoDB connection from the pool options.
%% Missing options default to `single' for the type and to empty lists for
%% hosts, topology options and worker options.
connect(Opts) ->
    Get = fun(Key, Default) -> proplists:get_value(Key, Opts, Default) end,
    Type = Get(mongo_type, single),
    Hosts = Get(hosts, []),
    Options = Get(options, []),
    WorkerOptions = Get(worker_options, []),
    mongo_api:connect(Type, Hosts, Options, WorkerOptions).
380

381
%% Runs a read query on the connection Conn.
%%
%% Supported actions are `find' and `find_one', which delegate to
%% mongo_api:find/4 and mongo_api:find_one/4 respectively. Any other action
%% is rejected with `{error, {unsupported_action, Action}}' rather than
%% silently reporting success, so that the caller does not mistake an
%% unimplemented action for a completed query.
mongo_query(Conn, find, Collection, Filter, Projector) ->
    mongo_api:find(Conn, Collection, Filter, Projector);
mongo_query(Conn, find_one, Collection, Filter, Projector) ->
    mongo_api:find_one(Conn, Collection, Filter, Projector);
mongo_query(_Conn, Action, _Collection, _Filter, _Projector) ->
    {error, {unsupported_action, Action}}.
388

389
%% Inserts Documents into Collection on the connection Conn by delegating
%% to mongo_api:insert/3; its result is returned unchanged.
mongo_insert(Conn, Collection, Documents) ->
    mongo_api:insert(
        Conn,
        Collection,
        Documents
    ).
391

392
%% Connection type derived from the config: `{rs, ReplicaSetName}' for a
%% replica set that has a `replica_set_name', otherwise the plain
%% `mongo_type' value.
init_type(#{mongo_type := Type} = Config) ->
    case Config of
        #{mongo_type := rs, replica_set_name := ReplicaSetName} ->
            {rs, ReplicaSetName};
        _ ->
            Type
    end.
396

397
%% Translates topology config entries into driver options, prepending each
%% recognised entry to Acc (so the result is in reverse input order).
%% The `*_ms' keys are renamed to their camel-case driver names; entries
%% with unknown keys are dropped.
init_topology_options(Entries, Acc0) ->
    lists:foldl(
        fun
            ({Key, Val}, Acc) ->
                case topology_option_name(Key) of
                    undefined -> Acc;
                    Name -> [{Name, Val} | Acc]
                end;
            (_Other, Acc) ->
                Acc
        end,
        Acc0,
        Entries
    ).

%% Driver option name for a topology config key, or `undefined' when the
%% key is not a known topology option.
topology_option_name(pool_size) -> pool_size;
topology_option_name(max_overflow) -> max_overflow;
topology_option_name(overflow_ttl) -> overflow_ttl;
topology_option_name(overflow_check_period) -> overflow_check_period;
topology_option_name(local_threshold_ms) -> 'localThresholdMS';
topology_option_name(connect_timeout_ms) -> 'connectTimeoutMS';
topology_option_name(socket_timeout_ms) -> 'socketTimeoutMS';
topology_option_name(server_selection_timeout_ms) -> 'serverSelectionTimeoutMS';
topology_option_name(wait_queue_timeout_ms) -> 'waitQueueTimeoutMS';
topology_option_name(heartbeat_frequency_ms) -> 'heartbeatFrequencyMS';
topology_option_name(min_heartbeat_frequency_ms) -> 'minHeartbeatFrequencyMS';
topology_option_name(_) -> undefined.
423

424
%% Picks the worker-relevant entries out of the config list and prepends
%% them to Acc (so the result is in reverse input order). `username' is
%% passed on as `login'; all other recognised keys keep their name, and
%% every other entry is dropped.
init_worker_options(Entries, Acc0) ->
    lists:foldl(
        fun
            ({Key, Val}, Acc) ->
                case worker_option_name(Key) of
                    undefined -> Acc;
                    Name -> [{Name, Val} | Acc]
                end;
            (_Other, Acc) ->
                Acc
        end,
        Acc0,
        Entries
    ).

%% Worker option name for a config key, or `undefined' when the key is not
%% a worker option.
worker_option_name(database) -> database;
worker_option_name(auth_source) -> auth_source;
worker_option_name(username) -> login;
worker_option_name(password) -> password;
worker_option_name(w_mode) -> w_mode;
worker_option_name(r_mode) -> r_mode;
worker_option_name(use_legacy_protocol) -> use_legacy_protocol;
worker_option_name(_) -> undefined.
442

443
%% ===================================================================
444
%% Schema funcs
445

446
%% Schema for the single `server' field, built with the MongoDB host
%% options (default port).
server() ->
    emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MONGO_HOST_OPTIONS).
449

450
%% Schema for the `servers' field, built with the MongoDB host options
%% (default port).
servers() ->
    emqx_schema:servers_sc(#{desc => ?DESC("servers")}, ?MONGO_HOST_OPTIONS).
453

454
%% Schema of the `w_mode' field: an enum of `unsafe' | `safe', defaulting
%% to `unsafe'. Any other schema attribute is `undefined'.
w_mode(Attr) ->
    case Attr of
        type -> hoconsc:enum([unsafe, safe]);
        desc -> ?DESC("w_mode");
        default -> unsafe;
        _ -> undefined
    end.
458

459
%% Schema of the `r_mode' field: an enum of `master' | `slave_ok',
%% defaulting to `master'. Any other schema attribute is `undefined'.
r_mode(Attr) ->
    case Attr of
        type -> hoconsc:enum([master, slave_ok]);
        desc -> ?DESC("r_mode");
        default -> master;
        _ -> undefined
    end.
463

464
%% Schema of an optional millisecond-duration field whose description is
%% looked up under Desc.
duration(Desc) ->
    DurationType = emqx_schema:timeout_duration_ms(),
    #{type => DurationType, required => false, desc => ?DESC(Desc)}.
470

471
%% Schema of the `max_overflow' field: a non-negative integer defaulting
%% to 0. Any other schema attribute is `undefined'.
max_overflow(Attr) ->
    case Attr of
        type -> non_neg_integer();
        desc -> ?DESC("max_overflow");
        default -> 0;
        _ -> undefined
    end.
475

476
%% Schema of the `replica_set_name' field: a required binary. Any other
%% schema attribute is `undefined'.
replica_set_name(Attr) ->
    case Attr of
        type -> binary();
        desc -> ?DESC("replica_set_name");
        required -> true;
        _ -> undefined
    end.
480

481
%% Schema of the `srv_record' field: a boolean defaulting to `false'. Any
%% other schema attribute is `undefined'.
srv_record(Attr) ->
    case Attr of
        type -> boolean();
        desc -> ?DESC("srv_record");
        default -> false;
        _ -> undefined
    end.
485

486
%% ===================================================================
487
%% Internal funcs
488

489
%% Replaces the `server' (checked first) or `servers' entry of the config
%% with a `hosts' list, resolving SRV/TXT records when `srv_record' is
%% enabled. Exported for testing.
maybe_resolve_srv_and_txt_records(#{server := _} = Config) ->
    {Server, Rest} = maps:take(server, Config),
    maybe_resolve_srv_and_txt_records1(Server, Rest);
maybe_resolve_srv_and_txt_records(#{servers := _} = Config) ->
    {Servers, Rest} = maps:take(servers, Config),
    maybe_resolve_srv_and_txt_records1(Servers, Rest).
495

496
%% Adds the `hosts' entry to the config.
%%
%% With `srv_record := false' the configured servers are used as they are;
%% a replica-set config without `replica_set_name' is rejected with a throw.
%% With `srv_record := true' the hosts come from the SRV records of the
%% first configured server, and the options found in its TXT record are
%% merged into the config.
maybe_resolve_srv_and_txt_records1(
    Servers0,
    #{mongo_type := Type, srv_record := false} = Config
) ->
    MissingReplicaSetName =
        Type =:= rs andalso not maps:is_key(replica_set_name, Config),
    case MissingReplicaSetName of
        false ->
            Hosts = format_hosts(parse_servers(Servers0)),
            Config#{hosts => Hosts};
        true ->
            throw(#{
                reason => "missing_parameter",
                param => replica_set_name
            })
    end;
maybe_resolve_srv_and_txt_records1(
    Servers,
    #{mongo_type := Type, srv_record := true} = Config
) ->
    %% When SRV is in use, typically only one DNS name needs resolving.
    %% The schema nevertheless allows more than one server to be configured,
    %% so only the first one is kept here (its port is ignored).
    [{DNS, _IgnorePort} | _] = parse_servers(Servers),
    SrvRecords = resolve_srv_records(DNS),
    Hosts = format_hosts(SrvRecords),
    ?tp(info, resolved_srv_records, #{dns => DNS, resolved_hosts => Hosts}),
    ExtraOpts = resolve_txt_records(Type, DNS),
    ?tp(info, resolved_txt_records, #{dns => DNS, resolved_options => ExtraOpts}),
    ConfigWithHosts = Config#{hosts => Hosts},
    maps:merge(ConfigWithHosts, ExtraOpts).
530

531
%% Resolves the SRV records of "_mongodb._tcp." ++ DNS0 into a list of
%% `{Host, Port}' pairs (the first two elements of each record are
%% ignored). Throws when no record is found.
resolve_srv_records(DNS0) ->
    DNS = "_mongodb._tcp." ++ DNS0,
    Records = emqx_connector_lib:resolve_dns(DNS, srv),
    HostPorts = [{Host, Port} || {_, _, Port, Host} <- Records],
    HostPorts =:= [] andalso
        throw(#{
            reason => "failed_to_resolve_srv_record",
            dns => DNS
        }),
    HostPorts.
543

544
%% Resolves the TXT record of DNS into extra config options.
%%
%% No record yields an empty map; exactly one single-string record is parsed
%% as a query string; anything else is rejected with a throw.
resolve_txt_records(Type, DNS) ->
    case emqx_connector_lib:resolve_dns(DNS, txt) of
        [] ->
            #{};
        [[QueryString]] = Resolved ->
            txt_record_options(Type, QueryString, Resolved);
        Resolved ->
            throw(#{
                reason => "multiple_txt_records",
                resolved => Resolved
            })
    end.

%% Parses one TXT record,
%% e.g. "authSource=admin&replicaSet=atlas-wrnled-shard-0",
%% into config options; throws when it is not a valid query string.
txt_record_options(Type, QueryString, Resolved) ->
    case uri_string:dissect_query(QueryString) of
        {error, _, _} ->
            throw(#{
                reason => "bad_txt_record_resolution",
                resolved => Resolved
            });
        Options ->
            convert_options(Type, normalize_options(Options))
    end.
565

566
%% Lower-cases the name of every `{Name, Value}' option so that later
%% lookups are case-insensitive; values are left untouched.
normalize_options(Options) ->
    lists:map(
        fun({Name, Value}) -> {string:lowercase(Name), Value} end,
        Options
    ).
570

571
%% Maps the (lower-cased) TXT record options onto config keys:
%% "authSource" -> `auth_source' for every type, and additionally
%% "replicaSet" -> `replica_set_name' for replica sets.
convert_options(rs, Options) ->
    maps:merge(
        maybe_add_option(auth_source, "authSource", Options),
        maybe_add_option(replica_set_name, "replicaSet", Options)
    );
convert_options(_Type, Options) ->
    maybe_add_option(auth_source, "authSource", Options).
577

578
%% Looks up the option OptName0 (compared lower-cased) in Options, a list of
%% `{Name, Value}' pairs with lower-cased names.
%%
%% Returns `#{ConfigKey => Binary}' when the option has a value, and an
%% empty map when the option is absent or was given without a value.
maybe_add_option(ConfigKey, OptName0, Options) ->
    OptName = string:lowercase(OptName0),
    case lists:keyfind(OptName, 1, Options) of
        {_, true} ->
            %% uri_string:dissect_query/1 reports a key that has no "=value"
            %% part as {Key, true}; there is nothing to convert in that case.
            #{};
        {_, OptValue} ->
            %% The value is unicode chardata, so encode it as UTF-8:
            %% iolist_to_binary/1 would fail on code points above 255.
            Bin = unicode:characters_to_binary(OptValue),
            %% Invalid chardata yields an error tuple instead of a binary;
            %% fail here rather than put that tuple into the config.
            true = is_binary(Bin),
            #{ConfigKey => Bin};
        false ->
            #{}
    end.
586

587
%% Renders a `{Host, Port}' pair as the binary <<"Host:Port">>.
format_host({Host, Port}) ->
    PortBin = integer_to_binary(Port),
    iolist_to_binary([Host, $:, PortBin]).
589

590
%% Renders every `{Host, Port}' pair of the list with format_host/1.
format_hosts(Hosts) ->
    [format_host(HostPort) || HostPort <- Hosts].
592

593
%% Parses the configured server value with the MongoDB host options and
%% returns the result as a list of `{Host, Port}' pairs.
parse_servers(HoconValue) ->
    Parsed = emqx_schema:parse_servers(HoconValue, ?MONGO_HOST_OPTIONS),
    ToHostPort = fun(#{hostname := Host, port := Port}) -> {Host, Port} end,
    lists:map(ToHostPort, Parsed).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc