
emqx / emqx, build 8684992748 (push, via GitHub, committer web-flow)
15 Apr 2024 07:16AM UTC, coverage: 67.831% (+5.4%) from 62.388%
Merge pull request #12877 from id/0415-sync-release-56 ("sync release 56")

29 of 40 new or added lines in 7 files covered. (72.5%)

129 existing lines in 17 files now uncovered.

37939 of 55932 relevant lines covered (67.83%)

7734.92 hits per line
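The headline percentage is simply the ratio of covered to relevant lines quoted above; a minimal sketch (standalone module with assumed names, not part of the report) that reproduces the rounding:

%% coverage_math.erl: recompute the headline coverage figure from the raw counts.
-module(coverage_math).
-export([percent/2]).

%% percent(37939, 55932) -> 67.83, matching the summary above.
percent(Covered, Relevant) ->
    round(Covered / Relevant * 10000) / 100.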

Source File

/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl (82.44% of lines covered)
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka_impl_consumer).

-behaviour(emqx_resource).

%% `emqx_resource' API
-export([
    callback_mode/0,
    query_mode/1,
    on_start/2,
    on_stop/2,
    on_get_status/2,

    on_add_channel/4,
    on_remove_channel/3,
    on_get_channels/1,
    on_get_channel_status/3
]).

%% `brod_group_consumer' API
-export([
    init/2,
    handle_message/2
]).

-ifdef(TEST).
-export([consumer_group_id/1]).
-endif.

-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% needed for the #kafka_message record definition
-include_lib("brod/include/brod.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").

-type connector_config() :: #{
    authentication := term(),
    bootstrap_hosts := binary(),
    connector_name := atom() | binary(),
    connector_type := atom() | binary(),
    socket_opts := _,
    ssl := _,
    any() => term()
}.
-type source_config() :: #{
    bridge_name := atom(),
    hookpoints := [binary()],
    parameters := source_parameters()
}.
-type source_parameters() :: #{
    key_encoding_mode := encoding_mode(),
    max_batch_bytes := emqx_schema:bytesize(),
    max_rejoin_attempts := non_neg_integer(),
    offset_commit_interval_seconds := pos_integer(),
    offset_reset_policy := offset_reset_policy(),
    topic := kafka_topic(),
    value_encoding_mode := encoding_mode(),
    topic_mapping => [one_topic_mapping()]
}.
-type one_topic_mapping() :: #{
    kafka_topic => kafka_topic(),
    mqtt_topic => emqx_types:topic(),
    qos => emqx_types:qos(),
    payload_template => string()
}.
-type subscriber_id() :: emqx_bridge_kafka_consumer_sup:child_id().
-type kafka_topic() :: brod:topic().
-type kafka_message() :: #kafka_message{}.
-type connector_state() :: #{
    kafka_client_id := brod:client_id(),
    installed_sources := #{source_resource_id() => source_state()}
}.
-type source_state() :: #{
    subscriber_id := subscriber_id(),
    kafka_client_id := brod:client_id(),
    kafka_topics := [kafka_topic()]
}.
-type offset_reset_policy() :: latest | earliest.
-type encoding_mode() :: none | base64.
-type consumer_init_data() :: #{
    hookpoints := [binary()],
    key_encoding_mode := encoding_mode(),
    resource_id := source_resource_id(),
    topic_mapping := #{
        kafka_topic() := #{
            payload_template => emqx_placeholder:tmpl_token(),
            mqtt_topic_template => emqx_placeholder:tmpl_token(),
            qos => emqx_types:qos()
        }
    },
    value_encoding_mode := encoding_mode()
}.
-type consumer_state() :: #{
    hookpoints := [binary()],
    kafka_topic := kafka_topic(),
    key_encoding_mode := encoding_mode(),
    resource_id := source_resource_id(),
    topic_mapping := #{
        kafka_topic() := #{
            payload_template => emqx_placeholder:tmpl_token(),
            mqtt_topic_template => emqx_placeholder:tmpl_token(),
            qos => emqx_types:qos()
        }
    },
    value_encoding_mode := encoding_mode()
}.
-type subscriber_init_info() :: #{
    topic := brod:topic(),
    partition => brod:partition(),
    group_id => brod:group_id(),
    commit_fun => brod_group_subscriber_v2:commit_fun()
}.

-define(CLIENT_DOWN_MESSAGE,
    "Failed to start Kafka client. Please check the logs for errors and check"
    " the connection parameters."
).

%% Allocatable resources
-define(kafka_client_id, kafka_client_id).
-define(kafka_subscriber_id, kafka_subscriber_id).

%%-------------------------------------------------------------------------------------
%% `emqx_resource' API
%%-------------------------------------------------------------------------------------

callback_mode() ->
    async_if_possible.

%% consumer bridges don't need resource workers
query_mode(_Config) ->
    no_queries.

-spec on_start(connector_resource_id(), connector_config()) -> {ok, connector_state()}.
on_start(ConnectorResId, Config) ->
    #{
        authentication := Auth,
        bootstrap_hosts := BootstrapHosts0,
        connector_type := ConnectorType,
        connector_name := ConnectorName,
        socket_opts := SocketOpts0,
        ssl := SSL
    } = Config,
    BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
    %% Note: this is distinct per node.
    ClientID = make_client_id(ConnectorResId, ConnectorType, ConnectorName),
    ClientOpts0 =
        case Auth of
            none -> [];
            Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}]
        end,
    ClientOpts = add_ssl_opts(ClientOpts0, SSL),
    SocketOpts = emqx_bridge_kafka_impl:socket_opts(SocketOpts0),
    ClientOpts1 = [{extra_sock_opts, SocketOpts} | ClientOpts],
    ok = emqx_resource:allocate_resource(ConnectorResId, ?kafka_client_id, ClientID),
    case brod:start_client(BootstrapHosts, ClientID, ClientOpts1) of
        ok ->
            ?tp(
                kafka_consumer_client_started,
                #{client_id => ClientID, resource_id => ConnectorResId}
            ),
            ?SLOG(info, #{
                msg => "kafka_consumer_client_started",
                resource_id => ConnectorResId,
                kafka_hosts => BootstrapHosts
            });
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "failed_to_start_kafka_consumer_client",
                resource_id => ConnectorResId,
                kafka_hosts => BootstrapHosts,
                reason => emqx_utils:redact(Reason)
            }),
            throw(?CLIENT_DOWN_MESSAGE)
    end,
    {ok, #{
        kafka_client_id => ClientID,
        installed_sources => #{}
    }}.

-spec on_stop(connector_resource_id(), connector_state()) -> ok.
on_stop(ConnectorResId, _State = undefined) ->
    SubscribersStopped =
        maps:fold(
            fun
                (?kafka_client_id, ClientID, Acc) ->
                    stop_client(ClientID),
                    Acc;
                ({?kafka_subscriber_id, _SourceResId}, SubscriberId, Acc) ->
                    stop_subscriber(SubscriberId),
                    Acc + 1
            end,
            0,
            emqx_resource:get_allocated_resources(ConnectorResId)
        ),
    case SubscribersStopped > 0 of
        true ->
            ?tp(kafka_consumer_subcriber_and_client_stopped, #{}),
            ok;
        false ->
            ?tp(kafka_consumer_just_client_stopped, #{}),
            ok
    end;
on_stop(ConnectorResId, State) ->
    #{
        installed_sources := InstalledSources,
        kafka_client_id := ClientID
    } = State,
    maps:foreach(
        fun(_SourceResId, #{subscriber_id := SubscriberId}) ->
            stop_subscriber(SubscriberId)
        end,
        InstalledSources
    ),
    stop_client(ClientID),
    ?tp(kafka_consumer_subcriber_and_client_stopped, #{instance_id => ConnectorResId}),
    ok.

-spec on_get_status(connector_resource_id(), connector_state()) ->
    ?status_connected | ?status_disconnected.
on_get_status(_ConnectorResId, State = #{kafka_client_id := ClientID}) ->
    case whereis(ClientID) of
        Pid when is_pid(Pid) ->
            case check_client_connectivity(Pid) of
                {Status, Reason} ->
                    {Status, State, Reason};
                Status ->
                    Status
            end;
        _ ->
            ?status_disconnected
    end;
on_get_status(_ConnectorResId, _State) ->
    ?status_disconnected.

-spec on_add_channel(
    connector_resource_id(),
    connector_state(),
    source_resource_id(),
    source_config()
) ->
    {ok, connector_state()}.
on_add_channel(ConnectorResId, ConnectorState0, SourceResId, SourceConfig) ->
    #{
        kafka_client_id := ClientID,
        installed_sources := InstalledSources0
    } = ConnectorState0,
    case start_consumer(SourceConfig, ConnectorResId, SourceResId, ClientID) of
        {ok, SourceState} ->
            InstalledSources = InstalledSources0#{SourceResId => SourceState},
            ConnectorState = ConnectorState0#{installed_sources := InstalledSources},
            {ok, ConnectorState};
        Error = {error, _} ->
            Error
    end.

-spec on_remove_channel(
    connector_resource_id(),
    connector_state(),
    source_resource_id()
) ->
    {ok, connector_state()}.
on_remove_channel(ConnectorResId, ConnectorState0, SourceResId) ->
    #{installed_sources := InstalledSources0} = ConnectorState0,
    case maps:take(SourceResId, InstalledSources0) of
        {SourceState, InstalledSources} ->
            #{subscriber_id := SubscriberId} = SourceState,
            stop_subscriber(SubscriberId),
            deallocate_subscriber_id(ConnectorResId, SourceResId),
            ok;
        error ->
            InstalledSources = InstalledSources0
    end,
    ConnectorState = ConnectorState0#{installed_sources := InstalledSources},
    {ok, ConnectorState}.

-spec on_get_channels(connector_resource_id()) ->
    [{action_resource_id(), source_config()}].
on_get_channels(ConnectorResId) ->
    emqx_bridge_v2:get_channels_for_connector(ConnectorResId).

-spec on_get_channel_status(
    connector_resource_id(),
    source_resource_id(),
    connector_state()
) ->
    ?status_connected | ?status_disconnected.
on_get_channel_status(
    _ConnectorResId,
    SourceResId,
    ConnectorState = #{installed_sources := InstalledSources}
) when is_map_key(SourceResId, InstalledSources) ->
    #{kafka_client_id := ClientID} = ConnectorState,
    #{
        kafka_topics := KafkaTopics,
        subscriber_id := SubscriberId
    } = maps:get(SourceResId, InstalledSources),
    do_get_status(ClientID, KafkaTopics, SubscriberId);
on_get_channel_status(_ConnectorResId, _SourceResId, _ConnectorState) ->
    ?status_disconnected.

%%-------------------------------------------------------------------------------------
%% `brod_group_subscriber' API
%%-------------------------------------------------------------------------------------

-spec init(subscriber_init_info(), consumer_init_data()) -> {ok, consumer_state()}.
init(GroupData, State0) ->
    ?tp(kafka_consumer_subscriber_init, #{group_data => GroupData, state => State0}),
    #{topic := KafkaTopic} = GroupData,
    State = State0#{kafka_topic => KafkaTopic},
    {ok, State}.

-spec handle_message(kafka_message(), consumer_state()) -> {ok, commit, consumer_state()}.
handle_message(Message, State) ->
    ?tp_span(
        kafka_consumer_handle_message,
        #{message => Message, state => State},
        do_handle_message(Message, State)
    ).

do_handle_message(Message, State) ->
    #{
        hookpoints := Hookpoints,
        kafka_topic := KafkaTopic,
        key_encoding_mode := KeyEncodingMode,
        resource_id := SourceResId,
        topic_mapping := TopicMapping,
        value_encoding_mode := ValueEncodingMode
    } = State,
    FullMessage = #{
        headers => maps:from_list(Message#kafka_message.headers),
        key => encode(Message#kafka_message.key, KeyEncodingMode),
        offset => Message#kafka_message.offset,
        topic => KafkaTopic,
        ts => Message#kafka_message.ts,
        ts_type => Message#kafka_message.ts_type,
        value => encode(Message#kafka_message.value, ValueEncodingMode)
    },
    LegacyMQTTConfig = maps:get(KafkaTopic, TopicMapping, #{}),
    legacy_maybe_publish_mqtt_message(LegacyMQTTConfig, SourceResId, FullMessage),
    lists:foreach(fun(Hookpoint) -> emqx_hooks:run(Hookpoint, [FullMessage]) end, Hookpoints),
    emqx_resource_metrics:received_inc(SourceResId),
    %% note: just `ack' does not commit the offset to the
    %% kafka consumer group.
    {ok, commit, State}.

legacy_maybe_publish_mqtt_message(
    _MQTTConfig = #{
        payload_template := PayloadTemplate,
        qos := MQTTQoS,
        mqtt_topic_template := MQTTTopicTemplate
    },
    SourceResId,
    FullMessage
) when MQTTTopicTemplate =/= <<>> ->
    Payload = render(FullMessage, PayloadTemplate),
    MQTTTopic = render(FullMessage, MQTTTopicTemplate),
    MQTTMessage = emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload),
    _ = emqx_broker:safe_publish(MQTTMessage),
    ok;
legacy_maybe_publish_mqtt_message(_MQTTConfig, _SourceResId, _FullMessage) ->
    ok.

%%-------------------------------------------------------------------------------------
%% Helper fns
%%-------------------------------------------------------------------------------------

add_ssl_opts(ClientOpts, #{enable := false}) ->
    ClientOpts;
add_ssl_opts(ClientOpts, SSL) ->
    [{ssl, emqx_tls_lib:to_client_opts(SSL)} | ClientOpts].

-spec make_subscriber_id(atom() | binary()) -> emqx_bridge_kafka_consumer_sup:child_id().
make_subscriber_id(BridgeName) ->
    BridgeNameBin = to_bin(BridgeName),
    <<"kafka_subscriber:", BridgeNameBin/binary>>.

ensure_consumer_supervisor_started() ->
    Mod = emqx_bridge_kafka_consumer_sup,
    ChildSpec =
        #{
            id => Mod,
            start => {Mod, start_link, []},
            restart => permanent,
            shutdown => infinity,
            type => supervisor,
            modules => [Mod]
        },
    case supervisor:start_child(emqx_bridge_sup, ChildSpec) of
        {ok, _Pid} ->
            ok;
        {error, already_present} ->
            ok;
        {error, {already_started, _Pid}} ->
            ok
    end.

-spec start_consumer(
    source_config(),
    connector_resource_id(),
    source_resource_id(),
    brod:client_id()
) ->
    {ok, source_state()} | {error, term()}.
start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
    #{
        bridge_name := BridgeName,
        hookpoints := Hookpoints,
        parameters := #{
            key_encoding_mode := KeyEncodingMode,
            max_batch_bytes := MaxBatchBytes,
            max_rejoin_attempts := MaxRejoinAttempts,
            offset_commit_interval_seconds := OffsetCommitInterval,
            offset_reset_policy := OffsetResetPolicy0,
            topic := _Topic,
            value_encoding_mode := ValueEncodingMode
        } = Params0
    } = Config,
    ok = ensure_consumer_supervisor_started(),
    ?tp(kafka_consumer_sup_started, #{}),
    TopicMapping = ensure_topic_mapping(Params0),
    InitialState = #{
        key_encoding_mode => KeyEncodingMode,
        hookpoints => Hookpoints,
        resource_id => SourceResId,
        topic_mapping => TopicMapping,
        value_encoding_mode => ValueEncodingMode
    },
    %% note: the group id should be the same for all nodes in the
    %% cluster, so that the load gets distributed between all
    %% consumers and we don't repeat messages in the same cluster.
    GroupID = consumer_group_id(BridgeName),
    %% earliest or latest
    BeginOffset = OffsetResetPolicy0,
    OffsetResetPolicy =
        case OffsetResetPolicy0 of
            latest -> reset_to_latest;
            earliest -> reset_to_earliest
        end,
    ConsumerConfig = [
        {begin_offset, BeginOffset},
        {max_bytes, MaxBatchBytes},
        {offset_reset_policy, OffsetResetPolicy}
    ],
    GroupConfig = [
        {max_rejoin_attempts, MaxRejoinAttempts},
        {offset_commit_interval_seconds, OffsetCommitInterval}
    ],
    KafkaTopics = maps:keys(TopicMapping),
    GroupSubscriberConfig =
        #{
            client => ClientID,
            group_id => GroupID,
            topics => KafkaTopics,
            cb_module => ?MODULE,
            init_data => InitialState,
            message_type => message,
            consumer_config => ConsumerConfig,
            group_config => GroupConfig
        },
    %% Below, we spawn a single `brod_group_consumer_v2' worker, with
    %% no option for a pool of those. This is because that worker
    %% spawns one worker for each assigned topic-partition
    %% automatically, so we should not spawn duplicate workers.
    SubscriberId = make_subscriber_id(BridgeName),
    ?tp(kafka_consumer_about_to_start_subscriber, #{}),
    ok = allocate_subscriber_id(ConnectorResId, SourceResId, SubscriberId),
    ?tp(kafka_consumer_subscriber_allocated, #{}),
    case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
        {ok, _ConsumerPid} ->
            ?tp(
                kafka_consumer_subscriber_started,
                #{resource_id => SourceResId, subscriber_id => SubscriberId}
            ),
            {ok, #{
                subscriber_id => SubscriberId,
                kafka_client_id => ClientID,
                kafka_topics => KafkaTopics
            }};
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "failed_to_start_kafka_consumer",
                resource_id => SourceResId,
                reason => emqx_utils:redact(Reason)
            }),
            {error, Reason}
    end.

%% This is to ensure backwards compatibility with the deprecated topic mapping.
-spec ensure_topic_mapping(source_parameters()) -> #{kafka_topic() := map()}.
ensure_topic_mapping(#{topic_mapping := [_ | _] = TM}) ->
    %% There is an existing topic mapping: legacy config.  We use it and ignore the single
    %% pubsub topic so that the bridge keeps working as before.
    convert_topic_mapping(TM);
ensure_topic_mapping(#{topic := KafkaTopic}) ->
    %% No topic mapping: generate one without MQTT templates.
    #{KafkaTopic => #{}}.

-spec stop_subscriber(emqx_bridge_kafka_consumer_sup:child_id()) -> ok.
stop_subscriber(SubscriberId) ->
    _ = log_when_error(
        fun() ->
            try
                emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId)
            catch
                exit:{noproc, _} ->
                    %% may happen when node is shutting down
                    ok
            end
        end,
        #{
            msg => "failed_to_delete_kafka_subscriber",
            subscriber_id => SubscriberId
        }
    ),
    ok.

-spec stop_client(brod:client_id()) -> ok.
stop_client(ClientID) ->
    _ = log_when_error(
        fun() ->
            brod:stop_client(ClientID)
        end,
        #{
            msg => "failed_to_delete_kafka_consumer_client",
            client_id => ClientID
        }
    ),
    ok.

do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
    case brod:get_partitions_count(ClientID, KafkaTopic) of
        {ok, NPartitions} ->
            case do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of
                ?status_connected ->
                    do_get_status(ClientID, RestTopics, SubscriberId);
                ?status_disconnected ->
                    ?status_disconnected
            end;
        {error, {client_down, Context}} ->
            case infer_client_error(Context) of
                auth_error ->
                    Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE,
                    {?status_disconnected, Message};
                {auth_error, Message0} ->
                    Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE,
                    {?status_disconnected, Message};
                connection_refused ->
                    Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE,
                    {?status_disconnected, Message};
                _ ->
                    {?status_disconnected, ?CLIENT_DOWN_MESSAGE}
            end;
        {error, leader_not_available} ->
            Message =
                "Leader connection not available. Please check the Kafka topic used,"
                " the connection parameters and Kafka cluster health",
            {?status_disconnected, Message};
        _ ->
            ?status_disconnected
    end;
do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) ->
    ?status_connected.

-spec do_get_topic_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
    ?status_connected | ?status_disconnected.
do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
    Results =
        lists:map(
            fun(N) ->
                brod_client:get_leader_connection(ClientID, KafkaTopic, N)
            end,
            lists:seq(0, NPartitions - 1)
        ),
    AllLeadersOk =
        length(Results) > 0 andalso
            lists:all(
                fun
                    ({ok, _}) ->
                        true;
                    (_) ->
                        false
                end,
                Results
            ),
    WorkersAlive = are_subscriber_workers_alive(SubscriberId),
    case AllLeadersOk andalso WorkersAlive of
        true ->
            ?status_connected;
        false ->
            ?status_disconnected
    end.

are_subscriber_workers_alive(SubscriberId) ->
    try
        Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup),
        case lists:keyfind(SubscriberId, 1, Children) of
            false ->
                false;
            {_, Pid, _, _} ->
                Workers = brod_group_subscriber_v2:get_workers(Pid),
                %% we can't enforce the number of partitions on a single
                %% node, as the group might be spread across an emqx
                %% cluster.
                lists:all(fun is_process_alive/1, maps:values(Workers))
        end
    catch
        exit:{shutdown, _} ->
            %% may happen if node is shutting down
            false
    end.

log_when_error(Fun, Log) ->
    try
        Fun()
    catch
        C:E ->
            ?SLOG(error, Log#{
                exception => C,
                reason => E
            })
    end.

-spec consumer_group_id(atom() | binary()) -> binary().
consumer_group_id(BridgeName0) ->
    BridgeName = to_bin(BridgeName0),
    <<"emqx-kafka-consumer-", BridgeName/binary>>.

-spec is_dry_run(connector_resource_id()) -> boolean().
is_dry_run(ConnectorResId) ->
    TestIdStart = string:find(ConnectorResId, ?TEST_ID_PREFIX),
    case TestIdStart of
        nomatch ->
            false;
        _ ->
            string:equal(TestIdStart, ConnectorResId)
    end.

-spec check_client_connectivity(pid()) ->
    ?status_connected
    | ?status_disconnected
    | {?status_disconnected, term()}.
check_client_connectivity(ClientPid) ->
    %% We use a fake group id just to probe the connection, as `get_group_coordinator'
    %% will ensure a connection to the broker.
    FakeGroupId = <<"____emqx_consumer_probe">>,
    case brod_client:get_group_coordinator(ClientPid, FakeGroupId) of
        {error, client_down} ->
            ?status_disconnected;
        {error, {client_down, Reason}} ->
            %% `brod' should have already logged the client being down.
            {?status_disconnected, maybe_clean_error(Reason)};
        {error, Reason} ->
            %% `brod' should have already logged the client being down.
            {?status_disconnected, maybe_clean_error(Reason)};
        {ok, _Metadata} ->
            ?status_connected
    end.

%% Attempt to make the returned error a bit more friendly.
maybe_clean_error(Reason) ->
    case Reason of
        [{{Host, Port}, {nxdomain, _Stacktrace}} | _] when is_integer(Port) ->
            HostPort = iolist_to_binary([Host, ":", integer_to_binary(Port)]),
            {HostPort, nxdomain};
        [{error_code, Code}, {error_msg, Msg} | _] ->
            {Code, Msg};
        _ ->
            Reason
    end.

-spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom().
make_client_id(ConnectorResId, BridgeType, BridgeName) ->
    case is_dry_run(ConnectorResId) of
        false ->
            ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
            binary_to_atom(ClientID0);
        true ->
            %% It is a dry run and we don't want to leak too many
            %% atoms.
            probing_brod_consumers
    end.

convert_topic_mapping(TopicMappingList) ->
    lists:foldl(
        fun(Fields, Acc) ->
            #{
                kafka_topic := KafkaTopic,
                mqtt_topic := MQTTTopicTemplate0,
                qos := QoS,
                payload_template := PayloadTemplate0
            } = Fields,
            PayloadTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate0),
            MQTTTopicTemplate = emqx_placeholder:preproc_tmpl(MQTTTopicTemplate0),
            Acc#{
                KafkaTopic => #{
                    payload_template => PayloadTemplate,
                    mqtt_topic_template => MQTTTopicTemplate,
                    qos => QoS
                }
            }
        end,
        #{},
        TopicMappingList
    ).

render(FullMessage, PayloadTemplate) ->
    Opts = #{
        return => full_binary,
        var_trans => fun
            (undefined) ->
                <<>>;
            (X) ->
                emqx_utils_conv:bin(X)
        end
    },
    emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, Opts).

encode(Value, none) ->
    Value;
encode(Value, base64) ->
    base64:encode(Value).

to_bin(B) when is_binary(B) -> B;
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).

infer_client_error(Error) ->
    case Error of
        [{_BrokerEndpoint, {econnrefused, _}} | _] ->
            connection_refused;
        [{_BrokerEndpoint, {{sasl_auth_error, Message}, _}} | _] when is_binary(Message) ->
            {auth_error, Message};
        [{_BrokerEndpoint, {{sasl_auth_error, _}, _}} | _] ->
            auth_error;
        _ ->
            undefined
    end.

allocate_subscriber_id(ConnectorResId, SourceResId, SubscriberId) ->
    ok = emqx_resource:allocate_resource(
        ConnectorResId,
        {?kafka_subscriber_id, SourceResId},
        SubscriberId
    ).

deallocate_subscriber_id(ConnectorResId, SourceResId) ->
    ok = emqx_resource:deallocate_resource(
        ConnectorResId,
        {?kafka_subscriber_id, SourceResId}
    ).
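
As the comments in start_consumer/4 note, the consumer group id depends only on the bridge name, so every EMQX node derives the same group id and Kafka partitions are shared across the cluster rather than re-consumed on each node. A minimal standalone sketch (hypothetical module, not part of the bridge) that reproduces the id scheme used by consumer_group_id/1 and make_subscriber_id/1 above:

%% kafka_consumer_naming_sketch.erl: rebuild the ids derived from a bridge name.
-module(kafka_consumer_naming_sketch).
-export([ids_for/1]).

%% ids_for(<<"my_bridge">>) ->
%%     #{group_id => <<"emqx-kafka-consumer-my_bridge">>,
%%       subscriber_id => <<"kafka_subscriber:my_bridge">>}
ids_for(BridgeName) when is_binary(BridgeName) ->
    #{
        group_id => <<"emqx-kafka-consumer-", BridgeName/binary>>,
        subscriber_id => <<"kafka_subscriber:", BridgeName/binary>>
    }.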