• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8625278842

10 Apr 2024 02:52AM UTC coverage: 62.489% (-0.002%) from 62.491%
8625278842

push

github

web-flow
Merge pull request #12855 from JimMoen/fix-share-queue-format

fix(mgmt): $queue shared topics format in mgmt topics api

0 of 1 new or added line in 1 file covered. (0.0%)

49 existing lines in 10 files now uncovered.

34606 of 55379 relevant lines covered (62.49%)

6756.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.02
/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_bridge_http_connector).
18

19
-include_lib("typerefl/include/types.hrl").
20
-include_lib("hocon/include/hoconsc.hrl").
21
-include_lib("emqx/include/logger.hrl").
22
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
23

24
-behaviour(emqx_resource).
25

26
%% callbacks of behaviour emqx_resource
27
-export([
28
    callback_mode/0,
29
    on_start/2,
30
    on_stop/2,
31
    on_query/3,
32
    on_query_async/4,
33
    on_get_status/2,
34
    on_get_status/3,
35
    on_add_channel/4,
36
    on_remove_channel/3,
37
    on_get_channels/1,
38
    on_get_channel_status/3
39
]).
40

41
-export([reply_delegator/3]).
42
-export([render_template/2]).
43

44
-export([
45
    roots/0,
46
    fields/1,
47
    desc/1,
48
    namespace/0
49
]).
50

51
%% for other http-like connectors.
52
-export([redact_request/1]).
53

54
-export([validate_method/1, join_paths/2, formalize_request/3, transform_result/1]).
55

56
-define(DEFAULT_PIPELINE_SIZE, 100).
57
-define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000).
58

59
-define(READACT_REQUEST_NOTE, "the request body is redacted due to security reasons").
60

61
%%=====================================================================
62
%% Hocon schema
63

64
%% Hocon namespace used for the structs defined in this schema module.
namespace() ->
    "connector_http".
668✔
65

66
%% Schema roots: exactly the field list of the `config' struct.
roots() -> fields(config).
×
68

69
%% Hocon field definitions for the structs declared by this module.
%%
%% `config'    - connector-level settings (connect timeout, pool type/size,
%%               pipelining and an optional default `request'), followed by
%%               the SSL fields from `emqx_connector_schema_lib'.
%% `"request"' - a request template: method, path, body, headers, retry count
%%               and per-request timeout. Every field is optional.
fields(config) ->
    %% Both connector-level retry knobs carry the same deprecation marker.
    DeprecatedSince504 = #{deprecated => {since, "5.0.4"}},
    ConnectorFields = [
        {connect_timeout,
            sc(emqx_schema:timeout_duration_ms(), #{
                default => <<"15s">>,
                desc => ?DESC("connect_timeout")
            })},
        {max_retries, sc(non_neg_integer(), DeprecatedSince504)},
        {retry_interval, sc(emqx_schema:timeout_duration(), DeprecatedSince504)},
        {pool_type,
            sc(hoconsc:enum([random, hash]), #{
                default => random,
                desc => ?DESC("pool_type")
            })},
        {pool_size,
            sc(pos_integer(), #{
                default => 8,
                desc => ?DESC("pool_size")
            })},
        {enable_pipelining,
            sc(pos_integer(), #{
                default => ?DEFAULT_PIPELINE_SIZE,
                desc => ?DESC("enable_pipelining")
            })},
        {request,
            sc(ref("request"), #{
                default => undefined,
                required => false,
                desc => ?DESC("request")
            })}
    ],
    ConnectorFields ++ emqx_connector_schema_lib:ssl_fields();
fields("request") ->
    [
        {method,
            sc(binary(), #{
                required => false,
                desc => ?DESC("method"),
                %% Accepts the four supported verbs or a `${...}' template.
                validator => fun ?MODULE:validate_method/1
            })},
        {path, sc(binary(), #{required => false, desc => ?DESC("path")})},
        {body, sc(binary(), #{required => false, desc => ?DESC("body")})},
        {headers, sc(map(), #{required => false, desc => ?DESC("headers")})},
        {max_retries,
            sc(non_neg_integer(), #{
                required => false,
                desc => ?DESC("max_retries")
            })},
        {request_timeout,
            sc(emqx_schema:timeout_duration_ms(), #{
                required => false,
                desc => ?DESC("request_timeout")
            })}
    ].
151

152
%% Struct descriptions: the two structs defined here have an empty
%% description; any other name yields `undefined'.
desc(Struct) when Struct =:= config; Struct =:= "request" ->
    "";
desc(_Other) ->
    undefined.
×
158

159
%% Schema validator for the `method' field.
%%
%% Returns `ok' when `M' is one of the lowercase verbs post/put/get/delete
%% (as binary or atom), or when it contains a `${' placeholder marker.
%% Anything else yields `{error, Message}'.
validate_method(M) ->
    KnownMethods = [
        <<"post">>,
        <<"put">>,
        <<"get">>,
        <<"delete">>,
        post,
        put,
        get,
        delete
    ],
    %% The template check only runs for values that are not a known verb.
    IsAccepted =
        lists:member(M, KnownMethods) orelse
            string:find(M, "${") =/= nomatch,
    case IsAccepted of
        true ->
            ok;
        false ->
            {error,
                <<"Invalid method, should be one of 'post', 'put', 'get', 'delete' or variables in ${field} format.">>}
    end.
178

179
%% Shorthand for `hoconsc:mk/2'.
sc(Type, Meta) ->
    hoconsc:mk(Type, Meta).
176,544✔
180
%% Reference to a struct defined in this module.
ref(Field) ->
    hoconsc:ref(?MODULE, Field).
29,081✔
181

182
%% ===================================================================
183

184
%% emqx_resource callback: this connector reports `async_if_possible'.
callback_mode() ->
    async_if_possible.
77✔
185

186
%% emqx_resource callback: start the ehttpc pool for this connector.
%%
%% `InstId' is used as the pool name. The transport is plain TCP for `http'
%% and TLS for `https'; in the latter case the `ssl' config entry must be
%% present and is passed on with `enable => true'.
%%
%% Returns `{ok, State}' once the pool is started and the initial health
%% check passes. If the health check fails the pool is stopped again and the
%% health-check result is returned; a pool start failure is returned as is.
on_start(
    InstId,
    #{
        base_url := #{scheme := Scheme, host := Host, port := Port, path := BasePath},
        connect_timeout := ConnectTimeout,
        pool_type := PoolType,
        pool_size := PoolSize
    } = Config
) ->
    ?SLOG(info, #{
        msg => "starting_http_connector",
        connector => InstId,
        config => redact(Config)
    }),
    {Transport, TransportOpts} =
        case Scheme of
            http ->
                {tcp, []};
            https ->
                SSLConf = maps:get(ssl, Config),
                %% force enable ssl
                {tls, emqx_tls_lib:to_client_opts(SSLConf#{enable => true})}
        end,
    Pipelining = maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE),
    PoolOpts = [
        {host, Host},
        {port, Port},
        {connect_timeout, ConnectTimeout},
        {keepalive, 30000},
        {pool_type, PoolType},
        {pool_size, PoolSize},
        {transport, Transport},
        {transport_opts, emqx_utils:ipv6_probe(TransportOpts)},
        {enable_pipelining, Pipelining}
    ],
    %% The state is built (and the request template parsed) before the pool
    %% is started, so a bad template fails before any pool exists.
    State = #{
        pool_name => InstId,
        pool_type => PoolType,
        host => Host,
        port => Port,
        connect_timeout => ConnectTimeout,
        base_path => BasePath,
        request => preprocess_request(maps:get(request, Config, undefined))
    },
    case start_pool(InstId, PoolOpts) of
        ok ->
            case do_get_status(InstId, ConnectTimeout) of
                ok ->
                    {ok, State};
                HealthCheckError ->
                    ok = ehttpc_sup:stop_pool(InstId),
                    HealthCheckError
            end;
        StartError ->
            StartError
    end.
248

249
%% Starts the ehttpc pool. An already running pool is logged as a warning
%% and treated as success; any other result is returned unchanged.
start_pool(PoolName, PoolOpts) ->
    StartResult = ehttpc_sup:start_pool(PoolName, PoolOpts),
    case StartResult of
        {ok, _} ->
            ok;
        {error, {already_started, _}} ->
            ?SLOG(warning, #{
                msg => "emqx_connector_on_start_already_started",
                connector => PoolName,
                pool_name => PoolName
            }),
            ok;
        _Failure ->
            StartResult
    end.
263

264
%% emqx_resource callback: register an action (channel) on this connector.
%%
%% The action's `parameters' are pre-parsed into templates and stored under
%% `installed_actions' in the connector state, keyed by `ActionId'. The
%% template renderer may be overridden through `render_template_func' in the
%% action config; otherwise `render_template/2' of this module is used.
on_add_channel(_InstId, OldState, ActionId, ActionConfig) ->
    Installed = maps:get(installed_actions, OldState, #{}),
    {ok, ActionState0} = do_create_http_action(ActionConfig),
    Renderer = maps:get(render_template_func, ActionConfig, fun ?MODULE:render_template/2),
    ActionState = ActionState0#{render_template_func => Renderer},
    {ok, OldState#{installed_actions => Installed#{ActionId => ActionState}}}.
147✔
277

278
%% Builds the per-action state by pre-parsing the action's `parameters'.
do_create_http_action(#{parameters := Parameters}) ->
    ActionState = preprocess_request(Parameters),
    {ok, ActionState}.
147✔
280

281
%% emqx_resource callback: stop the ehttpc pool named after the instance id
%% and return whatever `ehttpc_sup:stop_pool/1' returned.
on_stop(InstId, _State) ->
    ?SLOG(info, #{
        msg => "stopping_http_connector",
        connector => InstId
    }),
    StopResult = ehttpc_sup:stop_pool(InstId),
    %% Trace point emitted after the pool has been stopped.
    ?tp(emqx_connector_http_stopped, #{instance_id => InstId}),
    StopResult.
133✔
289

290
%% emqx_resource callback: unregister an action (channel) from this connector.
%%
%% Removes `ActionId' from the `installed_actions' map in the connector state
%% and returns `{ok, NewState}'. Removing an unknown id is a no-op.
%%
%% The state produced by `on_start/2' has no `installed_actions' key until
%% the first channel is added, so the key is read with a default here (as
%% `on_add_channel/4' does) instead of being required in the function head.
%% Otherwise a removal that arrives before any successful add would crash
%% with `function_clause'.
on_remove_channel(_InstId, OldState, ActionId) ->
    InstalledActions = maps:get(installed_actions, OldState, #{}),
    NewInstalledActions = maps:remove(ActionId, InstalledActions),
    {ok, OldState#{installed_actions => NewInstalledActions}}.
149✔
298

299
%% BridgeV1 entrypoint
300
%% emqx_resource callback: synchronous query.
%%
%% Accepted request shapes, tried in this order:
%%   * `{send_message, Msg}'  - BridgeV1: render the connector-level request
%%     template with `Msg'.
%%   * `{ActionId, Msg}'      - BridgeV2 (binary `ActionId'): combine the
%%     connector-level request with the installed action's templates.
%%   * `{Method, Request}'    - raw request with a 5000 ms timeout.
%%   * `{Method, Request, Timeout}'
%%   * `{KeyOrNum, Method, Request, Timeout, Retry}' - performs the request.
%% All shorter forms end up in the last clause.
on_query(InstId, {send_message, Msg}, State) ->
    case maps:get(request, State, undefined) of
        undefined ->
            ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
            {error, arg_request_not_found};
        RequestTemplate ->
            #{
                method := Method,
                path := Path,
                body := Body,
                headers := Headers,
                request_timeout := Timeout
            } = process_request(RequestTemplate, Msg),
            %% NOTE(review): the original comment reads "bridge buffer worker
            %% has retry, do not let ehttpc retry", yet 2 is what gets passed
            %% to ehttpc:request/5 below - confirm the intended value.
            Retry = 2,
            ClientId = clientid(Msg),
            on_query(
                InstId,
                {ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
                State
            )
    end;
%% BridgeV2 entrypoint
on_query(
    InstId,
    {ActionId, Msg},
    #{installed_actions := InstalledActions} = State
) when is_binary(ActionId) ->
    ConnectorRequest = maps:get(request, State, undefined),
    InstalledAction = maps:get(ActionId, InstalledActions, undefined),
    case {ConnectorRequest, InstalledAction} of
        {undefined, _} ->
            ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
            {error, arg_request_not_found};
        {_, undefined} ->
            ?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
            {error, action_not_found};
        {_, _} ->
            #{
                method := Method,
                path := Path,
                body := Body,
                headers := Headers,
                request_timeout := Timeout
            } = process_request_and_action(ConnectorRequest, InstalledAction, Msg),
            %% NOTE(review): same remark as in the BridgeV1 clause - the
            %% original comment says ehttpc should not retry, but 2 is passed.
            Retry = 2,
            ClientId = clientid(Msg),
            on_query(
                InstId,
                {ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
                State
            )
    end;
on_query(InstId, {Method, Request}, State) ->
    %% TODO: Get retry from State
    on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State);
on_query(InstId, {Method, Request, Timeout}, State) ->
    %% TODO: Get retry from State
    on_query(InstId, {undefined, Method, Request, Timeout, _Retry = 2}, State);
on_query(
    InstId,
    {KeyOrNum, Method, Request, Timeout, Retry},
    #{base_path := BasePath} = State
) ->
    ?TRACE(
        "QUERY",
        "http_connector_received",
        #{
            request => redact_request(Request),
            note => ?READACT_REQUEST_NOTE,
            connector => InstId,
            state => redact(State)
        }
    ),
    NRequest = formalize_request(Method, BasePath, Request),
    Worker = resolve_pool_worker(State, KeyOrNum),
    Result = transform_result(ehttpc:request(Worker, Method, NRequest, Timeout, Retry)),
    %% Log failures; the transformed result is returned unchanged either way.
    case Result of
        {error, {recoverable_error, Reason}} ->
            ?SLOG(warning, #{
                msg => "http_connector_do_request_failed",
                reason => Reason,
                connector => InstId
            });
        %% NOTE(review): transform_result/1 wraps non-2xx responses as
        %% {error, {recoverable_error | unrecoverable_error, Map}}, so this
        %% bare-map pattern looks unreachable for its output - confirm.
        {error, #{status_code := StatusCode}} ->
            ?SLOG(error, #{
                msg => "http_connector_do_request_received_error_response",
                note => ?READACT_REQUEST_NOTE,
                request => redact_request(NRequest),
                connector => InstId,
                status_code => StatusCode
            });
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "http_connector_do_request_failed",
                note => ?READACT_REQUEST_NOTE,
                request => redact_request(NRequest),
                reason => Reason,
                connector => InstId
            });
        _Success ->
            ok
    end,
    Result.
412

413
%% BridgeV1 entrypoint
414
%% emqx_resource callback: asynchronous query.
%%
%% Accepted request shapes, tried in this order:
%%   * `{send_message, Msg}' - BridgeV1: render the connector-level request.
%%   * `{ActionId, Msg}'     - BridgeV2 (binary `ActionId'): combine the
%%     connector-level request with the installed action's templates.
%%   * `{KeyOrNum, Method, Request, Timeout}' - dispatches the request.
%% The last clause hands the request to `ehttpc:request_async/5' with
%% `reply_delegator/3' as the reply function and returns `{ok, Worker}'.
on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
    case maps:get(request, State, undefined) of
        undefined ->
            ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
            {error, arg_request_not_found};
        RequestTemplate ->
            #{
                method := Method,
                path := Path,
                body := Body,
                headers := Headers,
                request_timeout := Timeout
            } = process_request(RequestTemplate, Msg),
            ClientId = clientid(Msg),
            on_query_async(
                InstId,
                {ClientId, Method, {Path, Headers, Body}, Timeout},
                ReplyFunAndArgs,
                State
            )
    end;
%% BridgeV2 entrypoint
on_query_async(
    InstId,
    {ActionId, Msg},
    ReplyFunAndArgs,
    #{installed_actions := InstalledActions} = State
) when is_binary(ActionId) ->
    ConnectorRequest = maps:get(request, State, undefined),
    InstalledAction = maps:get(ActionId, InstalledActions, undefined),
    case {ConnectorRequest, InstalledAction} of
        {undefined, _} ->
            ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
            {error, arg_request_not_found};
        {_, undefined} ->
            ?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
            {error, action_not_found};
        {_, _} ->
            #{
                method := Method,
                path := Path,
                body := Body,
                headers := Headers,
                request_timeout := Timeout
            } = process_request_and_action(ConnectorRequest, InstalledAction, Msg),
            ClientId = clientid(Msg),
            on_query_async(
                InstId,
                {ClientId, Method, {Path, Headers, Body}, Timeout},
                ReplyFunAndArgs,
                State
            )
    end;
on_query_async(
    InstId,
    {KeyOrNum, Method, Request, Timeout},
    ReplyFunAndArgs,
    #{base_path := BasePath} = State
) ->
    Worker = resolve_pool_worker(State, KeyOrNum),
    ?TRACE(
        "QUERY_ASYNC",
        "http_connector_received",
        #{
            request => redact_request(Request),
            note => ?READACT_REQUEST_NOTE,
            connector => InstId,
            state => redact(State)
        }
    ),
    NRequest = formalize_request(Method, BasePath, Request),
    %% Everything maybe_retry/3 needs to re-issue the request later on.
    Context = #{
        attempt => 1,
        max_attempts => maps:get(max_attempts, State, 3),
        state => State,
        key_or_num => KeyOrNum,
        method => Method,
        request => NRequest,
        timeout => Timeout
    },
    ReplyTo = {fun ?MODULE:reply_delegator/3, [Context, ReplyFunAndArgs]},
    ok = ehttpc:request_async(Worker, Method, NRequest, Timeout, ReplyTo),
    {ok, Worker}.
35✔
501

502
%% Picks a worker from the connector's pool.
%%
%% With pool type `random' (the default when the state has no `pool_type')
%% the key is ignored; with `hash' the key selects the worker. An `undefined'
%% key is replaced by the calling process' pid.
resolve_pool_worker(State, undefined) ->
    resolve_pool_worker(State, self());
resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
    PoolType = maps:get(pool_type, State, random),
    case PoolType of
        random -> ehttpc_pool:pick_worker(PoolName);
        hash -> ehttpc_pool:pick_worker(PoolName, Key)
    end.
511

512
%% emqx_resource callback: delegates to
%% `emqx_bridge_v2:get_channels_for_connector/1'.
on_get_channels(ResId) -> emqx_bridge_v2:get_channels_for_connector(ResId).
629✔
514

515
%% emqx_resource callback: connector status using the default per-worker
%% health check.
on_get_status(InstId, State) ->
    HealthChecker = fun default_health_checker/2,
    on_get_status(InstId, State, HealthChecker).
455✔
517

518
%% Connector status with a caller-supplied per-worker check.
%%
%% `DoPerWorker' is called as `DoPerWorker(Worker, Timeout)' for each pool
%% worker. Only matches when the state's `pool_name' equals `InstId'.
%% Returns `connected', `connecting' (the pool has no workers yet) or
%% `{disconnected, State, Reason}'.
on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State, DoPerWorker) ->
    HealthResult = do_get_status(InstId, Timeout, DoPerWorker),
    case HealthResult of
        ok -> connected;
        {error, still_connecting} -> connecting;
        {error, Reason} -> {disconnected, State, Reason}
    end.
527

528
%% Health-checks every worker of the pool with the default checker.
do_get_status(PoolName, Timeout) ->
    HealthChecker = fun default_health_checker/2,
    do_get_status(PoolName, Timeout, HealthChecker).
135✔
530

531
%% Runs `DoPerWorker(Worker, Timeout)' for every worker of the pool through
%% `emqx_utils:pmap/3' and summarises the outcome:
%%   * `ok'                        - no worker reported `{error, _}';
%%   * `{error, still_connecting}' - the pool has no workers;
%%   * `{error, Reason}'           - the first reported error (logged);
%%   * `{error, timeout}'          - pmap exited with `timeout' (logged).
do_get_status(PoolName, Timeout, DoPerWorker) ->
    Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)],
    CheckWorker = fun(Worker) -> DoPerWorker(Worker, Timeout) end,
    try emqx_utils:pmap(CheckWorker, Workers, Timeout) of
        [] ->
            {error, still_connecting};
        [_ | _] = Results ->
            IsError = fun
                ({error, _}) -> true;
                (_) -> false
            end,
            case lists:filter(IsError, Results) of
                [] ->
                    ok;
                [{error, Reason} | _] ->
                    ?SLOG(info, #{
                        msg => "health_check_failed",
                        reason => redact(Reason),
                        connector => PoolName
                    }),
                    {error, Reason}
            end
    catch
        exit:timeout ->
            ?SLOG(info, #{
                msg => "health_check_failed",
                reason => timeout,
                connector => PoolName
            }),
            {error, timeout}
    end.
557

558
%% Default per-worker health check: passes `ok' and `{error, _}' from
%% `ehttpc:health_check/2' through; any other return value is unexpected
%% and fails the case match.
default_health_checker(Worker, Timeout) ->
    Outcome = ehttpc:health_check(Worker, Timeout),
    case Outcome of
        ok -> ok;
        {error, _} -> Outcome
    end.
565

566
%% emqx_resource callback: channel status. The channel id is ignored.
on_get_channel_status(InstId, _ChannelId, State) ->
    %% XXX: Reuse the connector status
    on_get_status(InstId, State).
177✔
573

574
%%--------------------------------------------------------------------
575
%% Internal functions
576
%%--------------------------------------------------------------------
577
%% Pre-parses a request config into templates.
%%
%% `undefined' and the empty map both yield `undefined'. Otherwise the map
%% must contain `method'; missing `path' defaults to `<<>>', missing
%% `headers' to `[]', `request_timeout' to ?DEFAULT_REQUEST_TIMEOUT_MS and
%% `max_retries' to 2. A missing `body' stays `undefined'.
preprocess_request(undefined) ->
    undefined;
preprocess_request(EmptyReq) when map_size(EmptyReq) =:= 0 ->
    undefined;
preprocess_request(#{method := Method} = Req) ->
    PathSource = maps:get(path, Req, <<>>),
    HeaderSource = maps:get(headers, Req, []),
    MethodTemplate = parse_template(to_bin(Method)),
    PathTemplate = parse_template(PathSource),
    BodyTemplate = maybe_parse_template(body, Req),
    HeaderTemplates = parse_headers(HeaderSource),
    #{
        method => MethodTemplate,
        path => PathTemplate,
        body => BodyTemplate,
        headers => HeaderTemplates,
        request_timeout => maps:get(request_timeout, Req, ?DEFAULT_REQUEST_TIMEOUT_MS),
        max_retries => maps:get(max_retries, Req, 2)
    }.
592

593
%% Converts configured headers (a map or a list of `{Key, Value}' pairs)
%% into a list of parsed header templates. For a list the input order is
%% kept; for a map the entries are accumulated by prepending during the fold.
parse_headers(Headers) when is_map(Headers) ->
    Collect = fun(Key, Value, Parsed) -> [parse_header(Key, Value) | Parsed] end,
    maps:fold(Collect, [], Headers);
parse_headers(Headers) when is_list(Headers) ->
    ParsePair = fun({Key, Value}) -> parse_header(Key, Value) end,
    lists:map(ParsePair, Headers).
604

605
%% Parses one header into `{KeyTemplate, ValueTemplate}'. The value template
%% is additionally passed through `maybe_wrap_auth_header/2', keyed by the
%% binary form of the header name.
parse_header(K, V) ->
    KeyBin = to_bin(K),
    ValueTemplate = parse_template(to_bin(V)),
    KeyTemplate = parse_template(KeyBin),
    {KeyTemplate, maybe_wrap_auth_header(KeyBin, ValueTemplate)}.
190✔
609

610
%% Wraps the value template with `emqx_secret:wrap/1' when the header name
%% is "Authorization" or "Proxy-Authorization" (compared in lowercase).
%% All other values are returned untouched.
%%
%% The guard is a cheap pre-filter: only keys whose byte size equals that of
%% "Authorization" (13) or "Proxy-Authorization" (19) are lowercased at all.
maybe_wrap_auth_header(Key, VTpl) when
    byte_size(Key) =:= 13; byte_size(Key) =:= 19
->
    case try_bin_to_lower(Key) of
        LowerKey when
            LowerKey =:= <<"authorization">>;
            LowerKey =:= <<"proxy-authorization">>
        ->
            emqx_secret:wrap(VTpl);
        _Other ->
            VTpl
    end;
maybe_wrap_auth_header(_Key, VTpl) ->
    VTpl.
140✔
626

627
%% Best-effort lowercasing: returns the lowercased input as a binary, or the
%% input unchanged if lowercasing or the conversion to binary raises.
try_bin_to_lower(Bin) ->
    try
        iolist_to_binary(string:lowercase(Bin))
    catch
        _:_ -> Bin
    end.
633

634
%% Parses the template stored under `Key' in `Conf'. A missing key, or one
%% whose value is `undefined', yields `undefined'.
maybe_parse_template(Key, Conf) ->
    Raw = maps:get(Key, Conf, undefined),
    case Raw of
        undefined -> undefined;
        _ -> parse_template(Raw)
    end.
639

640
%% Thin wrapper around `emqx_template:parse/1'.
parse_template(String) -> emqx_template:parse(String).
1,047✔
642

643
%% Renders the final request for a BridgeV2 action.
%%
%% Method, body and request timeout come from the action state. The path is
%% the connector's rendered path, joined with the action's rendered path
%% unless the latter is empty. Headers of both levels are rendered and
%% merged with `merge_proplist/2'. All templates are rendered with the
%% action's `render_template_func'.
process_request_and_action(Request, ActionState, Msg) ->
    MethodTemplate = maps:get(method, ActionState),
    Render = maps:get(render_template_func, ActionState),
    Method = make_method(render_template_string(MethodTemplate, Render, Msg)),
    RenderPathOf = fun(Source) ->
        unicode:characters_to_list(Render(maps:get(path, Source), Msg))
    end,
    ConnectorPath = RenderPathOf(Request),
    ActionPath = RenderPathOf(ActionState),
    Path =
        case ActionPath of
            "" -> ConnectorPath;
            _ -> join_paths(ConnectorPath, ActionPath)
        end,
    ConnectorHeaderTemplates = maps:get(headers, Request),
    ActionHeaderTemplates = maps:get(headers, ActionState),
    Headers = merge_proplist(
        render_headers(ConnectorHeaderTemplates, Render, Msg),
        render_headers(ActionHeaderTemplates, Render, Msg)
    ),
    Body = render_request_body(maps:get(body, ActionState), Render, Msg),
    #{
        method => Method,
        path => Path,
        body => Body,
        headers => Headers,
        request_timeout => maps:get(request_timeout, ActionState)
    }.
671

672
%% Merges two `{Key, Value}' lists; entries of `Proplist2' take precedence.
%%
%% The result starts from `Proplist2' and gets every pair of `Proplist1'
%% whose key is not present yet prepended to it, so each key appears once.
%%
%% The previous version prepended a second copy of the winning pair whenever
%% a key was present in both lists, which put duplicate entries (duplicate
%% HTTP headers, for the caller in this module) into the result.
%%
%% Keys are compared as exact terms; no case folding is applied.
merge_proplist(Proplist1, Proplist2) ->
    lists:foldl(
        fun({Key, _Value} = Pair, Acc) ->
            case lists:keymember(Key, 1, Acc) of
                %% Already provided with higher precedence: keep it as is.
                true -> Acc;
                false -> [Pair | Acc]
            end
        end,
        Proplist2,
        Proplist1
    ).
685

686
%% Renders the connector-level request template (BridgeV1 path) with `Msg',
%% using `render_template/2' of this module. Returns the input map with
%% `method', `path', `body' and `headers' replaced by their rendered values;
%% `request_timeout' is carried over unchanged.
process_request(
    #{
        method := MethodTemplate,
        path := PathTemplate,
        body := BodyTemplate,
        headers := HeadersTemplate,
        request_timeout := ReqTimeout
    } = Conf,
    Msg
) ->
    Render = fun render_template/2,
    Method = make_method(render_template_string(MethodTemplate, Render, Msg)),
    Path = unicode:characters_to_list(Render(PathTemplate, Msg)),
    Body = render_request_body(BodyTemplate, Render, Msg),
    Headers = render_headers(HeadersTemplate, Render, Msg),
    Conf#{
        method => Method,
        path => Path,
        body => Body,
        headers => Headers,
        request_timeout => ReqTimeout
    }.
704

705
%% Renders the request body. Without a body template (`undefined') the whole
%% message is JSON-encoded; otherwise the template is rendered with the
%% supplied render function.
render_request_body(undefined, _RenderTmplFunc, Msg) ->
    emqx_utils_json:encode(Msg);
render_request_body(BodyTemplate, RenderTmplFunc, Msg) ->
    RenderTmplFunc(BodyTemplate, Msg).
31✔
709

710
%% Renders a list of `{KeyTemplate, ValueTemplate}' pairs into binaries.
%% Each value is passed through `emqx_secret:unwrap/1' before rendering.
render_headers(HeaderTks, RenderTmplFunc, Msg) ->
    RenderPair = fun({KeyTemplate, ValueTemplate}) ->
        Key = render_template_string(KeyTemplate, RenderTmplFunc, Msg),
        Value = render_template_string(emqx_secret:unwrap(ValueTemplate), RenderTmplFunc, Msg),
        {Key, Value}
    end,
    lists:map(RenderPair, HeaderTks).
720

721
%% Default template renderer: renders `Template' against `Msg' through
%% `emqx_template:render/2' with the `emqx_jsonish' accessor.
render_template(Template, Msg) ->
    % NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`.
    {Rendered, _IgnoredErrors} = emqx_template:render(Template, {emqx_jsonish, Msg}),
    Rendered.
192✔
725

726
%% Renders `Template' with the given render function and converts the
%% resulting chardata with `unicode:characters_to_binary/1'.
render_template_string(Template, RenderTmplFunc, Msg) ->
    Chardata = RenderTmplFunc(Template, Msg),
    unicode:characters_to_binary(Chardata).
238✔
728

729
%% Maps a rendered method name to the method atom. Only the all-uppercase
%% and all-lowercase spellings of POST/PUT/GET/DELETE are recognised; any
%% other input raises `function_clause'.
make_method(<<"POST">>) -> post;
make_method(<<"post">>) -> post;
make_method(<<"PUT">>) -> put;
make_method(<<"put">>) -> put;
make_method(<<"GET">>) -> get;
make_method(<<"get">>) -> get;
make_method(<<"DELETE">>) -> delete;
make_method(<<"delete">>) -> delete.
×
733

734
%% Builds the request tuple with the base path prepended to the request
%% path. For `get' and `delete' a body, if given, is dropped and a two-tuple
%% `{Path, Headers}' is produced; otherwise the arity of the input tuple is
%% kept.
formalize_request(Method, BasePath, {Path, Headers, _Body}) when
    Method =:= get; Method =:= delete
->
    {join_paths(BasePath, Path), Headers};
formalize_request(_Method, BasePath, {Path, Headers, Body}) ->
    FullPath = join_paths(BasePath, Path),
    {FullPath, Headers, Body};
formalize_request(_Method, BasePath, {Path, Headers}) ->
    FullPath = join_paths(BasePath, Path),
    {FullPath, Headers}.
2,385✔
742

743
%% By default, we cannot treat HTTP paths as "file" or "resource" paths,
744
%% because an HTTP server may handle paths like
745
%% "/a/b/c/", "/a/b/c" and "/a//b/c" differently.
746
%%
747
%% So we try to avoid unnecessary path normalization.
748
%%
749
%% See also: `join_paths_test_/0`
750
%% Joins two path fragments (strings or binaries) with exactly one `/'
%% inserted between them. At most one trailing `/' of the first fragment and
%% at most one leading `/' of the second are consumed; nothing else is
%% normalised. The result is always a list.
join_paths(Path1, Path2) ->
    do_join_paths(lists:reverse(to_list(Path1)), to_list(Path2)).

%% Takes the first fragment reversed, so that its last character can be
%% inspected at the head of the list.
do_join_paths(RevPath1, Path2) ->
    RevPrefix =
        case RevPath1 of
            [$/ | RevRest] -> RevRest;
            _ -> RevPath1
        end,
    Suffix =
        case Path2 of
            [$/ | Rest] -> Rest;
            _ -> Path2
        end,
    %% lists:reverse/2 restores the prefix and appends the tail in one pass.
    lists:reverse(RevPrefix, [$/ | Suffix]).

%% Lists pass through; binaries are converted byte by byte.
to_list(Bin) when is_binary(Bin) -> binary_to_list(Bin);
to_list(List) when is_list(List) -> List.
2,817✔
768

769
%% Converts an atom or a list to a binary; binaries pass through unchanged.
to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
to_bin(Str) when is_list(Str) -> iolist_to_binary(Str);
to_bin(Bin) when is_binary(Bin) -> Bin.
314✔
775

776
%% Reply callback handed to `ehttpc:request_async/5'. The raw result is
%% transformed and passed to `maybe_retry/3' in a freshly spawned (unlinked)
%% process. Returns the pid of that process.
reply_delegator(Context, ReplyFunAndArgs, Result0) ->
    HandleReply = fun() ->
        maybe_retry(transform_result(Result0), Context, ReplyFunAndArgs)
    end,
    spawn(HandleReply).
781

782
%% Classifies a raw ehttpc result.
%%
%%   * Connection-level errors (`econnrefused', `timeout', `normal',
%%     `{shutdown, normal}', `{shutdown, closed}', `{closed, _}') become
%%     `{error, {recoverable_error, Reason}}'.
%%   * Other `{error, _}' results and 2xx responses are returned unchanged.
%%   * A 429 response becomes `{error, {recoverable_error, Info}}', every
%%     other non-2xx response `{error, {unrecoverable_error, Info}}', where
%%     `Info' holds `status_code', `headers' and, when present, `body'.
transform_result(Result) ->
    %% 429 (Too Many Requests) is the only status treated as recoverable.
    ClassOf = fun
        (429) -> recoverable_error;
        (_OtherStatus) -> unrecoverable_error
    end,
    case Result of
        %% The normal reason happens when the HTTP connection times out before
        %% the request has been fully processed
        {error, Reason} when
            Reason =:= econnrefused;
            Reason =:= timeout;
            Reason =:= normal;
            Reason =:= {shutdown, normal};
            Reason =:= {shutdown, closed}
        ->
            {error, {recoverable_error, Reason}};
        %% _Message = "The connection was lost."
        {error, {closed, _Message} = Reason} ->
            {error, {recoverable_error, Reason}};
        {error, _Reason} ->
            Result;
        {ok, StatusCode, _} when StatusCode >= 200, StatusCode < 300 ->
            Result;
        {ok, StatusCode, _, _} when StatusCode >= 200, StatusCode < 300 ->
            Result;
        {ok, StatusCode, Headers} ->
            Info = #{status_code => StatusCode, headers => Headers},
            {error, {ClassOf(StatusCode), Info}};
        {ok, StatusCode, Headers, Body} ->
            Info = #{status_code => StatusCode, headers => Headers, body => Body},
            {error, {ClassOf(StatusCode), Info}}
    end.
818

819
%% Decides what to do with the (transformed) result of an async request.
%%
%%   1. Attempts exhausted (`attempt >= max_attempts'): deliver the result.
%%   2. `unrecoverable_error' carrying a status code: deliver the result.
%%   3. Any other `{error, Reason}': re-issue the request asynchronously.
%%      The attempt counter is incremented unless the reason is
%%      `{recoverable_error, normal}' or
%%      `{recoverable_error, {shutdown, normal}}' ("free" retries).
%%   4. Everything else: deliver the result.
maybe_retry(Result, #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when N >= Max ->
    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
maybe_retry(
    {error, {unrecoverable_error, #{status_code := _}}} = Result, _Context, ReplyFunAndArgs
) ->
    %% request was successful, but we got an error response; no need to retry
    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
    #{
        state := State,
        attempt := Attempt,
        key_or_num := KeyOrNum,
        method := Method,
        request := Request,
        timeout := Timeout
    } = Context,
    %% TODO: reset the expiration time for free retries?
    NContext =
        case Reason of
            {recoverable_error, normal} -> Context;
            {recoverable_error, {shutdown, normal}} -> Context;
            _ -> Context#{attempt := Attempt + 1}
        end,
    ?tp(http_will_retry_async, #{}),
    Worker = resolve_pool_worker(State, KeyOrNum),
    ReplyTo = {fun ?MODULE:reply_delegator/3, [NContext, ReplyFunAndArgs]},
    ok = ehttpc:request_async(Worker, Method, Request, Timeout, ReplyTo),
    ok;
maybe_retry(Result, _Context, ReplyFunAndArgs) ->
    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
29✔
861

862
%% Function that will do a deep traversal of Data and remove sensitive
863
%% information (i.e., passwords)
864
%% Thin wrapper around `emqx_utils:redact/1'.
redact(Data) -> emqx_utils:redact(Data).
1✔
866

867
%% because the body may contain some sensitive data
868
%% and at the same time the redact function will not scan the binary data
869
%% and we also can't know the body format and where the sensitive data will be
870
%% so the easy way to keep data security is redacted the whole body
871
%% Redacts a request tuple for logging: headers go through
%% `emqx_utils_redact:redact_headers/1' and a body, when present, is
%% replaced entirely by a fixed placeholder.
redact_request({Path, Headers}) ->
    SafeHeaders = emqx_utils_redact:redact_headers(Headers),
    {Path, SafeHeaders};
redact_request({Path, Headers, _Body}) ->
    SafeHeaders = emqx_utils_redact:redact_headers(Headers),
    {Path, SafeHeaders, <<"******">>}.
×
875

876
%% Returns the `clientid' entry of the message map, or `undefined' when the
%% key is absent.
clientid(Msg) ->
    case maps:find(clientid, Msg) of
        {ok, ClientId} -> ClientId;
        error -> undefined
    end.
49✔
877

878
-ifdef(TEST).
879
-include_lib("eunit/include/eunit.hrl").
880

881
%% Redacting a config that carries an Authorization header must change it.
redact_test_() ->
    Headers = [
        {<<"content-type">>, <<"application/json">>},
        {<<"Authorization">>, <<"Basic YWxhZGRpbjpvcGVuc2VzYW1l">>}
    ],
    TestData = #{headers => Headers},
    [?_assertNotEqual(TestData, redact(TestData))].
891

892
%% Table-driven checks for join_paths/2: {Expected, Path1, Path2}.
join_paths_test_() ->
    Cases = [
        %% exactly one separator between two non-empty fragments
        {"abc/cde", "abc", "cde"},
        {"abc/cde", "abc", "/cde"},
        {"abc/cde", "abc/", "cde"},
        {"abc/cde", "abc/", "/cde"},
        %% empty or root-only first fragment
        {"/", "", ""},
        {"/cde", "", "cde"},
        {"/cde", "", "/cde"},
        {"/cde", "/", "cde"},
        {"/cde", "/", "/cde"},
        %% only one slash per side is consumed; the rest is preserved
        {"//cde/", "/", "//cde/"},
        {"abc///cde/", "abc//", "//cde/"}
    ],
    [?_assertEqual(Expected, join_paths(Path1, Path2)) || {Expected, Path1, Path2} <- Cases].
908

909
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc