• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8702269235

16 Apr 2024 08:17AM UTC coverage: 67.82% (-0.01%) from 67.831%
8702269235

push

github

web-flow
Merge pull request #12881 from keynslug/fix/ds-repl-flaky

fix(dsrepl): make replication-related tests more stable

11 of 17 new or added lines in 1 file covered. (64.71%)

32 existing lines in 11 files now uncovered.

37936 of 55936 relevant lines covered (67.82%)

7895.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.32
/apps/emqx_bridge/src/emqx_bridge_v2.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16
-module(emqx_bridge_v2).
17

18
-behaviour(emqx_config_handler).
19
-behaviour(emqx_config_backup).
20

21
-include_lib("emqx/include/emqx.hrl").
22
-include_lib("emqx/include/logger.hrl").
23
-include_lib("emqx/include/emqx_hooks.hrl").
24
-include_lib("emqx_resource/include/emqx_resource.hrl").
25
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
26

27
%% Note: this is strange right now, because it lives in `emqx_bridge_v2', but it shall be
28
%% refactored into a new module/application with appropriate name.
29
-define(ROOT_KEY_ACTIONS, actions).
30
-define(ROOT_KEY_ACTIONS_BIN, <<"actions">>).
31
-define(ROOT_KEY_SOURCES, sources).
32
-define(ROOT_KEY_SOURCES_BIN, <<"sources">>).
33

34
%% Loading and unloading config when EMQX starts and stops
35
-export([
36
    load/0,
37
    unload/0
38
]).
39

40
%% CRUD API
41

42
-export([
43
    list/0,
44
    list/1,
45
    lookup/2,
46
    lookup/3,
47
    create/3,
48
    create/4,
49
    %% The remove/2 function is only for internal use as it may create
50
    %% rules with broken dependencies
51
    remove/2,
52
    remove/3,
53
    %% The following is the remove function that is called by the HTTP API
54
    %% It also checks for rule action dependencies and optionally removes
55
    %% them
56
    check_deps_and_remove/3,
57
    check_deps_and_remove/4
58
]).
59
-export([lookup_action/2, lookup_source/2]).
60

61
%% Operations
62

63
-export([
64
    disable_enable/3,
65
    disable_enable/4,
66
    health_check/2,
67
    send_message/4,
68
    query/4,
69
    start/2,
70
    start/3,
71
    reset_metrics/2,
72
    reset_metrics/3,
73
    create_dry_run/2,
74
    create_dry_run/3,
75
    get_metrics/2,
76
    get_metrics/3
77
]).
78

79
%% On message publish hook (for local_topics)
80

81
-export([on_message_publish/1]).
82

83
%% Convenience functions for connector implementations
84

85
-export([
86
    parse_id/1,
87
    get_channels_for_connector/1
88
]).
89

90
-export([diff_confs/2]).
91

92
%% Exported for tests
93
-export([
94
    id/2,
95
    id/3,
96
    source_id/3,
97
    source_hookpoint/1,
98
    bridge_v1_is_valid/2,
99
    bridge_v1_is_valid/3,
100
    extract_connector_id_from_bridge_v2_id/1
101
]).
102

103
%% Config Update Handler API
104

105
-export([
106
    post_config_update/5,
107
    pre_config_update/3
108
]).
109

110
%% Data backup
111
-export([
112
    import_config/1
113
]).
114

115
%% Bridge V2 Types and Conversions
116

117
-export([
118
    bridge_v2_type_to_connector_type/1,
119
    is_bridge_v2_type/1,
120
    connector_type/1
121
]).
122

123
%% Compatibility Layer API
124
%% All public functions for the compatibility layer should be prefixed with
125
%% bridge_v1_
126

127
-export([
128
    bridge_v1_lookup_and_transform/2,
129
    bridge_v1_list_and_transform/0,
130
    bridge_v1_check_deps_and_remove/3,
131
    bridge_v1_split_config_and_create/3,
132
    bridge_v1_create_dry_run/2,
133
    bridge_v1_type_to_bridge_v2_type/1,
134
    %% Exception from the naming convention:
135
    bridge_v2_type_to_bridge_v1_type/2,
136
    bridge_v1_id_to_connector_resource_id/1,
137
    bridge_v1_id_to_connector_resource_id/2,
138
    bridge_v1_enable_disable/3,
139
    bridge_v1_restart/2,
140
    bridge_v1_stop/2,
141
    bridge_v1_start/2,
142
    bridge_v1_reset_metrics/2,
143
    %% For test cases only
144
    bridge_v1_remove/2,
145
    get_conf_root_key_if_only_one/2
146
]).
147

148
%%====================================================================
149
%% Types
150
%%====================================================================
151

152
-type bridge_v2_info() :: #{
153
    type := binary(),
154
    name := binary(),
155
    raw_config := map(),
156
    resource_data := map(),
157
    status := emqx_resource:resource_status(),
158
    %% Explanation of the status if the status is not connected
159
    error := term()
160
}.
161

162
-type bridge_v2_type() :: binary() | atom() | [byte()].
163
-type bridge_v2_name() :: binary() | atom() | [byte()].
164

165
-type root_cfg_key() :: ?ROOT_KEY_ACTIONS | ?ROOT_KEY_SOURCES.
166

167
-export_type([root_cfg_key/0]).
168

169
%%====================================================================
170

171
%%====================================================================
172

173
%%====================================================================
174
%% Loading and unloading config when EMQX starts and stops
175
%%====================================================================
176

177
%% Loads the bridges configured under both root keys, calls
%% load_message_publish_hook/0 and then registers this module as the config
%% handler for the four action/source key paths (in the order listed below).
load() ->
    lists:foreach(fun load_bridges/1, [?ROOT_KEY_ACTIONS, ?ROOT_KEY_SOURCES]),
    load_message_publish_hook(),
    KeyPathFuns = [
        fun config_key_path_leaf/0,
        fun config_key_path/0,
        fun config_key_path_leaf_sources/0,
        fun config_key_path_sources/0
    ],
    lists:foreach(
        fun(KeyPathFun) ->
            %% Crash on anything but `ok': a missing handler is not recoverable here.
            ok = emqx_config_handler:add_handler(KeyPathFun(), emqx_bridge_v2)
        end,
        KeyPathFuns
    ),
    ok.
186

187
%% Installs every bridge found in the checked config under `RootName'.
%% The config is walked as Type -> Name -> Conf; both levels are mapped with
%% emqx_utils:pmap/3 (third argument `infinity'). Results are discarded.
load_bridges(RootName) ->
    InstallAllOfType =
        fun({Type, NamedConfs}) ->
            InstallOne =
                fun({Name, BridgeConf}) ->
                    install_bridge_v2(RootName, Type, Name, BridgeConf)
                end,
            emqx_utils:pmap(InstallOne, maps:to_list(NamedConfs), infinity)
        end,
    ConfByType = emqx:get_config([RootName], #{}),
    _ = emqx_utils:pmap(InstallAllOfType, maps:to_list(ConfByType), infinity),
    ok.
203

204
%% Reverses load/0: uninstalls the bridges configured under both root keys,
%% removes the 'message.publish' hook and unregisters the config handlers.
%%
%% load/0 registers handlers for four key paths (actions and sources, each
%% with a root and a leaf path), so all four are removed here; previously the
%% two source handlers were left registered after unload.
unload() ->
    unload_bridges(?ROOT_KEY_ACTIONS),
    unload_bridges(?ROOT_KEY_SOURCES),
    unload_message_publish_hook(),
    emqx_conf:remove_handler(config_key_path()),
    emqx_conf:remove_handler(config_key_path_leaf()),
    emqx_conf:remove_handler(config_key_path_sources()),
    emqx_conf:remove_handler(config_key_path_leaf_sources()),
    ok.
211

212
%% Uninstalls every bridge found in the checked config under `ConfRootKey'.
%% Mirrors load_bridges/1: the Type -> Name -> Conf structure is walked with
%% emqx_utils:pmap/3 at both levels and the results are discarded.
unload_bridges(ConfRootKey) ->
    UninstallAllOfType =
        fun({Type, NamedConfs}) ->
            UninstallOne =
                fun({Name, BridgeConf}) ->
                    uninstall_bridge_v2(ConfRootKey, Type, Name, BridgeConf)
                end,
            emqx_utils:pmap(UninstallOne, maps:to_list(NamedConfs), infinity)
        end,
    ConfByType = emqx:get_config([ConfRootKey], #{}),
    _ = emqx_utils:pmap(UninstallAllOfType, maps:to_list(ConfByType), infinity),
    ok.
228

229
%%====================================================================
230
%% CRUD API
231
%%====================================================================
232

233
%% Two-argument lookup: same as lookup/3 with the actions root key.
-spec lookup(bridge_v2_type(), bridge_v2_name()) -> {ok, bridge_v2_info()} | {error, not_found}.
lookup(Type, Name) ->
    ConfRootName = ?ROOT_KEY_ACTIONS,
    lookup(ConfRootName, Type, Name).
236

237
%% Looks up an action, i.e. an entry stored under the actions root key.
lookup_action(Type, Name) ->
    ConfRootName = ?ROOT_KEY_ACTIONS,
    lookup(ConfRootName, Type, Name).
239

240
%% Looks up a source, i.e. an entry stored under the sources root key.
lookup_source(Type, Name) ->
    ConfRootName = ?ROOT_KEY_SOURCES,
    lookup(ConfRootName, Type, Name).
242

243
%% Looks up one bridge under `ConfRootName' and reports its display status.
%%
%% Returns `{error, not_found}' when there is no raw config at
%% [ConfRootName, Type, Name]. Otherwise the status is derived from the
%% connector's resource data: the entry for this bridge's channel id in
%% `added_channels', combined with the connector's own `status'.
-spec lookup(root_cfg_key(), bridge_v2_type(), bridge_v2_name()) ->
    {ok, bridge_v2_info()} | {error, not_found}.
lookup(ConfRootName, Type, Name) ->
    case emqx:get_raw_config([ConfRootName, Type, Name], not_found) of
        not_found ->
            {error, not_found};
        #{<<"connector">> := ConnectorName} = RawConf ->
            ConnectorResId = emqx_connector_resource:resource_id(
                connector_type(Type), ConnectorName
            ),
            %% The connector is expected to exist, but the reference can
            %% dangle (e.g. around connector deletion); an empty map then
            %% stands in for the missing resource data.
            ResourceData =
                case emqx_resource:get_instance(ConnectorResId) of
                    {ok, _Group, Data} -> Data;
                    {error, not_found} -> #{}
                end,
            ConnectorStatus = maps:get(status, ResourceData, undefined),
            AddedChannels = maps:get(added_channels, ResourceData, #{}),
            ChannelId = id_with_root_name(ConfRootName, Type, Name, ConnectorName),
            ChannelEntry = maps:get(ChannelId, AddedChannels, undefined),
            %% Clause order matters: the first matching shape wins.
            {DisplayStatus, ErrorMsg} =
                case {ChannelEntry, ConnectorStatus} of
                    {#{status := ?status_connected}, _} ->
                        {?status_connected, <<"">>};
                    {#{error := resource_not_operational}, ?status_connecting} ->
                        {?status_connecting, <<"Not installed">>};
                    {#{status := ChanStatus, error := undefined}, _} ->
                        {ChanStatus, <<"Unknown reason">>};
                    {#{status := ChanStatus, error := ChanError}, _} ->
                        {ChanStatus, emqx_utils:readable_error_msg(ChanError)};
                    {undefined, _} ->
                        %% No channel entry for this bridge in the connector
                        {?status_disconnected, <<"Not installed">>}
                end,
            BridgeInfo = #{
                type => bin(Type),
                name => bin(Name),
                raw_config => RawConf,
                resource_data => ResourceData,
                status => DisplayStatus,
                error => ErrorMsg
            },
            {ok, BridgeInfo}
    end.
290

291
%% Lists all bridges under the actions root key, resolving each via lookup/2.
-spec list() -> [bridge_v2_info()] | {error, term()}.
list() ->
    LookupFun = fun lookup/2,
    list_with_lookup_fun(?ROOT_KEY_ACTIONS, LookupFun).
294

295
%% Lists all bridges under `ConfRootKey', resolving each entry via lookup/3.
list(ConfRootKey) ->
    list_with_lookup_fun(
        ConfRootKey,
        fun(Type, Name) -> lookup(ConfRootKey, Type, Name) end
    ).
300

301
%% Creates (or updates) an action: create/4 with the actions root key.
-spec create(bridge_v2_type(), bridge_v2_name(), map()) ->
    {ok, emqx_config:update_result()} | {error, any()}.
create(BridgeType, BridgeName, RawConf) ->
    ConfRootKey = ?ROOT_KEY_ACTIONS,
    create(ConfRootKey, BridgeType, BridgeName, RawConf).
305

306
%% Writes `RawConf' at [ConfRootKey, BridgeType, BridgeName] through
%% emqx_conf:update/3 with `override_to => cluster'. The raw config is passed
%% through emqx_utils:redact/1 before it is logged.
%%
%% The spec mirrors create/3, which delegates here unchanged.
-spec create(root_cfg_key(), bridge_v2_type(), bridge_v2_name(), map()) ->
    {ok, emqx_config:update_result()} | {error, any()}.
create(ConfRootKey, BridgeType, BridgeName, RawConf) ->
    ?SLOG(debug, #{
        bridge_action => create,
        bridge_version => 2,
        bridge_type => BridgeType,
        bridge_name => BridgeName,
        bridge_raw_config => emqx_utils:redact(RawConf),
        root_key_path => ConfRootKey
    }),
    emqx_conf:update(
        [ConfRootKey, BridgeType, BridgeName],
        RawConf,
        #{override_to => cluster}
    ).
320

321
%% Removes an action: remove/3 with the actions root key.
%% NOTE: no rule dependency check is done here, so rules may be left with
%% broken references; per the original note this is only called directly
%% from test cases.
-spec remove(bridge_v2_type(), bridge_v2_name()) -> ok | {error, any()}.
remove(BridgeType, BridgeName) ->
    ConfRootKey = ?ROOT_KEY_ACTIONS,
    remove(ConfRootKey, BridgeType, BridgeName).
326

327
%% Removes the config at [ConfRootKey, BridgeType, BridgeName] through
%% emqx_conf:remove/2 with `override_to => cluster'.
%% Returns `ok' on success (the update result is dropped) or `{error, Reason}'.
%%
%% The spec mirrors remove/2, which delegates here unchanged. The debug log
%% now carries `root_key_path', as create/4 does, so that action and source
%% removals can be told apart.
-spec remove(root_cfg_key(), bridge_v2_type(), bridge_v2_name()) -> ok | {error, any()}.
remove(ConfRootKey, BridgeType, BridgeName) ->
    ?SLOG(debug, #{
        bridge_action => remove,
        bridge_version => 2,
        bridge_type => BridgeType,
        bridge_name => BridgeName,
        root_key_path => ConfRootKey
    }),
    case
        emqx_conf:remove(
            [ConfRootKey, BridgeType, BridgeName],
            #{override_to => cluster}
        )
    of
        {ok, _} -> ok;
        {error, Reason} -> {error, Reason}
    end.
343

344
%% check_deps_and_remove/4 with the actions root key.
-spec check_deps_and_remove(bridge_v2_type(), bridge_v2_name(), boolean()) -> ok | {error, any()}.
check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) ->
    ConfRootKey = ?ROOT_KEY_ACTIONS,
    check_deps_and_remove(ConfRootKey, BridgeType, BridgeName, AlsoDeleteActions).
347

348
%% Removes a bridge after running emqx_bridge_lib:maybe_withdraw_rule_action/3.
%% `AlsoDeleteActions' selects what is passed as the third argument of that
%% call: `[rule_actions]' when true, `[]' when false. If it returns an error,
%% the error is returned and nothing is removed.
-spec check_deps_and_remove(root_cfg_key(), bridge_v2_type(), bridge_v2_name(), boolean()) ->
    ok | {error, any()}.
check_deps_and_remove(ConfRootKey, BridgeType, BridgeName, AlsoDeleteActions) ->
    WithdrawTargets =
        case AlsoDeleteActions of
            true -> [rule_actions];
            false -> []
        end,
    WithdrawResult = emqx_bridge_lib:maybe_withdraw_rule_action(
        BridgeType,
        BridgeName,
        WithdrawTargets
    ),
    case WithdrawResult of
        ok ->
            remove(ConfRootKey, BridgeType, BridgeName);
        {error, _Reason} = Error ->
            Error
    end.
368

369
%%--------------------------------------------------------------------
370
%% Helpers for CRUD API
371
%%--------------------------------------------------------------------
372

373
%% Folds over the raw config under `ConfRootName' (Type -> Name -> RawConf)
%% and calls `LookupFun(Type, Name)' for every entry. `{ok, Info}' results
%% contribute `Info'; `{error, not_bridge_v1_compatible}' tuples are kept
%% as-is in the list (the original notes that the caller filters them out).
%% Any other lookup result is not matched here.
list_with_lookup_fun(ConfRootName, LookupFun) ->
    CollectType =
        fun(Type, NameAndConf, TypeAcc) ->
            maps:fold(
                fun(Name, _RawConf, Acc) ->
                    Entry =
                        case LookupFun(Type, Name) of
                            {ok, BridgeInfo} ->
                                BridgeInfo;
                            {error, not_bridge_v1_compatible} = Incompatible ->
                                Incompatible
                        end,
                    [Entry | Acc]
                end,
                TypeAcc,
                NameAndConf
            )
        end,
    maps:fold(CollectType, [], emqx:get_raw_config([ConfRootName], #{})).
398

399
%% Installs a bridge unless its config has `enable := false', in which case
%% nothing is done. Otherwise the bridge config is combined with its
%% connector's config and handed to install_bridge_v2_helper/4.
install_bridge_v2(RootName, BridgeV2Type, BridgeName, Config) ->
    case Config of
        #{enable := false} ->
            ok;
        _ ->
            CombinedOrError = combine_connector_and_bridge_v2_config(
                BridgeV2Type,
                BridgeName,
                Config
            ),
            install_bridge_v2_helper(RootName, BridgeV2Type, BridgeName, CombinedOrError)
    end.
422

423
%% Second stage of install_bridge_v2/4.
%%
%% Clause 1: the combined config is an `{error, Reason}' tuple; the reason is
%% logged at warning level and the error is returned.
%% Clause 2: creates metrics for the bridge id, starts buffer workers when the
%% query mode calls for them, then adds the bridge as a channel of its
%% connector. The result of add_channel is not inspected; `ok' is returned.
install_bridge_v2_helper(_RootName, _BridgeV2Type, _BridgeName, {error, Reason} = Error) ->
    ?SLOG(warning, Reason),
    Error;
install_bridge_v2_helper(
    RootName, BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config
) ->
    BridgeV2Id = id_with_root_name(RootName, BridgeV2Type, BridgeName, ConnectorName),
    CreationOpts = emqx_resource:fetch_creation_opts(Config),
    ok = emqx_resource:create_metrics(BridgeV2Id),
    %% These query modes need no resource workers: the first two mean the
    %% bridge has a built-in buffer, `no_queries' means it is a consumer.
    ModesWithoutWorkers = [
        simple_sync_internal_buffer,
        simple_async_internal_buffer,
        no_queries
    ],
    case lists:member(get_query_mode(BridgeV2Type, Config), ModesWithoutWorkers) of
        true ->
            ok;
        false ->
            ok = emqx_resource_buffer_worker_sup:start_workers(BridgeV2Id, CreationOpts)
    end,
    %% Register the bridge as a channel of its connector resource.
    ConnectorId = emqx_connector_resource:resource_id(
        connector_type(BridgeV2Type), ConnectorName
    ),
    ChannelConfig = augment_channel_config(RootName, BridgeV2Type, BridgeName, Config),
    _ = emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, ChannelConfig),
    ok.
470

471
%% Adds `config_root', `bridge_type' and `bridge_name' to a channel config.
%% When the type is a source (per emqx_action_info:is_source/1) and the config
%% root is the sources key, a `hookpoints' list is added as well, holding the
%% bridge hookpoint followed by the source hookpoint.
augment_channel_config(ConfigRoot, BridgeV2Type, BridgeName, Config) ->
    WithIdentity = Config#{
        config_root => ConfigRoot,
        bridge_type => bin(BridgeV2Type),
        bridge_name => bin(BridgeName)
    },
    IsSource =
        emqx_action_info:is_source(BridgeV2Type) andalso ConfigRoot =:= ?ROOT_KEY_SOURCES,
    case IsSource of
        false ->
            WithIdentity;
        true ->
            BridgeId = emqx_bridge_resource:bridge_id(BridgeV2Type, BridgeName),
            BridgeHookpoint = emqx_bridge_resource:bridge_hookpoint(BridgeId),
            SourceHookpoint = source_hookpoint(BridgeId),
            WithIdentity#{hookpoints => [BridgeHookpoint, SourceHookpoint]}
    end.
492

493
%% Builds the source hookpoint name: the bridge id (converted with bin/1)
%% prefixed with "$sources/".
source_hookpoint(BridgeId) ->
    BridgeIdBin = bin(BridgeId),
    <<"$sources/", BridgeIdBin/binary>>.
495

496
%% Uninstalls a bridge.
%%
%% Clause 1: `enable := false' means the bridge was never installed.
%% Clause 2: stops the buffer workers and clears the metrics of the bridge id,
%% then removes the channel from its connector. The channel removal is skipped
%% when referenced_connectors_exist/3 reports an error.
uninstall_bridge_v2(_ConfRootKey, _BridgeType, _BridgeName, #{enable := false}) ->
    ok;
uninstall_bridge_v2(
    ConfRootKey, BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config
) ->
    BridgeV2Id = id_with_root_name(ConfRootKey, BridgeV2Type, BridgeName, ConnectorName),
    CreationOpts = emqx_resource:fetch_creation_opts(Config),
    ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts),
    ok = emqx_resource:clear_metrics(BridgeV2Id),
    ConnectorCheck = referenced_connectors_exist(BridgeV2Type, ConnectorName, BridgeName),
    case ConnectorCheck of
        ok ->
            ConnectorId = emqx_connector_resource:resource_id(
                connector_type(BridgeV2Type), ConnectorName
            ),
            emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id);
        {error, _} ->
            ok
    end.
524

525
%% Returns the bridge config with `resource_opts' replaced by the deep merge
%% of the connector's creation opts and the bridge's own (the bridge's opts
%% are the second argument of deep_merge).
%%
%% Only the connector config lookup is inside the `try'. Any exception from it
%% raises the connector-not-found alarm and yields an `{error, Map}' tuple
%% instead of a config.
combine_connector_and_bridge_v2_config(
    BridgeV2Type, BridgeName, #{connector := ConnectorName} = BridgeV2Config
) ->
    ConnectorType = connector_type(BridgeV2Type),
    try emqx_config:get([connectors, ConnectorType, to_existing_atom(ConnectorName)]) of
        ConnectorConfig ->
            FromConnector = emqx_resource:fetch_creation_opts(ConnectorConfig),
            FromBridge = emqx_resource:fetch_creation_opts(BridgeV2Config),
            Merged = emqx_utils_maps:deep_merge(FromConnector, FromBridge),
            BridgeV2Config#{resource_opts => Merged}
    catch
        _:_ ->
            alarm_connector_not_found(BridgeV2Type, BridgeName, ConnectorName),
            ErrorInfo = #{
                reason => <<"connector_not_found_or_wrong_type">>,
                bridge_type => BridgeV2Type,
                bridge_name => BridgeName,
                connector_name => ConnectorName
            },
            {error, ErrorInfo}
    end.
550

551
%%====================================================================
552
%% Operations
553
%%====================================================================
554
-define(ENABLE_OR_DISABLE(A), (A =:= disable orelse A =:= enable)).
555

556
%% disable_enable/4 with the actions root key.
%% `Action' must be `disable' or `enable' (enforced by the guard).
-spec disable_enable(disable | enable, bridge_v2_type(), bridge_v2_name()) ->
    {ok, any()} | {error, any()}.
disable_enable(Action, BridgeType, BridgeName) when ?ENABLE_OR_DISABLE(Action) ->
    ConfRootKey = ?ROOT_KEY_ACTIONS,
    disable_enable(ConfRootKey, Action, BridgeType, BridgeName).
560

561
%% Sends the atom `disable' or `enable' as the update request for
%% [ConfRootKey, BridgeType, BridgeName] through emqx_conf:update/3 with
%% `override_to => cluster'. Any other `Action' fails the guard.
%%
%% The spec mirrors disable_enable/3, which delegates here unchanged.
-spec disable_enable(root_cfg_key(), disable | enable, bridge_v2_type(), bridge_v2_name()) ->
    {ok, any()} | {error, any()}.
disable_enable(ConfRootKey, Action, BridgeType, BridgeName) when ?ENABLE_OR_DISABLE(Action) ->
    emqx_conf:update(
        [ConfRootKey, BridgeType, BridgeName],
        Action,
        #{override_to => cluster}
    ).
567

568
%% Manually starts the connector behind a bridge, which can speed up
%% reconnection compared to waiting for the automatic reconnect. The start
%% request is forwarded to the connector. Returns `ok' if the bridge is
%% connected once the connector has been started, and `{error, Reason}' if
%% the bridge ends up in any other status or if starting the connector failed.
%%
%% This arity operates on the actions root key.
-spec start(term(), term()) -> ok | {error, Reason :: term()}.
start(ActionOrSourceType, Name) ->
    ConfRootKey = ?ROOT_KEY_ACTIONS,
    start(ConfRootKey, ActionOrSourceType, Name).
577

578
%% Starts the connector of the given bridge via emqx_connector_resource:start/2
%% and then health-checks the bridge (the final `true' argument asks
%% connector_operation_helper/5 to do the health check).
-spec start(root_cfg_key(), term(), term()) -> ok | {error, Reason :: term()}.
start(ConfRootKey, BridgeV2Type, Name) ->
    StartConnector = fun emqx_connector_resource:start/2,
    DoHealthCheck = true,
    connector_operation_helper(ConfRootKey, BridgeV2Type, Name, StartConnector, DoHealthCheck).
584

585
%% Looks up the bridge config and delegates to
%% connector_operation_helper_with_conf/6, which dispatches on the result.
connector_operation_helper(ConfRootKey, BridgeV2Type, Name, ConnectorOpFun, DoHealthCheck) ->
    ConfOrError = lookup_conf(ConfRootKey, BridgeV2Type, Name),
    connector_operation_helper_with_conf(
        ConfRootKey, BridgeV2Type, Name, ConfOrError, ConnectorOpFun, DoHealthCheck
    ).
594

595
%% Runs `ConnectorOpFun(ConnectorType, ConnectorName)' for a bridge config.
%%
%% Clause 1: the config lookup failed; the error is passed through.
%% Clause 2: the bridge is disabled; nothing is run and `ok' is returned.
%% Clause 3: the operation is run. Without a health check its result is
%% returned as-is. With one, an operation error is returned directly;
%% otherwise the bridge is health-checked and anything other than
%% `connected' becomes an `{error, _}'.
connector_operation_helper_with_conf(
    _ConfRootKey, _BridgeV2Type, _Name, {error, _} = Error, _ConnectorOpFun, _DoHealthCheck
) ->
    Error;
connector_operation_helper_with_conf(
    _ConfRootKey, _BridgeV2Type, _Name, #{enable := false}, _ConnectorOpFun, _DoHealthCheck
) ->
    ok;
connector_operation_helper_with_conf(
    ConfRootKey, BridgeV2Type, Name, #{connector := ConnectorName}, ConnectorOpFun, DoHealthCheck
) ->
    OpResult = ConnectorOpFun(connector_type(BridgeV2Type), ConnectorName),
    case {DoHealthCheck, OpResult} of
        {false, _} ->
            OpResult;
        {true, {error, _} = OpError} ->
            OpError;
        {true, ok} ->
            case health_check(ConfRootKey, BridgeV2Type, Name) of
                #{status := connected} ->
                    ok;
                {error, _} = HealthCheckError ->
                    HealthCheckError;
                #{status := Status, error := Reason} ->
                    %% Connector is up but the bridge itself is not connected:
                    %% report status and error as a human-readable binary.
                    Text = io_lib:format(
                        "Connector started but bridge (~s:~s) is not connected. "
                        "Bridge Status: ~p, Error: ~p",
                        [bin(BridgeV2Type), bin(Name), Status, Reason]
                    ),
                    {error, iolist_to_binary(Text)}
            end
    end.
643

644
%% reset_metrics/3 with the actions root key.
-spec reset_metrics(bridge_v2_type(), bridge_v2_name()) -> ok | {error, not_found}.
reset_metrics(Type, Name) ->
    ConfRootKey = ?ROOT_KEY_ACTIONS,
    reset_metrics(ConfRootKey, Type, Name).
647

648
%% Resets the metrics of the bridge at [ConfRootKey, Type, Name]. The bridge
%% config is looked up first and reset_metrics_helper/4 decides what to do
%% with the result.
%%
%% The spec mirrors reset_metrics/2, which delegates here unchanged.
-spec reset_metrics(root_cfg_key(), bridge_v2_type(), bridge_v2_name()) ->
    ok | {error, not_found}.
reset_metrics(ConfRootKey, Type, Name) ->
    reset_metrics_helper(ConfRootKey, Type, Name, lookup_conf(ConfRootKey, Type, Name)).
650

651
%% Dispatches on the result of the config lookup done by reset_metrics/3:
%%  - disabled bridge: nothing to reset, returns `ok';
%%  - config with a connector: resets the metrics stored under the bridge id;
%%  - anything else (e.g. a failed lookup): `{error, not_found}'.
reset_metrics_helper(ConfRootKey, BridgeV2Type, BridgeName, ConfOrError) ->
    case ConfOrError of
        #{enable := false} ->
            ok;
        #{connector := ConnectorName} ->
            ResourceId = id_with_root_name(
                ConfRootKey, BridgeV2Type, BridgeName, ConnectorName
            ),
            emqx_resource:reset_metrics(ResourceId);
        _ ->
            {error, not_found}
    end.
658

659
%% Asks emqx_resource:query_mode/3 for the query mode of a bridge, using the
%% resource type that corresponds to the bridge's connector type.
get_query_mode(BridgeV2Type, Config) ->
    CreationOpts = emqx_resource:fetch_creation_opts(Config),
    ResourceType = emqx_connector_resource:connector_to_resource_type(
        connector_type(BridgeV2Type)
    ),
    emqx_resource:query_mode(ResourceType, Config, CreationOpts).
664

665
%% Sends `Message' as a query to the named bridge.
%% Returns `{error, bridge_stopped}' when the bridge config has
%% `enable := false', and `{error, bridge_not_found}' when the config lookup
%% yields anything that is not a config with an `enable' flag.
-spec query(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
    term() | {error, term()}.
query(BridgeType, BridgeName, Message, QueryOpts0) ->
    case lookup_conf(BridgeType, BridgeName) of
        #{enable := true} = BridgeConf ->
            CombinedOrError = combine_connector_and_bridge_v2_config(
                BridgeType, BridgeName, BridgeConf
            ),
            do_query_with_enabled_config(
                BridgeType, BridgeName, Message, QueryOpts0, CombinedOrError
            );
        #{enable := false} ->
            {error, bridge_stopped};
        _LookupFailure ->
            {error, bridge_not_found}
    end.
677

678
%% Issues the query for an enabled bridge.
%%
%% Clause 1: combining the bridge and connector configs failed; the reason is
%% logged at warning level and the error is returned.
%% Clause 2: builds the query options (the caller's options plus
%% `connector_resource_id' and `query_mode' override those derived from the
%% config) and calls emqx_resource:query/3. A `{send_message, Msg}' request is
%% rewritten to `{BridgeV2Id, Msg}'; any other request is passed unchanged.
do_query_with_enabled_config(
    _BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error
) ->
    ?SLOG(warning, Reason),
    Error;
do_query_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config) ->
    QueryMode = get_query_mode(BridgeType, Config),
    ConnectorName = maps:get(connector, Config),
    ConnectorResId = emqx_connector_resource:resource_id(
        emqx_action_info:action_type_to_connector_type(BridgeType),
        ConnectorName
    ),
    ConfigQueryOpts = emqx_bridge:query_opts(Config),
    CallerQueryOpts = QueryOpts0#{
        connector_resource_id => ConnectorResId,
        query_mode => QueryMode
    },
    QueryOpts = maps:merge(ConfigQueryOpts, CallerQueryOpts),
    BridgeV2Id = id(BridgeType, BridgeName),
    Request =
        case Message of
            {send_message, Payload} -> {BridgeV2Id, Payload};
            Other -> Other
        end,
    emqx_resource:query(BridgeV2Id, Request, QueryOpts).
704

705
%% Sends a message through the bridge: wraps it as `{send_message, Message}'
%% and hands it to query/4.
-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
    term() | {error, term()}.
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
    Request = {send_message, Message},
    query(BridgeType, BridgeName, Request, QueryOpts0).
709

710
%% health_check/3 with the actions root key.
-spec health_check(BridgeType :: term(), BridgeName :: term()) ->
    #{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
health_check(BridgeType, BridgeName) ->
    ConfRootKey = ?ROOT_KEY_ACTIONS,
    health_check(ConfRootKey, BridgeType, BridgeName).
714

715
%% Health-checks the channel of the bridge at
%% [ConfRootKey, BridgeType, BridgeName] on its connector resource.
%% Returns `{error, bridge_stopped}' for a disabled bridge. Any lookup result
%% that is not a config with an `enable' flag is returned unchanged.
%%
%% The spec mirrors health_check/2, which delegates here unchanged.
-spec health_check(root_cfg_key(), BridgeType :: term(), BridgeName :: term()) ->
    #{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
health_check(ConfRootKey, BridgeType, BridgeName) ->
    case lookup_conf(ConfRootKey, BridgeType, BridgeName) of
        #{
            enable := true,
            connector := ConnectorName
        } ->
            ConnectorId = emqx_connector_resource:resource_id(
                connector_type(BridgeType), ConnectorName
            ),
            emqx_resource_manager:channel_health_check(
                ConnectorId, id_with_root_name(ConfRootKey, BridgeType, BridgeName, ConnectorName)
            );
        #{enable := false} ->
            {error, bridge_stopped};
        Error ->
            Error
    end.
732

733
%% create_dry_run/3 with the actions root key.
-spec create_dry_run(bridge_v2_type(), Config :: map()) -> ok | {error, term()}.
create_dry_run(Type, Conf) ->
    ConfRootKey = ?ROOT_KEY_ACTIONS,
    create_dry_run(ConfRootKey, Type, Conf).
736

737
%% Validates a raw bridge config and, if its connector exists, dry-runs it.
%%
%% The `<<"name">>' key is dropped and the remainder is checked against
%% emqx_bridge_v2_schema under the placeholder name `<<"temp_name">>'.
%% Anything thrown inside the `try' (the original labels these validation
%% errors) is returned as `{error, Reason}'; other exception classes propagate.
-spec create_dry_run(root_cfg_key(), bridge_v2_type(), Config :: map()) -> ok | {error, term()}.
create_dry_run(ConfRootKey, Type, Conf0) ->
    BridgeRawConf = maps:remove(<<"name">>, Conf0),
    TypeBin = bin(Type),
    RootKeyBin = bin(ConfRootKey),
    SchemaInput = #{RootKeyBin => #{TypeBin => #{<<"temp_name">> => BridgeRawConf}}},
    CheckOpts = #{atom_key => true, required => false},
    try
        _ = hocon_tconf:check_plain(emqx_bridge_v2_schema, SchemaInput, CheckOpts),
        #{<<"connector">> := ConnectorName} = BridgeRawConf,
        %% The dry run is only attempted when the referenced connector exists.
        RawConfPath = [connectors, connector_type(Type), ConnectorName],
        case emqx:get_raw_config(RawConfPath, not_found) of
            not_found ->
                NotFoundMsg = io_lib:format("Connector ~p not found", [ConnectorName]),
                {error, iolist_to_binary(NotFoundMsg)};
            ConnectorRawConf ->
                create_dry_run_helper(
                    ensure_atom_root_key(ConfRootKey), Type, ConnectorRawConf, BridgeRawConf
                )
        end
    catch
        throw:Thrown ->
            {error, Thrown}
    end.
767

768
%% Dry-runs a bridge on top of a dry-run connector.
%%
%% A bridge name is generated once from ?TEST_ID_PREFIX and an
%% emqx_utils:gen_id(8) suffix. The callback handed to
%% emqx_connector_resource:create_dry_run/3 adds the bridge as a channel of
%% the connector id it receives and health-checks that channel: `ok' when the
%% status is `connected', `{error, {Status, Error}}' otherwise.
create_dry_run_helper(ConfRootKey, BridgeV2Type, ConnectorRawConf, BridgeV2RawConf) ->
    BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
    ConnectorType = connector_type(BridgeV2Type),
    AddAndCheckChannel =
        fun(ConnectorId) ->
            {_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
            ChannelTestId = id(BridgeV2Type, BridgeName, ConnectorName),
            FilledConf = fill_defaults(
                BridgeV2Type,
                BridgeV2RawConf,
                bin(ConfRootKey),
                emqx_bridge_v2_schema,
                #{make_serializable => false}
            ),
            AtomKeyConf = emqx_utils_maps:unsafe_atom_key_map(FilledConf),
            ChannelConf = augment_channel_config(
                ConfRootKey, BridgeV2Type, BridgeName, AtomKeyConf
            ),
            case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, ChannelConf) of
                {error, _} = AddError ->
                    AddError;
                ok ->
                    case emqx_resource_manager:channel_health_check(ConnectorId, ChannelTestId) of
                        #{status := connected} ->
                            ok;
                        #{status := Status, error := Error} ->
                            {error, {Status, Error}}
                    end
            end
        end,
    emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, AddAndCheckChannel).
805

806
%% Metrics of the action `Type'/`Name' (looked up under the actions root).
-spec get_metrics(bridge_v2_type(), bridge_v2_name()) -> emqx_metrics_worker:metrics().
get_metrics(Type, Name) ->
    get_metrics(?ROOT_KEY_ACTIONS, Type, Name).

%% Metrics of the action or source `Type'/`Name' under `ConfRootKey'.
%% The resource id is derived via `id_with_root_name/3'.
-spec get_metrics(root_cfg_key(), bridge_v2_type(), bridge_v2_name()) ->
    emqx_metrics_worker:metrics().
get_metrics(ConfRootKey, Type, Name) ->
    ResourceId = id_with_root_name(ConfRootKey, Type, Name),
    emqx_resource:get_metrics(ResourceId).
31✔
814

815
%%====================================================================
816
%% On message publish hook (for local topics)
817
%%====================================================================
818

819
%% The following functions are more or less copied from emqx_bridge.erl
820

821
%% Drops the 'message.publish' hook and re-installs it according to `Bridges'.
%% Both steps are asserted to return `ok'.
reload_message_publish_hook(Bridges) ->
    ok = unload_message_publish_hook(),
    ok = load_message_publish_hook(Bridges).
3,393✔
824

825
%% Installs the publish hook based on the currently configured actions.
load_message_publish_hook() ->
    load_message_publish_hook(emqx:get_config([?ROOT_KEY_ACTIONS], #{})).

%% Walks a `#{Type => #{Name => Conf}}' map and lets
%% `do_load_message_publish_hook/2' decide, per bridge config, whether the
%% hook has to be installed. Always returns `ok'.
load_message_publish_hook(Bridges) ->
    _ = [
        do_load_message_publish_hook(Type, BridgeConf)
     || {Type, NamedConfs} <- maps:to_list(Bridges),
        {_Name, BridgeConf} <- maps:to_list(NamedConfs)
    ],
    ok.
841

842
%% Registers `on_message_publish' on 'message.publish' when the bridge config
%% carries a binary `local_topic'; any other config is a no-op returning `ok'.
do_load_message_publish_hook(_Type, Conf) ->
    case Conf of
        #{local_topic := LocalTopic} when is_binary(LocalTopic) ->
            emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
        _ ->
            ok
    end.
1,473✔
846

847
%% Removes this module's `on_message_publish' callback from 'message.publish'.
%% The deletion is asserted to return `ok'.
unload_message_publish_hook() ->
    HookPoint = 'message.publish',
    ok = emqx_hooks:del(HookPoint, {?MODULE, on_message_publish}).
3,486✔
849

850
%% 'message.publish' hook callback.
%% Messages whose `sys' flag is `true' are left alone; every other message is
%% converted with `emqx_rule_events:eventmsg_publish/1' and forwarded to the
%% bridges whose local topic matches. The message is always passed on
%% unchanged as `{ok, Message}'.
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
    IsSys = maps:get(sys, Flags, false),
    _ =
        case IsSys of
            true ->
                ok;
            false ->
                {Msg, _} = emqx_rule_events:eventmsg_publish(Message),
                send_to_matched_egress_bridges(Topic, Msg)
        end,
    {ok, Message}.
768✔
859

860
%% Sends `Msg' to every bridge whose local topic filter matches `Topic'.
%% Failures are logged per bridge and never abort the remaining deliveries:
%% an `{error, Reason}' return and any raised exception each get their own
%% error log entry.
send_to_matched_egress_bridges(Topic, Msg) ->
    Deliver = fun({Type, Name}) ->
        try send_message(Type, Name, Msg, #{}) of
            {error, SendError} ->
                ?SLOG(error, #{
                    msg => "send_message_to_bridge_failed",
                    bridge_type => Type,
                    bridge_name => Name,
                    error => SendError
                });
            _ ->
                ok
        catch
            Class:Reason:Stacktrace ->
                ?SLOG(error, #{
                    msg => "send_message_to_bridge_exception",
                    bridge_type => Type,
                    bridge_name => Name,
                    error => Class,
                    reason => Reason,
                    stacktrace => Stacktrace
                })
        end
    end,
    lists:foreach(Deliver, get_matched_egress_bridges(Topic)).
888

889
%% Collects `{Type, Name}' for every configured action that
%% `get_matched_bridge_id/5' accepts for `Topic'.
get_matched_egress_bridges(Topic) ->
    %% Inner fold: all bridges of one type.
    CollectByName = fun(BType) ->
        fun(BName, BConf, Acc) ->
            get_matched_bridge_id(BType, BConf, Topic, BName, Acc)
        end
    end,
    %% Outer fold: all bridge types, threading the same accumulator through.
    CollectByType = fun(BType, NamedConfs, Acc) ->
        maps:fold(CollectByName(BType), Acc, NamedConfs)
    end,
    maps:fold(CollectByType, [], emqx:get_config([?ROOT_KEY_ACTIONS], #{})).
904

905
%% Adds `{BType, BName}' to `Acc' when the bridge is not disabled, has a
%% `local_topic' other than `undefined', and that filter matches `Topic'.
get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) ->
    %% Explicitly disabled bridges never receive local messages.
    Acc;
get_matched_bridge_id(BType, Conf, Topic, BName, Acc) ->
    case maps:find(local_topic, Conf) of
        {ok, Filter} when Filter =/= undefined ->
            do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc);
        _ ->
            %% No local topic configured (missing key or `undefined').
            Acc
    end.
914

915
%% Prepends `{BType, BName}' to `Acc' when `Topic' matches `Filter'.
do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) ->
    IsMatch = emqx_topic:match(Topic, Filter),
    case IsMatch of
        false -> Acc;
        true -> [{BType, BName} | Acc]
    end.
920

921
%%====================================================================
922
%% Convenience functions for connector implementations
923
%%====================================================================
924

925
%% Splits a colon-separated id into `#{kind, type, name}'.
%%   <<"Type:Name">>            -> kind `undefined'
%%   <<"action:Type:Name...">>  -> kind `action'
%%   <<"source:Type:Name...">>  -> kind `source'
%% Segments after the name are ignored for the `action'/`source' forms.
%% Anything else raises `error({error, <<"Invalid id: ...">>})'.
parse_id(Id) ->
    Segments = binary:split(Id, <<":">>, [global]),
    case Segments of
        [Type, Name] ->
            #{kind => undefined, type => Type, name => Name};
        [<<"action">>, Type, Name | _] ->
            #{kind => action, type => Type, name => Name};
        [<<"source">>, Type, Name | _] ->
            #{kind => source, type => Type, name => Name};
        _ ->
            Message = iolist_to_binary(io_lib:format("Invalid id: ~p", [Id])),
            error({error, Message})
    end.
936

937
%% All channels attached to `ConnectorId': actions first, then sources.
get_channels_for_connector(ConnectorId) ->
    ActionChannels = get_channels_for_connector(?ROOT_KEY_ACTIONS, ConnectorId),
    SourceChannels = get_channels_for_connector(?ROOT_KEY_SOURCES, ConnectorId),
    ActionChannels ++ SourceChannels.
11,798✔
941

942
%% Channels under the `SourcesOrActions' root that belong to `ConnectorId'.
%% Only bridge types whose connector type equals the one parsed from the id
%% are considered. Returns a flat list of `{ChannelId, ChannelConfig}'.
get_channels_for_connector(SourcesOrActions, ConnectorId) ->
    try emqx_connector_resource:parse_connector_id(ConnectorId) of
        {ConnectorType, ConnectorName} ->
            ConfiguredTypes = maps:keys(emqx:get_config([SourcesOrActions], #{})),
            MatchingTypes = lists:filter(
                fun(Type) -> connector_type(Type) =:= ConnectorType end,
                ConfiguredTypes
            ),
            lists:flatmap(
                fun(BridgeV2Type) ->
                    get_channels_for_connector(SourcesOrActions, ConnectorName, BridgeV2Type)
                end,
                MatchingTypes
            )
    catch
        _:_ ->
            %% The id could not be parsed as a connector id, so the connector
            %% is assumed to have no channels (e.g. it is a connector for
            %% authn or authz).
            []
    end.
961

962
%% Channels of one bridge type that reference `ConnectorName' in their
%% `connector' field. Each entry is `{ChannelId, AugmentedConfig}'.
get_channels_for_connector(SourcesOrActions, ConnectorName, BridgeV2Type) ->
    ToChannel = fun({Name, Conf}) ->
        case bin(ConnectorName) =:= maps:get(connector, Conf, no_name) of
            true ->
                ChannelId = id_with_root_name(
                    SourcesOrActions, BridgeV2Type, Name, ConnectorName
                ),
                ChannelConf = augment_channel_config(
                    SourcesOrActions, BridgeV2Type, Name, Conf
                ),
                {true, {ChannelId, ChannelConf}};
            false ->
                false
        end
    end,
    BridgeV2s = emqx:get_config([SourcesOrActions, BridgeV2Type], #{}),
    lists:filtermap(ToChannel, maps:to_list(BridgeV2s)).
972

973
%%====================================================================
974
%% ID related functions
975
%%====================================================================
976

977
%% Resource id of an action; the connector name is taken from its config.
id(BridgeType, BridgeName) ->
    id_with_root_name(?ROOT_KEY_ACTIONS, BridgeType, BridgeName).

%% Resource id of an action attached to the given connector.
id(BridgeType, BridgeName, ConnectorName) ->
    id_with_root_name(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, ConnectorName).
311✔
982

983
%% Resource id of a source attached to the given connector.
source_id(BridgeType, BridgeName, ConnectorName) ->
    id_with_root_name(?ROOT_KEY_SOURCES, BridgeType, BridgeName, ConnectorName).
117✔
985

986
%% Builds the resource id for `BridgeType'/`BridgeName' under `RootName',
%% reading the connector name from the stored config.
%% Throws `{action_source_not_found, Details}' when the config lookup fails.
id_with_root_name(RootName, BridgeType, BridgeName) ->
    Lookup = lookup_conf(RootName, BridgeType, BridgeName),
    case Lookup of
        #{connector := ConnectorName} ->
            id_with_root_name(RootName, BridgeType, BridgeName, ConnectorName);
        {error, Reason} ->
            Details = #{
                reason => Reason,
                root_name => RootName,
                type => BridgeType,
                name => BridgeName
            },
            throw({action_source_not_found, Details})
    end.
1000

1001
%% Builds the full resource id:
%%   <<"action|source:Type:Name:connector:ConnectorType:ConnectorName">>
%% `RootName0' must convert to <<"actions">> or <<"sources">>; anything else
%% fails with a case_clause error.
id_with_root_name(RootName0, BridgeType, BridgeName, ConnectorName) ->
    Kind =
        case bin(RootName0) of
            <<"actions">> -> <<"action">>;
            <<"sources">> -> <<"source">>
        end,
    ConnectorType = bin(connector_type(BridgeType)),
    KindBin = bin(Kind),
    TypeBin = bin(BridgeType),
    NameBin = bin(BridgeName),
    ConnectorTypeBin = bin(ConnectorType),
    ConnectorNameBin = bin(ConnectorName),
    <<
        KindBin/binary,
        ":",
        TypeBin/binary,
        ":",
        NameBin/binary,
        ":connector:",
        ConnectorTypeBin/binary,
        ":",
        ConnectorNameBin/binary
    >>.
1019

1020
%% Connector type that serves bridge type `Type'.
connector_type(Type) ->
    %% Fully-qualified (remote) call on purpose, so that tests can mock
    %% `bridge_v2_type_to_connector_type/1'.
    ?MODULE:bridge_v2_type_to_connector_type(Type).
241,204✔
1023

1024
%% Delegates the action-type -> connector-type mapping to `emqx_action_info'.
bridge_v2_type_to_connector_type(Type) ->
    emqx_action_info:action_type_to_connector_type(Type).
243,284✔
1026

1027
%%====================================================================
1028
%% Data backup API
1029
%%====================================================================
1030

1031
%% Data-backup callback: imports the `actions' subtree first, then the
%% `sources' subtree, and groups both outcomes with `group_import_results/1'.
import_config(RawConf) ->
    group_import_results([
        %% actions structure
        emqx_bridge:import_config(
            RawConf, <<"actions">>, ?ROOT_KEY_ACTIONS, config_key_path()
        ),
        %% sources structure
        emqx_bridge:import_config(
            RawConf, <<"sources">>, ?ROOT_KEY_SOURCES, config_key_path_sources()
        )
    ]).
22✔
1040

1041
%% Splits a list of `{ok, R}' / `{error, E}' results into
%% `{results, {OkList, ErrorList}}', keeping the input order in both lists.
group_import_results(Results0) ->
    Collect = fun
        ({ok, Ok}, {Oks, Errs}) ->
            {[Ok | Oks], Errs};
        ({error, Err}, {Oks, Errs}) ->
            {Oks, [Err | Errs]}
    end,
    %% foldr + prepend keeps the original ordering without a reverse step.
    {results, lists:foldr(Collect, {[], []}, Results0)}.
22✔
1053

1054
%%====================================================================
1055
%% Config Update Handler API
1056
%%====================================================================
1057

1058
%% Config path of the whole actions subtree.
config_key_path() ->
    [?ROOT_KEY_ACTIONS].

%% Config path pattern of a single action (`'?'' are wildcards for the type
%% and name levels).
config_key_path_leaf() ->
    [?ROOT_KEY_ACTIONS, '?', '?'].

%% Config path of the whole sources subtree.
config_key_path_sources() ->
    [?ROOT_KEY_SOURCES].

%% Config path pattern of a single source (`'?'' are wildcards for the type
%% and name levels).
config_key_path_leaf_sources() ->
    [?ROOT_KEY_SOURCES, '?', '?'].
130✔
1069

1070
%% Enable or disable a single action/source. Fails with `bridge_not_found'
%% when there is no previous config to toggle.
pre_config_update([ConfRootKey, _Type, _Name], Oper, OldAction) when
    ?ENABLE_OR_DISABLE(Oper) andalso
        (ConfRootKey =:= ?ROOT_KEY_ACTIONS orelse ConfRootKey =:= ?ROOT_KEY_SOURCES)
->
    case OldAction of
        undefined ->
            {error, bridge_not_found};
        _ ->
            {ok, OldAction#{<<"enable">> => operation_to_enable(Oper)}}
    end;
%% Update of a single action/source coming from a specific HTTP API.
%% If the connector is not found, the update operation fails.
pre_config_update([ConfRootKey, Type, Name], Conf = #{}, _OldConf) when
    ConfRootKey =:= ?ROOT_KEY_ACTIONS orelse ConfRootKey =:= ?ROOT_KEY_SOURCES
->
    convert_from_connector(ConfRootKey, Type, Name, Conf);
%% Batch update of a whole root, used when importing a configuration or
%% executing a CLI command. This succeeds even if a connector is not found;
%% that situation is reported by an alarm in post_config_update.
pre_config_update([ConfRootKey], Conf = #{}, _OldConfs) when
    ConfRootKey =:= ?ROOT_KEY_ACTIONS orelse ConfRootKey =:= ?ROOT_KEY_SOURCES
->
    Converted = convert_from_connectors(ConfRootKey, Conf),
    {ok, Converted}.
290✔
1093

1094
%% Root-level handler: triggered when the whole actions (or sources) path is
%% updated, e.g. with emqx_conf:update([actions], BridgesConf, #{}).
%% Removed entries are uninstalled, added ones installed, and changed ones
%% re-installed; the publish hook is reloaded afterwards.
post_config_update([ConfRootKey], _Req, NewConf, OldConf, _AppEnv) when
    ConfRootKey =:= ?ROOT_KEY_ACTIONS orelse ConfRootKey =:= ?ROOT_KEY_SOURCES
->
    #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf),
    Uninstall = fun(Type, Name, Conf) ->
        uninstall_bridge_v2(ConfRootKey, Type, Name, Conf)
    end,
    Install = fun(Type, Name, Conf) ->
        install_bridge_v2(ConfRootKey, Type, Name, Conf)
    end,
    Reinstall = fun(Type, Name, {OldBridgeConf, Conf}) ->
        uninstall_bridge_v2(ConfRootKey, Type, Name, OldBridgeConf),
        install_bridge_v2(ConfRootKey, Type, Name, Conf)
    end,
    Tasks = [
        #{action => Uninstall, action_name => remove, data => Removed},
        #{
            action => Install,
            action_name => create,
            data => Added,
            on_exception_fn => fun emqx_bridge_resource:remove/4
        },
        #{action => Reinstall, action_name => update, data => Updated}
    ],
    Result = perform_bridge_changes(Tasks),
    reload_message_publish_hook(NewConf),
    ?tp(bridge_post_config_update_done, #{}),
    Result;
%% Removal of a single bridge. Don't crash even when the bridge is not found.
post_config_update([ConfRootKey, Type, Name], '$remove', _, _OldConf, _AppEnvs) when
    ConfRootKey =:= ?ROOT_KEY_ACTIONS orelse ConfRootKey =:= ?ROOT_KEY_SOURCES
->
    RootConf = emqx:get_config([ConfRootKey]),
    case emqx_utils_maps:deep_get([Type, Name], RootConf, undefined) of
        undefined ->
            ok;
        Existing ->
            ok = uninstall_bridge_v2(ConfRootKey, Type, Name, Existing),
            Remaining = emqx_utils_maps:deep_remove([Type, Name], RootConf),
            reload_message_publish_hook(Remaining)
    end,
    ?tp(bridge_post_config_update_done, #{}),
    ok;
%% Creation (`OldConf' is `undefined') or update of a single bridge.
%% Both fail if the connector is not found (already checked in
%% pre_config_update). An update is an uninstall of the old config followed
%% by an install of the new one.
post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) when
    ConfRootKey =:= ?ROOT_KEY_ACTIONS orelse ConfRootKey =:= ?ROOT_KEY_SOURCES
->
    case OldConf of
        undefined ->
            ok;
        _ ->
            ok = uninstall_bridge_v2(ConfRootKey, BridgeType, BridgeName, OldConf)
    end,
    ok = install_bridge_v2(ConfRootKey, BridgeType, BridgeName, NewConf),
    %% Reload the hook against the config as it will look after this update.
    Bridges = emqx_utils_maps:deep_put(
        [BridgeType, BridgeName], emqx:get_config([ConfRootKey]), NewConf
    ),
    reload_message_publish_hook(Bridges),
    ?tp(bridge_post_config_update_done, #{}),
    ok.
158✔
1162

1163
%% Diffs two `#{Type => #{Name => Conf}}' maps at `{Type, Name}' granularity
%% using `emqx_utils_maps:diff_maps/2'.
diff_confs(NewConfs, OldConfs) ->
    FlatNew = flatten_confs(NewConfs),
    FlatOld = flatten_confs(OldConfs),
    emqx_utils_maps:diff_maps(FlatNew, FlatOld).
1168

1169
%% Turns `#{Type => #{Name => Conf}}' into `#{{Type, Name} => Conf}'.
flatten_confs(Conf0) ->
    Pairs = [
        Pair
     || {Type, Conf} <- maps:to_list(Conf0),
        Pair <- do_flatten_confs(Type, Conf)
    ],
    maps:from_list(Pairs).
1178

1179
%% Lists the entries of one type's `#{Name => Conf}' map as
%% `{{Type, Name}, Conf}' pairs.
do_flatten_confs(Type, Conf0) ->
    lists:map(
        fun({Name, Conf}) -> {{Type, Name}, Conf} end,
        maps:to_list(Conf0)
    ).
241✔
1181

1182
%% Runs the given change tasks one after another and reports the failures.
%% Each task is a map with:
%%   action          - fun(Type, Name, Conf) applied to every entry of `data'
%%   action_name     - label used when reporting errors of this task
%%   data            - `#{{Type, Name} => Conf}' entries to process
%%   on_exception_fn - optional fun(Type, Name, Conf, Opts) invoked when
%%                     `action' raises
%% Returns `ok' when no action returned `{error, _}' or raised, otherwise
%% `{error, [#{action => ActionName, errors => [{{Type, Name}, {error, _}}]}]}'.
perform_bridge_changes(Tasks) ->
    perform_bridge_changes(Tasks, []).

perform_bridge_changes([], Errors) ->
    case Errors of
        [] -> ok;
        _ -> {error, Errors}
    end;
perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) ->
    OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end),
    %% Entries of a single task are processed in parallel, without a timeout.
    Results = emqx_utils:pmap(
        fun({{Type, Name}, Conf}) ->
            Res =
                try
                    Action(Type, Name, Conf)
                catch
                    Kind:Error:Stacktrace ->
                        ?SLOG(error, #{
                            msg => "bridge_config_update_exception",
                            kind => Kind,
                            error => Error,
                            type => Type,
                            name => Name,
                            stacktrace => Stacktrace
                        }),
                        %% Exception handlers take four arguments (both the
                        %% default fun above and emqx_bridge_resource:remove/4).
                        %% Calling them with three raised `badarity' here and
                        %% lost the `{error, Error}' result.
                        %% NOTE(review): the resource opts are passed as the
                        %% fourth argument, assuming the handler expects the
                        %% same options emqx_bridge passes - confirm.
                        Opts =
                            case Conf of
                                #{resource_opts := ResOpts} -> ResOpts;
                                _ -> #{}
                            end,
                        OnException(Type, Name, Conf, Opts),
                        {error, Error}
                end,
            {{Type, Name}, Res}
        end,
        maps:to_list(MapConfs),
        infinity
    ),
    Errs = lists:filter(
        fun
            ({_TypeName, {error, _}}) -> true;
            (_) -> false
        end,
        Results
    ),
    Errors =
        case Errs of
            [] ->
                Errors0;
            _ ->
                #{action_name := ActionName} = Task,
                [#{action => ActionName, errors => Errs} | Errors0]
        end,
    perform_bridge_changes(Tasks, Errors).
870✔
1231

1232
%% Same as `fill_defaults/5' with no extra options.
fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) ->
    fill_defaults(Type, RawConf, TopLevelConf, SchemaModule, #{}).

%% Fills schema defaults into a single bridge config. The config is wrapped
%% into the `TopLevelConf => Type => Name' structure that
%% `emqx_config:fill_defaults/3' is called with, and unwrapped again afterwards.
fill_defaults(Type, RawConf, TopLevelConf, SchemaModule, Opts) ->
    Packed = pack_bridge_conf(Type, RawConf, TopLevelConf),
    Filled = emqx_config:fill_defaults(SchemaModule, Packed, Opts),
    unpack_bridge_conf(Type, Filled, TopLevelConf).
2,257✔
1239

1240
%% Wraps `RawConf' as `#{TopLevelConf => #{Type => #{<<"foo">> => RawConf}}}'.
%% <<"foo">> is a placeholder bridge name; `unpack_bridge_conf/3' reads it back.
pack_bridge_conf(Type, RawConf, TopLevelConf) ->
    Named = #{<<"foo">> => RawConf},
    #{TopLevelConf => #{bin(Type) => Named}}.
2,257✔
1242

1243
%% Inverse of `pack_bridge_conf/3': extracts the config stored under
%% `TopLevelConf' / `Type' / <<"foo">>.
unpack_bridge_conf(Type, PackedConf, TopLevelConf) ->
    TypeKey = bin(Type),
    #{TopLevelConf := ByType} = PackedConf,
    #{<<"foo">> := Unpacked} = maps:get(TypeKey, ByType),
    Unpacked.
2,257✔
1248

1249
%%====================================================================
1250
%% Compatibility API
1251
%%====================================================================
1252

1253
%% Checks whether the bridge can be presented as a valid bridge v1.
%%
%% * If the corresponding bridge v2 does not exist, the name is free and the
%%   bridge counts as valid.
%% * If it exists, its connector must have exactly one channel.
%%
%% The two-argument form looks the bridge up under the actions root.
bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
    bridge_v1_is_valid(?ROOT_KEY_ACTIONS, BridgeV1Type, BridgeName).

%% The rabbitmq source has no v1 equivalent although the rabbitmq action has
%% one. There is no good way to tell them apart, so it is hardcoded here.
bridge_v1_is_valid(?ROOT_KEY_SOURCES, rabbitmq, _BridgeName) ->
    false;
bridge_v1_is_valid(ConfRootKey, BridgeV1Type, BridgeName) ->
    BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
    case lookup_conf(ConfRootKey, BridgeV2Type, BridgeName) of
        #{connector := ConnectorName} ->
            ConnectorType = connector_type(BridgeV2Type),
            ResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
            case emqx_resource:get_channels(ResourceId) of
                {ok, [_SingleChannel]} ->
                    true;
                _ ->
                    %% not found, no channel at all, or more than one channel
                    false
            end;
        {error, _} ->
            %% If the bridge v2 does not exist, it is a valid bridge v1
            true
    end.
1279

1280
%% Bridge v1 type -> action (bridge v2) type, as defined by `emqx_action_info'.
bridge_v1_type_to_bridge_v2_type(Type) ->
    emqx_action_info:bridge_v1_type_to_action_type(Type).

%% Action type -> bridge v1 type; `emqx_action_info' is given the action
%% config as well.
bridge_v2_type_to_bridge_v1_type(ActionType, ActionConf) ->
    emqx_action_info:action_type_to_bridge_v1_type(ActionType, ActionConf).

%% Whether `Type' is an action type according to `emqx_action_info'.
is_bridge_v2_type(Type) ->
    emqx_action_info:is_action_type(Type).
72,549✔
1288

1289
%% Lists all actions and sources in their bridge v1 representation.
%% Entries equal to the "not v1 compatible" error are dropped. A source whose
%% name is already used by a listed action is dropped as well.
bridge_v1_list_and_transform() ->
    Incompatible = not_bridge_v1_compatible_error(),
    FromActions = lists:filter(
        fun(Bridge) -> Bridge =/= Incompatible end,
        list_with_lookup_fun(?ROOT_KEY_ACTIONS, fun bridge_v1_lookup_and_transform/2)
    ),
    ActionNames = maps:from_keys([Name || #{name := Name} <- FromActions], true),
    FromSources = [
        Bridge
     || #{name := SourceName} = Bridge <- list_with_lookup_fun(
            ?ROOT_KEY_SOURCES, fun bridge_v1_lookup_and_transform/2
        ),
        Bridge =/= Incompatible,
        %% On a name conflict only the action is shown
        not maps:is_key(SourceName, ActionNames)
    ],
    FromActions ++ FromSources.
1,225✔
1312

1313
%% Looks up an action (or, failing that, a source) and converts it into its
%% bridge v1 representation.
%% Returns `{ok, BridgeV1}', the "not v1 compatible" error when the type has
%% no v1 equivalent or the bridge is not a valid v1 bridge, or whatever
%% non-ok term the bridge or connector lookup produced.
bridge_v1_lookup_and_transform(ActionType, Name) ->
    case lookup_in_actions_or_sources(ActionType, Name) of
        {ok, ConfRootKey,
            #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} ->
            BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig),
            IsCompatible =
                has_bridge_v1_equivalent(ActionType) andalso
                    ?MODULE:bridge_v1_is_valid(ConfRootKey, BridgeV1Type, Name),
            case IsCompatible of
                false ->
                    not_bridge_v1_compatible_error();
                true ->
                    ConnectorType = connector_type(ActionType),
                    case emqx_connector:lookup(ConnectorType, ConnectorName) of
                        {ok, Connector} ->
                            bridge_v1_lookup_and_transform_helper(
                                ConfRootKey,
                                BridgeV1Type,
                                Name,
                                ActionType,
                                ActionConfig,
                                ConnectorType,
                                Connector
                            );
                        ConnectorLookupError ->
                            ConnectorLookupError
                    end
            end;
        LookupError ->
            LookupError
    end.
1345

1346
%% Looks `ActionType'/`Name' up among the actions and, only when it is not
%% found there, among the sources. A hit is tagged with the root key it came
%% from: `{ok, RootKey, Info}'. A failed source lookup is returned as is.
lookup_in_actions_or_sources(ActionType, Name) ->
    case lookup(?ROOT_KEY_ACTIONS, ActionType, Name) of
        {ok, ActionInfo} ->
            {ok, ?ROOT_KEY_ACTIONS, ActionInfo};
        {error, not_found} ->
            SourceLookup = lookup(?ROOT_KEY_SOURCES, ActionType, Name),
            case SourceLookup of
                {ok, SourceInfo} -> {ok, ?ROOT_KEY_SOURCES, SourceInfo};
                _ -> SourceLookup
            end
    end.
1358

1359
%% Error term used for bridges that cannot be represented as a bridge v1.
not_bridge_v1_compatible_error() ->
    Reason = not_bridge_v1_compatible,
    {error, Reason}.
570✔
1361

1362
%% `true' when `emqx_action_info' knows a bridge v1 type name for
%% `ActionType', `false' when it reports `no_v1_equivalent'.
has_bridge_v1_equivalent(ActionType) ->
    V1Name = emqx_action_info:bridge_v1_type_name(ActionType),
    case V1Name of
        {error, no_v1_equivalent} -> false;
        {ok, _} -> true
    end.
1367

1368
%% Raw config of a connector with schema defaults filled in.
connector_raw_config(Connector, ConnectorType) ->
    get_raw_with_defaults(Connector, ConnectorType, <<"connectors">>, emqx_connector_schema).

%% Raw config of an action/source with schema defaults filled in.
action_raw_config(ConfRootName, Action, ActionType) ->
    TopLevelConf = bin(ConfRootName),
    get_raw_with_defaults(Action, ActionType, TopLevelConf, emqx_bridge_v2_schema).

%% Takes the mandatory `raw_config' entry of `Config' and fills in the
%% defaults defined by `SchemaModule'.
get_raw_with_defaults(Config, Type, TopLevelConf, SchemaModule) ->
    fill_defaults(Type, maps:get(raw_config, Config), TopLevelConf, SchemaModule).
2,032✔
1377

1378
%% Builds the bridge v1 view of an action/source:
%%   * `raw_config' becomes the v1 config derived from the connector and
%%     action raw configs,
%%   * the top-level `status' key is removed,
%%   * `resource_data.id' is replaced with the v1 id <<"bridge:Type:Name">>.
%% When the status in the resource data is `connected' but the action's own
%% status is not, the action's status and error are written into the resource
%% data. Always returns `{ok, BridgeV1}'.
bridge_v1_lookup_and_transform_helper(
    ConfRootName, BridgeV1Type, BridgeName, ActionType, Action, ConnectorType, Connector
) ->
    ConnectorRawConfig = connector_raw_config(Connector, ConnectorType),
    ActionRawConfig = action_raw_config(ConfRootName, Action, ActionType),
    BridgeV1Config = emqx_action_info:connector_action_config_to_bridge_v1_config(
        BridgeV1Type, ConnectorRawConfig, ActionRawConfig
    ),
    BridgeV1 = maps:remove(status, Action#{raw_config => BridgeV1Config}),
    %% Replace id in resource data
    BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>,
    ResourceData = (maps:get(resource_data, BridgeV1, #{}))#{id => BridgeV1Id},
    ResourceStatus = maps:get(status, ResourceData, undefined),
    ActionStatus = maps:get(status, Action, undefined),
    case {ResourceStatus, ActionStatus} of
        {connected, NotConnected} when NotConnected =/= connected ->
            %% Surface the action's own status and error instead.
            Overridden = ResourceData#{
                status => NotConnected,
                error => maps:get(error, Action, undefined)
            },
            {ok, BridgeV1#{resource_data => Overridden}};
        _ ->
            %% No need to modify the status
            {ok, BridgeV1#{resource_data => ResourceData}}
    end.
1411

1412
%% Config lookup under the actions root; see `lookup_conf/3'.
lookup_conf(Type, Name) ->
    lookup_conf(?ROOT_KEY_ACTIONS, Type, Name).
67,589✔
1414

1415
%% Returns the config of `Type'/`Name' when it exists under exactly one of
%% the actions and sources roots.
%%   * found under neither -> `{error, bridge_not_found}'
%%   * found under both    -> `{error, name_conflict_sources_actions}'
lookup_conf_if_exists_in_exactly_one_of_sources_and_actions(Type, Name) ->
    NotFound = {error, bridge_not_found},
    ActionConf = lookup_conf(?ROOT_KEY_ACTIONS, Type, Name),
    SourceConf = lookup_conf(?ROOT_KEY_SOURCES, Type, Name),
    case {ActionConf, SourceConf} of
        {NotFound, NotFound} -> NotFound;
        {NotFound, _} -> SourceConf;
        {_, NotFound} -> ActionConf;
        {_, _} -> {error, name_conflict_sources_actions}
    end.
1428

1429
%% `true' only when `BridgeType'/`BridgeName' is configured as a source and
%% not as an action. Missing everywhere, action only, or both -> `false'.
is_only_source(BridgeType, BridgeName) ->
    NotFound = {error, bridge_not_found},
    ActionConf = lookup_conf(?ROOT_KEY_ACTIONS, BridgeType, BridgeName),
    SourceConf = lookup_conf(?ROOT_KEY_SOURCES, BridgeType, BridgeName),
    ActionConf =:= NotFound andalso SourceConf =/= NotFound.
1442

1443
%% Returns the root key (sources or actions) under which
%% `BridgeType'/`BridgeName' is configured.
%% Raises `error({action_or_source_not_found, Type, Name})' when it exists
%% under neither root and `error({name_clash_action_source, Type, Name})'
%% when it exists under both.
get_conf_root_key_if_only_one(BridgeType, BridgeName) ->
    NotFound = {error, bridge_not_found},
    ActionConf = lookup_conf(?ROOT_KEY_ACTIONS, BridgeType, BridgeName),
    SourceConf = lookup_conf(?ROOT_KEY_SOURCES, BridgeType, BridgeName),
    case {ActionConf, SourceConf} of
        {NotFound, NotFound} ->
            error({action_or_source_not_found, BridgeType, BridgeName});
        {NotFound, _} ->
            ?ROOT_KEY_SOURCES;
        {_, NotFound} ->
            ?ROOT_KEY_ACTIONS;
        {_, _} ->
            error({name_clash_action_source, BridgeType, BridgeName})
    end.
1456

1457
%% Reads the config stored at `[RootName, Type, Name]'.
%% Returns the config itself (not wrapped in `{ok, _}') or
%% `{error, bridge_not_found}' when nothing is configured there.
lookup_conf(RootName, Type, Name) ->
    Path = [RootName, Type, Name],
    case emqx:get_config(Path, not_found) of
        not_found -> {error, bridge_not_found};
        Config -> Config
    end.
1464

1465
%% Creates (or replaces) a bridge from a bridge v1 config by splitting it
%% into a connector config and a bridge v2 config.
%% When a bridge v2 with that name already exists and is v1 compatible, it is
%% replaced; when it exists but is not v1 compatible the result is
%% `{error, non_compatible_bridge_v2_exists}'.
bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
    BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
    %% Check if the bridge v2 exists
    case lookup_conf_if_exists_in_exactly_one_of_sources_and_actions(BridgeV2Type, BridgeName) of
        {error, _} ->
            %% If the bridge v2 does not exist, it is a valid bridge v1:
            %% there is no previous raw config and nothing to remove first.
            NoOp = fun() -> ok end,
            split_bridge_v1_config_and_create_helper(
                BridgeV1Type, BridgeName, RawConf, undefined, NoOp
            );
        _ExistingConf ->
            case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of
                false ->
                    %% If the bridge v2 exists, it is not a valid bridge v1
                    {error, non_compatible_bridge_v2_exists};
                true ->
                    ConfRootKey = get_conf_root_key_if_only_one(BridgeV2Type, BridgeName),
                    ExistingRawConf = emqx:get_raw_config(
                        [ConfRootKey, BridgeV2Type, BridgeName], undefined
                    ),
                    %% To avoid losing configurations, no crash may occur
                    %% between deleting and re-creating them, so the removal
                    %% is deferred into a callback that the helper runs after
                    %% the new config has been split and validated.
                    RemoveExisting = fun() ->
                        %% Using remove + create as update, hence do not delete deps.
                        bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, [])
                    end,
                    split_bridge_v1_config_and_create_helper(
                        BridgeV1Type, BridgeName, RawConf, ExistingRawConf, RemoveExisting
                    )
            end
    end.
1497

1498
%% Splits and validates the v1 config, runs `PreCreateFun' (its result is
%% ignored), then creates the connector and the bridge v2.
%% Anything thrown along the way is returned as `{error, Reason}'.
split_bridge_v1_config_and_create_helper(
    BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun
) ->
    try
        Split = split_and_validate_bridge_v1_config(
            BridgeV1Type, BridgeName, RawConf, PreviousRawConf
        ),
        %% `bridge_v2_name' is matched against the already bound `BridgeName':
        %% the split must not rename the bridge.
        #{
            connector_type := ConnectorType,
            connector_name := NewConnectorName,
            connector_conf := NewConnectorRawConf,
            bridge_v2_type := BridgeType,
            bridge_v2_name := BridgeName,
            bridge_v2_conf := NewBridgeV2RawConf,
            conf_root_key := ConfRootName
        } = Split,
        _ = PreCreateFun(),
        do_connector_and_bridge_create(
            ConfRootName,
            ConnectorType,
            NewConnectorName,
            NewConnectorRawConf,
            BridgeType,
            BridgeName,
            NewBridgeV2RawConf,
            RawConf
        )
    catch
        throw:Reason ->
            {error, Reason}
    end.
1532

1533
%% Creates the connector and then the bridge v2 that refers to it.
%%
%% Returns the `{ok, _}' result of creating the bridge v2 on success. If the
%% connector cannot be created, that result is returned unchanged. If the
%% bridge v2 cannot be created, the connector created in the first step is
%% removed again and the bridge creation error is returned; should that
%% removal fail as well, a warning is logged and the removal error is
%% returned instead.
do_connector_and_bridge_create(
    ConfRootName,
    ConnectorType,
    NewConnectorName,
    NewConnectorRawConf,
    BridgeType,
    BridgeName,
    NewBridgeV2RawConf,
    RawConf
) ->
    case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of
        {ok, _} ->
            case create(ConfRootName, BridgeType, BridgeName, NewBridgeV2RawConf) of
                {ok, _} = Result ->
                    Result;
                {error, Reason1} ->
                    %% Roll back the connector created above.
                    case emqx_connector:remove(ConnectorType, NewConnectorName) of
                        ok ->
                            {error, Reason1};
                        {error, Reason2} ->
                            %% Uses the `msg' key like the other structured log
                            %% calls in this module and records both failures.
                            ?SLOG(warning, #{
                                msg => failed_to_remove_connector,
                                bridge_version => 2,
                                bridge_type => BridgeType,
                                bridge_name => BridgeName,
                                connector_type => ConnectorType,
                                connector_name => NewConnectorName,
                                bridge_create_error => emqx_utils:redact(Reason1),
                                reason => emqx_utils:redact(Reason2),
                                bridge_raw_config => emqx_utils:redact(RawConf)
                            }),
                            {error, Reason2}
                    end
            end;
        Error ->
            Error
    end.
1566

1567
%% Converts one bridge v1 raw config into a connector raw config plus a
%% bridge v2 raw config and checks both against their schemas.
%%
%% The conversion is done by wrapping the input in a fake global config and
%% calling
%% `emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2/1'.
%%
%% Returns a map with the keys `connector_type', `connector_name',
%% `connector_conf', `bridge_v2_type', `bridge_v2_name', `bridge_v2_conf' and
%% `conf_root_key'. For a thrown `{Module, [Reason | _]}' only `Reason' is
%% re-thrown; any other thrown term is re-thrown unchanged.
split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) ->
    V2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
    ConnType = connector_type(V2Type),
    %% Existing connectors are included to avoid name conflicts.
    ExistingConnectors = emqx:get_raw_config([connectors], #{}),
    BaseGlobalConfig = #{
        <<"connectors">> => ExistingConnectors,
        <<"bridges">> => #{bin(BridgeV1Type) => #{bin(BridgeName) => RawConf}}
    },
    HasPrevious = PreviousRawConf =/= undefined,
    PrevRootKey =
        case HasPrevious of
            true -> get_conf_root_key_if_only_one(V2Type, BridgeName);
            false -> not_used
        end,
    %% The previous bridge v2 config is only added when there is one.
    FakeGlobalConfig =
        emqx_utils_maps:put_if(
            BaseGlobalConfig,
            bin(PrevRootKey),
            #{bin(V2Type) => #{bin(BridgeName) => PreviousRawConf}},
            HasPrevious
        ),
    %% [FIXME] this will loop through all connector types, instead pass the
    %% connector type and just do it for that one
    Output = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(
        FakeGlobalConfig
    ),
    ConfRootKey = get_conf_root_key(Output),
    BridgePath = [ConfRootKey, bin(V2Type), bin(BridgeName)],
    NewBridgeV2RawConf = emqx_utils_maps:deep_get(BridgePath, Output),
    ConnectorName = emqx_utils_maps:deep_get(BridgePath ++ [<<"connector">>], Output),
    NewConnectorRawConf =
        emqx_utils_maps:deep_get(
            [<<"connectors">>, bin(ConnType), bin(ConnectorName)],
            Output
        ),
    %% Minimal configs holding only the new connector / bridge v2, used as
    %% input for the schema checks below.
    ConnectorCheckInput = #{
        <<"connectors">> => #{bin(ConnType) => #{bin(ConnectorName) => NewConnectorRawConf}}
    },
    BridgeV2CheckInput = #{
        ConfRootKey => #{bin(V2Type) => #{bin(BridgeName) => NewBridgeV2RawConf}}
    },
    CheckOpts = #{atom_key => false, required => false},
    Result = #{
        connector_type => ConnType,
        connector_name => ConnectorName,
        connector_conf => NewConnectorRawConf,
        bridge_v2_type => V2Type,
        bridge_v2_name => BridgeName,
        bridge_v2_conf => NewBridgeV2RawConf,
        conf_root_key => ConfRootKey
    },
    try
        _ = hocon_tconf:check_plain(emqx_connector_schema, ConnectorCheckInput, CheckOpts),
        _ = hocon_tconf:check_plain(emqx_bridge_v2_schema, BridgeV2CheckInput, CheckOpts)
    of
        _ ->
            Result
    catch
        %% validation errors
        throw:{_Module, [FirstReason | _]} ->
            throw(FirstReason);
        throw:Other ->
            throw(Other)
    end.
1671

1672
%% Returns `<<"actions">>' if the given term is a map with that key, else
%% `<<"sources">>' if it is a map with that key. Raises
%% `error({incompatible_bridge_v1, no_action_or_source})' for anything else.
get_conf_root_key(Output) ->
    case Output of
        #{<<"actions">> := _} ->
            <<"actions">>;
        #{<<"sources">> := _} ->
            <<"sources">>;
        _NoMatch ->
            error({incompatible_bridge_v1, no_action_or_source})
    end.
×
1678

1679
%% Dry-run creation for a bridge v1 config.
%%
%% The `<<"name">>' key is dropped from the raw config, which is then split
%% and validated under a generated temporary name and passed on to
%% `create_dry_run_helper/4'. A thrown term is returned as `{error, Reason}'.
bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
    RawConf = maps:without([<<"name">>], RawConfig0),
    TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
    try
        %% There is no previous config in a dry run, hence `undefined'.
        Split = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, undefined),
        #{
            conf_root_key := RootKey,
            connector_type := _,
            connector_name := _,
            connector_conf := ConnRawConf,
            bridge_v2_type := V2Type,
            bridge_v2_name := _,
            bridge_v2_conf := V2RawConf0
        } = Split,
        V2RawConf = emqx_action_info:action_convert_from_connector(
            BridgeType, ConnRawConf, V2RawConf0
        ),
        RootKeyAtom = ensure_atom_root_key(RootKey),
        create_dry_run_helper(RootKeyAtom, V2Type, ConnRawConf, V2RawConf)
    catch
        throw:Reason ->
            {error, Reason}
    end.
1703

1704
%% Only called by test cases (may create broken references)
%%
%% Converts the bridge v1 type, looks up the matching config and delegates
%% to `bridge_v1_remove/3'.
bridge_v1_remove(BridgeV1Type, BridgeName) ->
    ActionType = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
    LookupResult =
        lookup_conf_if_exists_in_exactly_one_of_sources_and_actions(ActionType, BridgeName),
    bridge_v1_remove(ActionType, BridgeName, LookupResult).
1712

1713
%% Removes the bridge v2 and, if that succeeded, the connector it refers to.
%%
%% The third argument is a config lookup result: when it is a map with a
%% `connector' key the removal is carried out, anything else is returned
%% unchanged. A non-`ok' result of removing the bridge v2 is returned as is.
bridge_v1_remove(ActionType, Name, LookupResult) ->
    case LookupResult of
        #{connector := ConnectorName} ->
            ConfRootKey = get_conf_root_key_if_only_one(ActionType, Name),
            case remove(ConfRootKey, ActionType, Name) of
                ok ->
                    emqx_connector:remove(connector_type(ActionType), ConnectorName);
                RemoveError ->
                    RemoveError
            end;
        LookupError ->
            LookupError
    end.
324✔
1732

1733
%% Converts the bridge v1 type, looks up the matching config and delegates
%% to `bridge_v1_check_deps_and_remove/4'.
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->
    BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
    LookupResult =
        lookup_conf_if_exists_in_exactly_one_of_sources_and_actions(BridgeV2Type, BridgeName),
    bridge_v1_check_deps_and_remove(BridgeV2Type, BridgeName, RemoveDeps, LookupResult).
1741

1742
%% Bridge v1 delegated-removal in 3 steps:
%% 1. Delete rule actions if RemoveDeps has 'rule_actions'
%% 2. Delete self (the bridge v2), also delete its channel in the connector
%% 3. Delete the connector if the connector has no more channel left and if 'connector' is in RemoveDeps
%%
%% The first error encountered is returned as `{error, Reason}'. When the
%% fourth argument is not a map with a `connector' key it is returned
%% unchanged.
bridge_v1_check_deps_and_remove(
    BridgeType,
    BridgeName,
    RemoveDeps,
    #{connector := ConnectorName}
) ->
    ShouldRemoveConnector = lists:member(connector, RemoveDeps),
    case maybe_withdraw_rule_action(BridgeType, BridgeName, RemoveDeps) of
        ok ->
            ConfRootKey = get_conf_root_key_if_only_one(BridgeType, BridgeName),
            case {remove(ConfRootKey, BridgeType, BridgeName), ShouldRemoveConnector} of
                {ok, true} ->
                    maybe_delete_channels(BridgeType, BridgeName, ConnectorName);
                {ok, false} ->
                    ok;
                {{error, _} = RemoveError, _} ->
                    RemoveError
            end;
        {error, _} = WithdrawError ->
            WithdrawError
    end;
bridge_v1_check_deps_and_remove(_BridgeType, _BridgeName, _RemoveDeps, Error) ->
    %% TODO: the connector is gone, for whatever reason, maybe call remove/2 anyway?
    Error.
48✔
1770

1771
%% Returns `ok' without doing anything when `is_only_source/2' is true for
%% the bridge; otherwise delegates to
%% `emqx_bridge_lib:maybe_withdraw_rule_action/3'.
maybe_withdraw_rule_action(BridgeType, BridgeName, RemoveDeps) ->
    IsOnlySource = is_only_source(BridgeType, BridgeName),
    case IsOnlySource of
        false ->
            emqx_bridge_lib:maybe_withdraw_rule_action(BridgeType, BridgeName, RemoveDeps);
        true ->
            ok
    end.
1778

1779
%% Removes the connector unless `connector_has_channels/2' reports that it
%% still has channels.
%%
%% Returns `ok' when the connector is kept or removed successfully, and
%% `{error, Reason}' (after logging an error) when the removal fails.
maybe_delete_channels(BridgeType, BridgeName, ConnectorName) ->
    case connector_has_channels(BridgeType, ConnectorName) of
        false ->
            ConnectorType = connector_type(BridgeType),
            case emqx_connector:remove(ConnectorType, ConnectorName) of
                {error, Reason} = Error ->
                    ?SLOG(error, #{
                        msg => failed_to_delete_connector,
                        bridge_type => BridgeType,
                        bridge_name => BridgeName,
                        connector_name => ConnectorName,
                        reason => Reason
                    }),
                    Error;
                ok ->
                    ok
            end;
        true ->
            ok
    end.
1799

1800
%% Returns `false' only when `emqx_connector_resource:get_channels/2' yields
%% exactly `{ok, []}'; every other result (including errors) counts as
%% "has channels" and yields `true'.
connector_has_channels(BridgeV2Type, ConnectorName) ->
    ConnectorType = connector_type(BridgeV2Type),
    Channels = emqx_connector_resource:get_channels(ConnectorType, ConnectorName),
    Channels =/= {ok, []}.
1808

1809
%% Same as `bridge_v1_id_to_connector_resource_id/2' with the actions root key.
bridge_v1_id_to_connector_resource_id(BridgeId) ->
    bridge_v1_id_to_connector_resource_id(?ROOT_KEY_ACTIONS, BridgeId).

%% Maps a `<<"Type:Name">>' bridge v1 ID to the binary
%% `<<"connector:ConnectorType:ConnectorName">>' of the connector referenced
%% by the corresponding bridge v2 config.
%%
%% Throws `Reason' when the config lookup returns `{error, Reason}'. The ID is
%% split at the first `:' only; an ID without `:' fails with `case_clause'.
bridge_v1_id_to_connector_resource_id(ConfRootKey, BridgeId) ->
    case binary:split(BridgeId, <<":">>) of
        [Type, Name] ->
            BridgeV2Type = bin(bridge_v1_type_to_bridge_v2_type(Type)),
            case lookup_conf(ConfRootKey, BridgeV2Type, Name) of
                #{connector := ConnectorName} ->
                    ConnectorType = bin(connector_type(BridgeV2Type)),
                    <<"connector:", ConnectorType/binary, ":", ConnectorName/binary>>;
                {error, Reason} ->
                    throw(Reason)
            end
    end.
1826

1827
%% Enables or disables a bridge v1 (`Action' is `enable' or `disable') by
%% toggling both the underlying bridge v2 and its connector.
%%
%% Returns `{error, not_bridge_v1_compatible}' when `bridge_v1_is_valid/2' is
%% false, and `{error, Reason}' when the config lookup fails.
bridge_v1_enable_disable(Action, BridgeType, BridgeName) ->
    case ?MODULE:bridge_v1_is_valid(BridgeType, BridgeName) of
        true ->
            %% Look the config up by bridge v2 type, like the other bridge v1
            %% compatibility functions in this module do.
            BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeType),
            bridge_v1_enable_disable_helper(
                Action,
                BridgeType,
                BridgeName,
                lookup_conf_if_exists_in_exactly_one_of_sources_and_actions(
                    BridgeV2Type, BridgeName
                )
            );
        false ->
            {error, not_bridge_v1_compatible}
    end.

%% Performs the actual enable/disable given the config lookup result.
%%
%% `BridgeType' is the bridge v1 type. When enabling, the connector is enabled
%% before the bridge v2; when disabling, the bridge v2 is disabled before the
%% connector. The first step of each sequence is asserted to return `{ok, _}'.
bridge_v1_enable_disable_helper(_Op, _BridgeType, _BridgeName, {error, Reason}) ->
    {error, Reason};
bridge_v1_enable_disable_helper(enable, BridgeType, BridgeName, #{connector := ConnectorName}) ->
    BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeType),
    ConnectorType = connector_type(BridgeV2Type),
    {ok, _} = emqx_connector:disable_enable(enable, ConnectorType, ConnectorName),
    %% The root key is resolved with the bridge v2 type (not the v1 type).
    ConfRootKey = get_conf_root_key_if_only_one(BridgeV2Type, BridgeName),
    ?MODULE:disable_enable(ConfRootKey, enable, BridgeV2Type, BridgeName);
bridge_v1_enable_disable_helper(disable, BridgeType, BridgeName, #{connector := ConnectorName}) ->
    BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeType),
    ConnectorType = connector_type(BridgeV2Type),
    %% The root key is resolved with the bridge v2 type (not the v1 type).
    ConfRootKey = get_conf_root_key_if_only_one(BridgeV2Type, BridgeName),
    {ok, _} = ?MODULE:disable_enable(ConfRootKey, disable, BridgeV2Type, BridgeName),
    emqx_connector:disable_enable(disable, ConnectorType, ConnectorName).
64✔
1854

1855
%% Runs `emqx_connector_resource:restart/2' through
%% `bridge_v1_operation_helper/4', passing `true' for `DoHealthCheck'.
bridge_v1_restart(BridgeV1Type, Name) ->
    bridge_v1_operation_helper(
        BridgeV1Type, Name, fun emqx_connector_resource:restart/2, true
    ).
26✔
1860

1861
%% Runs `emqx_connector_resource:stop/2' through
%% `bridge_v1_operation_helper/4', passing `false' for `DoHealthCheck'.
bridge_v1_stop(BridgeV1Type, Name) ->
    bridge_v1_operation_helper(
        BridgeV1Type, Name, fun emqx_connector_resource:stop/2, false
    ).
113✔
1866

1867
%% Runs `emqx_connector_resource:start/2' through
%% `bridge_v1_operation_helper/4', passing `true' for `DoHealthCheck'.
bridge_v1_start(BridgeV1Type, Name) ->
    bridge_v1_operation_helper(
        BridgeV1Type, Name, fun emqx_connector_resource:start/2, true
    ).
104✔
1872

1873
%% Shared implementation of the bridge v1 start/stop/restart operations.
%%
%% Returns `{error, not_bridge_v1_compatible}' when `bridge_v1_is_valid/2' is
%% false; otherwise resolves the root key and config of the bridge v2 and
%% hands everything to `connector_operation_helper_with_conf/6'.
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) ->
    BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
    case emqx_bridge_v2:bridge_v1_is_valid(BridgeV1Type, Name) of
        false ->
            {error, not_bridge_v1_compatible};
        true ->
            ConfRootKey = get_conf_root_key_if_only_one(BridgeV2Type, Name),
            LookupResult =
                lookup_conf_if_exists_in_exactly_one_of_sources_and_actions(BridgeV2Type, Name),
            connector_operation_helper_with_conf(
                ConfRootKey, BridgeV2Type, Name, LookupResult, ConnectorOpFun, DoHealthCheck
            )
    end.
1889

1890
%% Resets the metrics of the bridge v2 behind a bridge v1. Returns `ok';
%% any other result of `reset_metrics/3' fails with `badmatch'.
bridge_v1_reset_metrics(BridgeV1Type, BridgeName) ->
    V2Type = bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
    RootKey = get_conf_root_key_if_only_one(V2Type, BridgeName),
    ok = reset_metrics(RootKey, V2Type, BridgeName).
2✔
1896

1897
%%====================================================================
1898
%% Misc helper functions
1899
%%====================================================================
1900

1901
%% Maps the operation atom `enable' to `true' and `disable' to `false'.
operation_to_enable(Op) when Op =:= enable; Op =:= disable ->
    Op =:= enable.
27✔
1903

1904
%% Converts an atom or a list to a binary; a binary is returned unchanged.
bin(A) when is_atom(A) ->
    atom_to_binary(A, utf8);
bin(L) when is_list(L) ->
    list_to_binary(L);
bin(B) when is_binary(B) ->
    B.
338,947✔
1907

1908
%% Extracts the connector resource ID from a bridge v2 resource ID.
%%
%% Accepts IDs of the form `<<"action:T:N:connector:CT:CN">>' and, since this
%% module manages sources as well as actions,
%% `<<"source:T:N:connector:CT:CN">>'; returns `<<"connector:CT:CN">>'.
%% Raises `error({error, Message})' with a binary message for any other input.
extract_connector_id_from_bridge_v2_id(Id) ->
    case binary:split(Id, <<":">>, [global]) of
        [Kind, _Type, _Name, <<"connector">>, ConnectorType, ConnectorName] when
            Kind =:= <<"action">>; Kind =:= <<"source">>
        ->
            <<"connector:", ConnectorType/binary, ":", ConnectorName/binary>>;
        _Other ->
            error({error, iolist_to_binary(io_lib:format("Invalid action ID: ~p", [Id]))})
    end.
1915

1916
%% Returns the atom form of a config root key: the binary root keys for
%% actions and sources are mapped to their atom macros, and an atom is
%% returned unchanged.
ensure_atom_root_key(?ROOT_KEY_ACTIONS_BIN) ->
    ?ROOT_KEY_ACTIONS;
ensure_atom_root_key(?ROOT_KEY_SOURCES_BIN) ->
    ?ROOT_KEY_SOURCES;
ensure_atom_root_key(RootKey) when is_atom(RootKey) ->
    RootKey.
27✔
1922

1923
%% Converts `X' to an atom via `emqx_utils:safe_to_existing_atom/2'.
%% Throws `bad_atom' when that call returns an error.
to_existing_atom(X) ->
    Converted = emqx_utils:safe_to_existing_atom(X, utf8),
    case Converted of
        {error, _} ->
            throw(bad_atom);
        {ok, Atom} ->
            Atom
    end.
1928

1929
%% Checks that the connector referenced by a bridge can be found.
%%
%% Returns `ok' when `get_connector_info/2' finds the connector, otherwise
%% `{error, Details}' where `Details' is a map describing the missing
%% reference.
referenced_connectors_exist(BridgeType, ConnectorNameBin, BridgeName) ->
    %% N.B.: assumes that, for all bridgeV2 types, the name of the bridge type is
    %% identical to its matching connector type name.
    case get_connector_info(ConnectorNameBin, BridgeType) of
        {ok, _Connector} ->
            ok;
        {error, not_found} ->
            Details = #{
                reason => "connector_not_found_or_wrong_type",
                connector_name => ConnectorNameBin,
                bridge_name => BridgeName,
                bridge_type => BridgeType
            },
            {error, Details}
    end.
1943

1944
%% Applies `convert_from_connector/4' to every entry of a two-level
%% `Type => Name => Config' map. An entry whose conversion returns an error
%% is kept unchanged.
convert_from_connectors(ConfRootKey, Conf) ->
    ConvertOne = fun(ActionType, ActionName, Action) ->
        case convert_from_connector(ConfRootKey, ActionType, ActionName, Action) of
            {ok, Converted} -> Converted;
            {error, _} -> Action
        end
    end,
    ConvertType = fun(ActionType, Actions) ->
        maps:map(
            fun(ActionName, Action) -> ConvertOne(ActionType, ActionName, Action) end,
            Actions
        )
    end,
    maps:map(ConvertType, Conf).
1959

1960
%% Passes an action config and the raw config of the connector it references
%% to `emqx_action_info:action_convert_from_connector/3'.
%%
%% Returns `{ok, NewAction}', or `{error, Details}' when the connector named
%% under the `<<"connector">>' key cannot be found.
convert_from_connector(ConfRootKey, Type, Name, Action = #{<<"connector">> := ConnectorName}) ->
    case get_connector_info(ConnectorName, Type) of
        {error, not_found} ->
            Details = #{
                bridge_name => Name,
                reason => <<"connector_not_found_or_wrong_type">>,
                bridge_type => Type,
                connector_name => ConnectorName,
                conf_root_key => ConfRootKey
            },
            {error, Details};
        {ok, Connector} ->
            Converted = emqx_action_info:action_convert_from_connector(
                to_existing_atom(Type), Connector, Action
            ),
            {ok, Converted}
    end.
1975

1976
%% Fetches the raw config of the connector with the given name for the
%% given bridge type.
%%
%% Returns `{ok, RawConnectorConfig}', or `{error, not_found}' when either the
%% name/type cannot be resolved or no such connector config exists.
get_connector_info(ConnectorNameBin, BridgeType) ->
    case to_connector(ConnectorNameBin, BridgeType) of
        %% Must stay first: the 2-tuple pattern below would match it too.
        {error, not_found} = NotFound ->
            NotFound;
        {ConnectorName, ConnectorType} ->
            ConfPath = [connectors, ConnectorType, ConnectorName],
            case emqx_config:get_raw(ConfPath, undefined) of
                undefined -> {error, not_found};
                Connector -> {ok, Connector}
            end
    end.
1986

1987
%% Resolves a connector name and a bridge type to
%% `{ConnectorNameAtom, ConnectorTypeAtom}'.
%%
%% Any exception raised along the way (e.g. `bad_atom' thrown by
%% `to_existing_atom/1') results in `{error, not_found}'.
to_connector(ConnectorNameBin, BridgeType) ->
    try
        NameAtom = to_existing_atom(ConnectorNameBin),
        TypeAtom = ?MODULE:bridge_v2_type_to_connector_type(to_existing_atom(BridgeType)),
        {NameAtom, TypeAtom}
    catch
        _:_ ->
            {error, not_found}
    end.
1997

1998
%% Activates a "connector not found" alarm via `emqx_alarm:safe_activate/3'.
%% The alarm name is the resource ID of the missing connector, and the alarm
%% details carry the connector and action names and types.
alarm_connector_not_found(ActionType, ActionName, ConnectorName) ->
    ConnectorType = connector_type(to_existing_atom(ActionType)),
    ResId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
    Details = #{
        connector_name => ConnectorName,
        connector_type => ConnectorType,
        action_type => ActionType,
        action_name => ActionName
    },
    emqx_alarm:safe_activate(ResId, Details, <<"connector not found">>).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc