• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8684992748

15 Apr 2024 07:16AM UTC coverage: 67.831% (+5.4%) from 62.388%
8684992748

push

github

web-flow
Merge pull request #12877 from id/0415-sync-release-56

sync release 56

29 of 40 new or added lines in 7 files covered. (72.5%)

129 existing lines in 17 files now uncovered.

37939 of 55932 relevant lines covered (67.83%)

7734.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.16
/apps/emqx_utils/src/emqx_utils.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_utils).
18

19
-compile(inline).
20
%% [TODO] Cleanup so the instruction below is not necessary.
21
-elvis([{elvis_style, god_modules, disable}]).
22

23
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
24

25
-export([
26
    merge_opts/2,
27
    maybe_apply/2,
28
    compose/1,
29
    compose/2,
30
    cons/2,
31
    run_fold/3,
32
    pipeline/3,
33
    start_timer/2,
34
    start_timer/3,
35
    cancel_timer/1,
36
    drain_deliver/0,
37
    drain_deliver/1,
38
    drain_down/1,
39
    check_oom/1,
40
    check_oom/2,
41
    tune_heap_size/1,
42
    proc_name/2,
43
    proc_stats/0,
44
    proc_stats/1,
45
    rand_seed/0,
46
    now_to_secs/1,
47
    now_to_ms/1,
48
    index_of/2,
49
    maybe_parse_ip/1,
50
    ipv6_probe/1,
51
    gen_id/0,
52
    gen_id/1,
53
    explain_posix/1,
54
    pforeach/2,
55
    pforeach/3,
56
    pmap/2,
57
    pmap/3,
58
    readable_error_msg/1,
59
    safe_to_existing_atom/1,
60
    safe_to_existing_atom/2,
61
    pub_props_to_packet/1,
62
    safe_filename/1,
63
    diff_lists/3,
64
    merge_lists/3,
65
    flattermap/2,
66
    tcp_keepalive_opts/4,
67
    format/1,
68
    call_first_defined/1,
69
    ntoa/1,
70
    foldl_while/3,
71
    is_restricted_str/1
72
]).
73

74
-export([
75
    bin_to_hexstr/2,
76
    hexstr_to_bin/1
77
]).
78

79
-export([
80
    nolink_apply/1,
81
    nolink_apply/2
82
]).
83

84
-export([clamp/3, redact/1, redact/2, is_redacted/2, is_redacted/3]).
85
-export([deobfuscate/2]).
86

87
-export_type([
88
    readable_error_msg/1
89
]).
90

91
-type readable_error_msg(_Error) :: binary().
92

93
-type option(T) :: undefined | T.
94

95
-dialyzer({nowarn_function, [nolink_apply/2]}).
96

97
-define(SHORT, 8).
98

99
-define(DEFAULT_PMAP_TIMEOUT, 5000).
100

101
%% @doc Parse v4 or v6 string format address to tuple.
102
%% `Host' itself is returned if it's not an ip string.
103
maybe_parse_ip(Host) ->
104
    case inet:parse_address(Host) of
×
105
        {ok, Addr} when is_tuple(Addr) -> Addr;
×
106
        {error, einval} -> Host
×
107
    end.
108

109
%% @doc Add `ipv6_probe' socket option if it's supported.
110
%% gen_tcp:ipv6_probe() -> true. is added to EMQ's OTP forks
111
ipv6_probe(Opts) ->
112
    case erlang:function_exported(gen_tcp, ipv6_probe, 0) of
467✔
113
        true -> [{ipv6_probe, true} | Opts];
467✔
114
        false -> Opts
×
115
    end.
116

117
%% @doc Merge options
118
-spec merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist().
119
merge_opts(Defaults, Options) ->
120
    lists:foldl(
1,240✔
121
        fun
122
            ({Opt, Val}, Acc) ->
123
                lists:keystore(Opt, 1, Acc, {Opt, Val});
8,691✔
124
            (Opt, Acc) ->
125
                lists:usort([Opt | Acc])
×
126
        end,
127
        Defaults,
128
        Options
129
    ).
130

131
%% @doc Apply a function to a maybe argument.
132
-spec maybe_apply(fun((option(A)) -> option(A)), option(A)) ->
133
    option(A)
134
when
135
    A :: any().
136
maybe_apply(_Fun, undefined) ->
137
    undefined;
2✔
138
maybe_apply(Fun, Arg) when is_function(Fun) ->
139
    erlang:apply(Fun, [Arg]).
34,962✔
140

141
-spec compose(list(F)) -> G when
142
    F :: fun((any()) -> any()),
143
    G :: fun((any()) -> any()).
144
compose([F | More]) -> compose(F, More).
×
145

146
-spec compose(F, G | [Gs]) -> C when
147
    F :: fun((X1) -> X2),
148
    G :: fun((X2) -> X3),
149
    Gs :: [fun((Xn) -> Xn1)],
150
    C :: fun((X1) -> Xm),
151
    X3 :: any(),
152
    Xn :: any(),
153
    Xn1 :: any(),
154
    Xm :: any().
155
compose(F, G) when is_function(G) -> fun(X) -> G(F(X)) end;
×
156
compose(F, [G]) -> compose(F, G);
×
157
compose(F, [G | More]) -> compose(compose(F, G), More).
×
158

159
-spec cons(X, [X]) -> [X, ...].
160
cons(Head, Tail) ->
161
    [Head | Tail].
46✔
162

163
%% @doc RunFold
164
run_fold([], Acc, _State) ->
165
    Acc;
25,234✔
166
run_fold([Fun | More], Acc, State) ->
167
    run_fold(More, Fun(Acc, State), State).
100,932✔
168

169
%% @doc Pipeline
170
pipeline([], Input, State) ->
171
    {ok, Input, State};
53,960✔
172
pipeline([Fun | More], Input, State) ->
173
    case apply_fun(Fun, Input, State) of
271,191✔
174
        ok -> pipeline(More, Input, State);
144,413✔
175
        {ok, NState} -> pipeline(More, Input, NState);
59,510✔
176
        {ok, Output, NState} -> pipeline(More, Output, NState);
67,229✔
177
        {error, Reason} -> {error, Reason, State};
37✔
178
        {error, Reason, NState} -> {error, Reason, NState}
2✔
179
    end.
180

181
-spec foldl_while(fun((X, Acc) -> {cont | halt, Acc}), Acc, [X]) -> Acc.
182
foldl_while(_Fun, Acc, []) ->
183
    Acc;
42✔
184
foldl_while(Fun, Acc, [X | Xs]) ->
185
    case Fun(X, Acc) of
123✔
186
        {cont, NewAcc} ->
187
            foldl_while(Fun, NewAcc, Xs);
55✔
188
        {halt, NewAcc} ->
189
            NewAcc
68✔
190
    end.
191

192
-compile({inline, [apply_fun/3]}).
193
apply_fun(Fun, Input, State) ->
194
    case erlang:fun_info(Fun, arity) of
271,191✔
195
        {arity, 1} -> Fun(Input);
9,449✔
196
        {arity, 2} -> Fun(Input, State)
261,742✔
197
    end.
198

199
-spec start_timer(integer() | atom(), term()) -> option(reference()).
200
start_timer(Interval, Msg) ->
201
    start_timer(Interval, self(), Msg).
608,616✔
202

203
-spec start_timer(integer() | atom(), pid() | atom(), term()) -> option(reference()).
204
start_timer(Interval, Dest, Msg) when is_number(Interval) ->
205
    erlang:start_timer(erlang:ceil(Interval), Dest, Msg);
608,616✔
206
start_timer(_Atom, _Dest, _Msg) ->
207
    undefined.
×
208

209
-spec cancel_timer(option(reference())) -> ok.
210
cancel_timer(Timer) when is_reference(Timer) ->
211
    case erlang:cancel_timer(Timer) of
11,299✔
212
        false ->
213
            receive
147✔
214
                {timeout, Timer, _} -> ok
122✔
215
            after 0 -> ok
25✔
216
            end;
217
        _ ->
218
            ok
11,152✔
219
    end;
220
cancel_timer(_) ->
221
    ok.
40✔
222

223
%% @doc Drain delivers
224
drain_deliver() ->
225
    drain_deliver(-1).
570✔
226

227
drain_deliver(N) when is_integer(N) ->
228
    drain_deliver(N, []).
4,377✔
229

230
drain_deliver(0, Acc) ->
231
    lists:reverse(Acc);
×
232
drain_deliver(N, Acc) ->
233
    receive
4,606✔
234
        Deliver = {deliver, _Topic, _Msg} ->
235
            drain_deliver(N - 1, [Deliver | Acc])
229✔
236
    after 0 ->
237
        lists:reverse(Acc)
4,377✔
238
    end.
239

240
%% @doc Drain process 'DOWN' events.
241
-spec drain_down(pos_integer()) -> list(pid()).
242
drain_down(Cnt) when Cnt > 0 ->
243
    drain_down(Cnt, []).
9,879✔
244

245
drain_down(0, Acc) ->
246
    lists:reverse(Acc);
×
247
drain_down(Cnt, Acc) ->
248
    receive
12,550✔
249
        {'DOWN', _MRef, process, Pid, _Reason} ->
250
            drain_down(Cnt - 1, [Pid | Acc])
2,671✔
251
    after 0 ->
252
        lists:reverse(Acc)
9,879✔
253
    end.
254

255
%% @doc Check process's mailbox and heapsize against OOM policy,
256
%% return `ok | {shutdown, Reason}' accordingly.
257
%% `ok': There is nothing out of the ordinary.
258
%% `shutdown': Some numbers (message queue length hit the limit),
259
%%             hence shutdown for greater good (system stability).
260
%% [FIXME] cross-dependency on `emqx_types`.
261
-spec check_oom(emqx_types:oom_policy()) -> ok | {shutdown, term()}.
262
check_oom(Policy) ->
263
    check_oom(self(), Policy).
11✔
264

265
-spec check_oom(pid(), emqx_types:oom_policy()) -> ok | {shutdown, term()}.
266
check_oom(_Pid, #{enable := false}) ->
267
    ok;
×
268
check_oom(Pid, #{
269
    max_mailbox_size := MaxQLen,
270
    max_heap_size := MaxHeapSize
271
}) ->
272
    case process_info(Pid, [message_queue_len, total_heap_size]) of
11✔
273
        undefined ->
274
            ok;
×
275
        [{message_queue_len, QLen}, {total_heap_size, HeapSize}] ->
276
            do_check_oom([
11✔
277
                {QLen, MaxQLen, mailbox_overflow},
278
                {HeapSize, MaxHeapSize, proc_heap_too_large}
279
            ])
280
    end.
281

282
do_check_oom([]) ->
283
    ok;
11✔
284
do_check_oom([{Val, Max, Reason} | Rest]) ->
285
    case is_integer(Max) andalso (0 < Max) andalso (Max < Val) of
22✔
286
        true -> {shutdown, #{reason => Reason, value => Val, max => Max}};
×
287
        false -> do_check_oom(Rest)
22✔
288
    end.
289

290
tune_heap_size(#{enable := false}) ->
291
    ok;
424✔
292
%% If the max_heap_size is set to zero, the limit is disabled.
293
tune_heap_size(#{max_heap_size := MaxHeapSize}) when MaxHeapSize > 0 ->
294
    MaxSize =
8,462✔
295
        case erlang:system_info(wordsize) of
296
            % arch_64
297
            8 ->
298
                (1 bsl 59) - 1;
8,462✔
299
            % arch_32
300
            4 ->
301
                (1 bsl 27) - 1
×
302
        end,
303
    OverflowedSize =
8,462✔
304
        case erlang:trunc(MaxHeapSize * 1.5) of
305
            SZ when SZ > MaxSize -> MaxSize;
×
306
            SZ -> SZ
8,462✔
307
        end,
308
    erlang:process_flag(max_heap_size, #{
8,462✔
309
        size => OverflowedSize,
310
        kill => true,
311
        error_logger => true
312
    }).
313

314
-spec proc_name(atom(), pos_integer()) -> atom().
315
proc_name(Mod, Id) ->
316
    list_to_atom(lists:concat([Mod, "_", Id])).
7,184✔
317

318
%% Get Proc's Stats.
319
%% [FIXME] cross-dependency on `emqx_types`.
320
-spec proc_stats() -> emqx_types:stats().
321
proc_stats() -> proc_stats(self()).
32,371✔
322

323
-spec proc_stats(pid()) -> emqx_types:stats().
324
proc_stats(Pid) ->
325
    case
32,371✔
326
        process_info(Pid, [
327
            message_queue_len,
328
            heap_size,
329
            total_heap_size,
330
            reductions,
331
            memory
332
        ])
333
    of
334
        undefined -> [];
×
335
        [{message_queue_len, Len} | ProcStats] -> [{mailbox_len, Len} | ProcStats]
32,371✔
336
    end.
337

338
rand_seed() ->
339
    rand:seed(exsplus, erlang:timestamp()).
×
340

341
-spec now_to_secs(erlang:timestamp()) -> pos_integer().
342
now_to_secs({MegaSecs, Secs, _MicroSecs}) ->
343
    MegaSecs * 1000000 + Secs.
×
344

345
-spec now_to_ms(erlang:timestamp()) -> pos_integer().
346
now_to_ms({MegaSecs, Secs, MicroSecs}) ->
347
    (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs / 1000).
×
348

349
%% lists:index_of/2
350
index_of(E, L) ->
351
    index_of(E, 1, L).
770✔
352

353
index_of(_E, _I, []) ->
354
    error(badarg);
×
355
index_of(E, I, [E | _]) ->
356
    I;
770✔
357
index_of(E, I, [_ | L]) ->
358
    index_of(E, I + 1, L).
3,833✔
359

360
-spec bin_to_hexstr(binary(), lower | upper) -> binary().
361
bin_to_hexstr(B, upper) when is_binary(B) ->
362
    <<<<(int2hexchar(H, upper)), (int2hexchar(L, upper))>> || <<H:4, L:4>> <= B>>;
2,989✔
363
bin_to_hexstr(B, lower) when is_binary(B) ->
364
    <<<<(int2hexchar(H, lower)), (int2hexchar(L, lower))>> || <<H:4, L:4>> <= B>>.
300✔
365

366
int2hexchar(I, _) when I >= 0 andalso I < 10 -> I + $0;
81,615✔
367
int2hexchar(I, upper) -> I - 10 + $A;
20,655✔
368
int2hexchar(I, lower) -> I - 10 + $a.
5,136✔
369

370
-spec hexstr_to_bin(binary()) -> binary().
371
hexstr_to_bin(B) when is_binary(B) ->
372
    hexstr_to_bin(B, erlang:bit_size(B)).
124✔
373

374
hexstr_to_bin(B, Size) when is_binary(B) ->
375
    case Size rem 16 of
124✔
376
        0 ->
377
            make_binary(B);
120✔
378
        8 ->
379
            make_binary(<<"0", B/binary>>);
4✔
380
        _ ->
381
            throw({unsupport_hex_string, B, Size})
×
382
    end.
383

384
make_binary(B) -> <<<<(hexchar2int(H) * 16 + hexchar2int(L))>> || <<H:8, L:8>> <= B>>.
124✔
385

386
hexchar2int(I) when I >= $0 andalso I =< $9 -> I - $0;
1,572✔
387
hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10;
749✔
388
hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10.
3✔
389

390
-spec gen_id() -> list().
391
gen_id() ->
392
    gen_id(?SHORT).
9✔
393

394
-spec gen_id(integer()) -> list().
395
gen_id(Len) ->
396
    BitLen = Len * 4,
950✔
397
    <<R:BitLen>> = crypto:strong_rand_bytes(Len div 2),
950✔
398
    int_to_hex(R, Len).
950✔
399

400
-spec clamp(number(), number(), number()) -> number().
401
clamp(Val, Min, _Max) when Val < Min -> Min;
5,106✔
402
clamp(Val, _Min, Max) when Val > Max -> Max;
4✔
403
clamp(Val, _Min, _Max) -> Val.
346✔
404

405
%% @doc https://www.erlang.org/doc/man/file.html#posix-error-codes
406
explain_posix(eacces) -> "Permission denied";
×
407
explain_posix(eagain) -> "Resource temporarily unavailable";
×
408
explain_posix(ebadf) -> "Bad file number";
×
409
explain_posix(ebusy) -> "File busy";
×
410
explain_posix(edquot) -> "Disk quota exceeded";
×
411
explain_posix(eexist) -> "File already exists";
×
412
explain_posix(efault) -> "Bad address in system call argument";
×
413
explain_posix(efbig) -> "File too large";
×
414
explain_posix(eintr) -> "Interrupted system call";
×
415
explain_posix(einval) -> "Invalid argument argument file/socket";
×
416
explain_posix(eio) -> "I/O error";
×
417
explain_posix(eisdir) -> "Illegal operation on a directory";
×
418
explain_posix(eloop) -> "Too many levels of symbolic links";
×
419
explain_posix(emfile) -> "Too many open files";
×
420
explain_posix(emlink) -> "Too many links";
×
421
explain_posix(enametoolong) -> "Filename too long";
×
422
explain_posix(enfile) -> "File table overflow";
×
423
explain_posix(enodev) -> "No such device";
×
424
explain_posix(enoent) -> "No such file or directory";
2✔
425
explain_posix(enomem) -> "Not enough memory";
×
426
explain_posix(enospc) -> "No space left on device";
×
427
explain_posix(enotblk) -> "Block device required";
×
428
explain_posix(enotdir) -> "Not a directory";
×
429
explain_posix(enotsup) -> "Operation not supported";
×
430
explain_posix(enxio) -> "No such device or address";
×
431
explain_posix(eperm) -> "Not owner";
×
432
explain_posix(epipe) -> "Broken pipe";
×
433
explain_posix(erofs) -> "Read-only file system";
×
434
explain_posix(espipe) -> "Invalid seek";
×
435
explain_posix(esrch) -> "No such process";
×
436
explain_posix(estale) -> "Stale remote file handle";
×
437
explain_posix(exdev) -> "Cross-domain link";
×
438
explain_posix(NotPosix) -> NotPosix.
×
439

440
-spec pforeach(fun((A) -> term()), list(A)) -> ok.
441
pforeach(Fun, List) when is_function(Fun, 1), is_list(List) ->
442
    pforeach(Fun, List, ?DEFAULT_PMAP_TIMEOUT).
×
443

444
-spec pforeach(fun((A) -> term()), list(A), timeout()) -> ok.
445
pforeach(Fun, List, Timeout) ->
446
    _ = pmap(Fun, List, Timeout),
371✔
447
    ok.
371✔
448

449
%% @doc Like lists:map/2, only the callback function is evaluated
450
%% concurrently.
451
-spec pmap(fun((A) -> B), list(A)) -> list(B).
452
pmap(Fun, List) when is_function(Fun, 1), is_list(List) ->
453
    pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT).
30✔
454

455
-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B).
456
pmap(Fun, List, Timeout) when
457
    is_function(Fun, 1),
458
    is_list(List),
459
    (is_integer(Timeout) andalso Timeout >= 0 orelse Timeout =:= infinity)
460
->
461
    nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout).
7,413✔
462

463
%% @doc Delegate a function to a worker process.
464
%% The function may spawn_link other processes but we do not
465
%% want the caller process to be linked.
466
%% This is done by isolating the possible link with a not-linked
467
%% middleman process.
468
nolink_apply(Fun) -> nolink_apply(Fun, infinity).
×
469

470
%% @doc Same as `nolink_apply/1', with a timeout.
471
-spec nolink_apply(function(), timeout()) -> term().
472
nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
473
    Caller = self(),
7,413✔
474
    ResRef = alias([reply]),
7,413✔
475
    Middleman = erlang:spawn(
7,413✔
476
        fun() ->
477
            process_flag(trap_exit, true),
7,413✔
478
            CallerMRef = erlang:monitor(process, Caller),
7,413✔
479
            Worker = erlang:spawn_link(
7,413✔
480
                fun() ->
481
                    Res =
7,413✔
482
                        try
483
                            {normal, Fun()}
7,413✔
484
                        catch
485
                            C:E:S ->
486
                                {exception, {C, E, S}}
33✔
487
                        end,
488
                    _ = erlang:send(ResRef, {ResRef, Res}),
7,375✔
489
                    ?tp(pmap_middleman_sent_response, #{}),
7,375✔
490
                    exit(normal)
6,664✔
491
                end
492
            ),
493
            receive
7,413✔
494
                {'DOWN', CallerMRef, process, _, _} ->
495
                    %% For whatever reason, if the caller is dead,
496
                    %% there is no reason to continue
497
                    exit(Worker, kill),
1,083✔
498
                    exit(normal);
1,083✔
499
                {'EXIT', Worker, normal} ->
500
                    exit(normal);
6,287✔
501
                {'EXIT', Worker, Reason} ->
502
                    %% worker exited with some reason other than 'normal'
503
                    _ = erlang:send(ResRef, {ResRef, {'EXIT', Reason}}),
×
504
                    exit(normal)
×
505
            end
506
        end
507
    ),
508
    receive
7,413✔
509
        {ResRef, {normal, Result}} ->
510
            Result;
7,341✔
511
        {ResRef, {exception, {C, E, S}}} ->
512
            erlang:raise(C, E, S);
29✔
513
        {ResRef, {'EXIT', Reason}} ->
514
            exit(Reason)
×
515
    after Timeout ->
516
        %% possible race condition: a message was received just as we enter the after
517
        %% block.
518
        ?tp(pmap_timeout, #{}),
43✔
519
        unalias(ResRef),
43✔
520
        exit(Middleman, kill),
43✔
521
        receive
43✔
522
            {ResRef, {normal, Result}} ->
523
                Result;
×
524
            {ResRef, {exception, {C, E, S}}} ->
UNCOV
525
                erlang:raise(C, E, S);
×
526
            {ResRef, {'EXIT', Reason}} ->
527
                exit(Reason)
×
528
        after 0 ->
529
            exit(timeout)
43✔
530
        end
531
    end.
532

533
safe_to_existing_atom(In) ->
534
    safe_to_existing_atom(In, utf8).
1,244✔
535

536
safe_to_existing_atom(Bin, Encoding) when is_binary(Bin) ->
537
    try_to_existing_atom(fun erlang:binary_to_existing_atom/2, Bin, Encoding);
72,794✔
538
safe_to_existing_atom(List, Encoding) when is_list(List) ->
539
    try_to_existing_atom(fun(In, _) -> erlang:list_to_existing_atom(In) end, List, Encoding);
1,220✔
540
safe_to_existing_atom(Atom, _Encoding) when is_atom(Atom) ->
541
    {ok, Atom};
4,756✔
542
safe_to_existing_atom(_Any, _Encoding) ->
543
    {error, invalid_type}.
×
544

545
-spec tcp_keepalive_opts(term(), non_neg_integer(), non_neg_integer(), non_neg_integer()) ->
546
    {ok, [{keepalive, true} | {raw, non_neg_integer(), non_neg_integer(), binary()}]}
547
    | {error, {unsupported_os, term()}}.
548
tcp_keepalive_opts({unix, linux}, Idle, Interval, Probes) ->
549
    {ok, [
1✔
550
        {keepalive, true},
551
        {raw, 6, 4, <<Idle:32/native>>},
552
        {raw, 6, 5, <<Interval:32/native>>},
553
        {raw, 6, 6, <<Probes:32/native>>}
554
    ]};
555
tcp_keepalive_opts({unix, darwin}, Idle, Interval, Probes) ->
556
    {ok, [
×
557
        {keepalive, true},
558
        {raw, 6, 16#10, <<Idle:32/native>>},
559
        {raw, 6, 16#101, <<Interval:32/native>>},
560
        {raw, 6, 16#102, <<Probes:32/native>>}
561
    ]};
562
tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) ->
563
    {error, {unsupported_os, OS}}.
×
564

565
format(Term) ->
566
    iolist_to_binary(io_lib:format("~0p", [Term])).
1,123✔
567

568
-spec call_first_defined(list({module(), atom(), list()})) -> term() | no_return().
569
call_first_defined([{Module, Function, Args} | Rest]) ->
570
    try
1,278,409✔
571
        apply(Module, Function, Args)
1,278,409✔
572
    catch
573
        error:undef:Stacktrace ->
574
            case Stacktrace of
605,474✔
575
                [{Module, Function, _, _} | _] ->
576
                    call_first_defined(Rest);
605,474✔
577
                _ ->
578
                    erlang:raise(error, undef, Stacktrace)
×
579
            end
580
    end;
581
call_first_defined([]) ->
582
    error(none_fun_is_defined).
×
583

584
%%------------------------------------------------------------------------------
585
%% Internal Functions
586
%%------------------------------------------------------------------------------
587

588
do_parallel_map(Fun, List) ->
589
    Parent = self(),
7,413✔
590
    PidList = lists:map(
7,413✔
591
        fun(Item) ->
592
            erlang:spawn_link(
34,774✔
593
                fun() ->
594
                    Res =
34,774✔
595
                        try
596
                            {normal, Fun(Item)}
34,774✔
597
                        catch
598
                            C:E:St ->
599
                                {exception, {C, E, St}}
257✔
600
                        end,
601
                    Parent ! {self(), Res}
34,742✔
602
                end
603
            )
604
        end,
605
        List
606
    ),
607
    lists:foldr(
7,413✔
608
        fun(Pid, Acc) ->
609
            receive
34,484✔
610
                {Pid, {normal, Result}} ->
611
                    [Result | Acc];
34,413✔
612
                {Pid, {exception, {C, E, St}}} ->
613
                    erlang:raise(C, E, St)
35✔
614
            end
615
        end,
616
        [],
617
        PidList
618
    ).
619

620
int_to_hex(I, N) when is_integer(I), I >= 0 ->
621
    int_to_hex([], I, 1, N).
950✔
622

623
int_to_hex(L, I, Count, N) when
624
    I < 16
625
->
626
    pad([int_to_hex(I) | L], N - Count);
950✔
627
int_to_hex(L, I, Count, N) ->
628
    int_to_hex([int_to_hex(I rem 16) | L], I div 16, Count + 1, N).
9,473✔
629

630
int_to_hex(I) when 0 =< I, I =< 9 ->
631
    I + $0;
6,529✔
632
int_to_hex(I) when 10 =< I, I =< 15 ->
633
    (I - 10) + $a.
3,894✔
634

635
pad(L, 0) ->
636
    L;
950✔
637
pad(L, Count) ->
638
    pad([$0 | L], Count - 1).
65✔
639

640
readable_error_msg(Msg) when is_binary(Msg) -> Msg;
903✔
641
readable_error_msg(Error) ->
642
    case io_lib:printable_unicode_list(Error) of
1,339✔
643
        true ->
644
            unicode:characters_to_binary(Error, utf8);
8✔
645
        false ->
646
            case emqx_hocon:format_error(Error, #{no_stacktrace => true}) of
1,331✔
647
                {ok, Msg} ->
648
                    Msg;
132✔
649
                false ->
650
                    to_hr_error(Error)
1,199✔
651
            end
652
    end.
653

654
to_hr_error(nxdomain) ->
655
    <<"Could not resolve host">>;
15✔
656
to_hr_error(econnrefused) ->
657
    <<"Connection refused">>;
53✔
658
to_hr_error({unauthorized_client, _}) ->
659
    <<"Unauthorized client">>;
6✔
660
to_hr_error({not_authorized, _}) ->
661
    <<"Not authorized">>;
1✔
662
to_hr_error({malformed_username_or_password, _}) ->
663
    <<"Bad username or password">>;
1✔
664
to_hr_error(Error) ->
665
    format(Error).
1,123✔
666

667
try_to_existing_atom(Convert, Data, Encoding) ->
668
    try Convert(Data, Encoding) of
74,014✔
669
        Atom ->
670
            {ok, Atom}
74,000✔
671
    catch
672
        _:Reason -> {error, Reason}
14✔
673
    end.
674

675
redact(Term) ->
676
    emqx_utils_redact:redact(Term).
3,705✔
677

678
redact(Term, Checker) ->
679
    emqx_utils_redact:redact(Term, Checker).
40✔
680

681
deobfuscate(NewConf, OldConf) ->
682
    emqx_utils_redact:deobfuscate(NewConf, OldConf).
196✔
683

684
is_redacted(K, V) ->
685
    emqx_utils_redact:is_redacted(K, V).
8✔
686

687
is_redacted(K, V, Fun) ->
688
    emqx_utils_redact:is_redacted(K, V, Fun).
×
689

690
-ifdef(TEST).
691
-include_lib("eunit/include/eunit.hrl").
692

693
ipv6_probe_test() ->
694
    try gen_tcp:ipv6_probe() of
1✔
695
        true ->
696
            ?assertEqual([{ipv6_probe, true}], ipv6_probe([]))
1✔
697
    catch
698
        _:_ ->
699
            ok
×
700
    end.
701

702
-endif.
703

704
pub_props_to_packet(Properties) ->
705
    F = fun
47✔
706
        ('User-Property', M) ->
707
            case is_map(M) andalso map_size(M) > 0 of
47✔
708
                true -> {true, maps:to_list(M)};
11✔
709
                false -> false
36✔
710
            end;
711
        ('User-Property-Pairs', _) ->
712
            false;
×
713
        (_, _) ->
714
            true
15✔
715
    end,
716
    maps:filtermap(F, Properties).
47✔
717

718
%% fix filename by replacing characters which could be invalid on some filesystems
719
%% with safe ones
720
-spec safe_filename(binary() | unicode:chardata()) -> binary() | [unicode:chardata()].
721
safe_filename(Filename) when is_binary(Filename) ->
722
    binary:replace(Filename, <<":">>, <<"-">>, [global]);
495✔
723
safe_filename(Filename) when is_list(Filename) ->
724
    lists:flatten(string:replace(Filename, ":", "-", all)).
17,467✔
725

726
%% @doc Compares two lists of maps and returns the differences between them in a
727
%% map containing four keys – 'removed', 'added', 'identical', and 'changed' –
728
%% each holding a list of maps. Elements are compared using key function KeyFunc
729
%% to extract the comparison key used for matching.
730
%%
731
%% The return value is a map with the following keys and the list of maps as its values:
732
%% * 'removed' – a list of maps that were present in the Old list, but not found in the New list.
733
%% * 'added' – a list of maps that were present in the New list, but not found in the Old list.
734
%% * 'identical' – a list of maps that were present in both lists and have the same comparison key value.
735
%% * 'changed' – a list of pairs of maps representing the changes between maps present in the New and Old lists.
736
%% The first map in the pair represents the map in the Old list, and the second map
737
%% represents the potential modification in the New list.
738

739
%% The KeyFunc parameter is a function that extracts the comparison key used
740
%% for matching from each map. The function should return a comparable term,
741
%% such as an atom, a number, or a string. This is used to determine if each
742
%% element is the same in both lists.
743

744
-spec diff_lists(list(T), list(T), Func) ->
745
    #{
746
        added := list(T),
747
        identical := list(T),
748
        removed := list(T),
749
        changed := list({Old :: T, New :: T})
750
    }
751
when
752
    Func :: fun((T) -> any()),
753
    T :: any().
754

755
diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) ->
756
    Removed =
80✔
757
        lists:foldl(
758
            fun(E, RemovedAcc) ->
759
                case search(KeyFunc(E), KeyFunc, New) of
150✔
760
                    false -> [E | RemovedAcc];
8✔
761
                    _ -> RemovedAcc
142✔
762
                end
763
            end,
764
            [],
765
            Old
766
        ),
767
    {Added, Identical, Changed} =
80✔
768
        lists:foldl(
769
            fun(E, Acc) ->
770
                {Added0, Identical0, Changed0} = Acc,
215✔
771
                case search(KeyFunc(E), KeyFunc, Old) of
215✔
772
                    false ->
773
                        {[E | Added0], Identical0, Changed0};
73✔
774
                    E ->
775
                        {Added0, [E | Identical0], Changed0};
107✔
776
                    E1 ->
777
                        {Added0, Identical0, [{E1, E} | Changed0]}
35✔
778
                end
779
            end,
780
            {[], [], []},
781
            New
782
        ),
783
    #{
80✔
784
        removed => lists:reverse(Removed),
785
        added => lists:reverse(Added),
786
        identical => lists:reverse(Identical),
787
        changed => lists:reverse(Changed)
788
    }.
789

790
%% @doc Merges two lists preserving the original order of elements in both lists.
791
%% KeyFunc must extract a unique key from each element.
792
%% If two keys exist in both lists, the value in List1 is superseded by the value in List2, but
793
%% the element position in the result list will equal its position in List1.
794
%% Example:
795
%%     emqx_utils:merge_append_lists(
796
%%         [#{id => a, val => old}, #{id => b, val => old}],
797
%%         [#{id => a, val => new}, #{id => c}, #{id => b, val => new}, #{id => d}],
798
%%         fun(#{id := Id}) -> Id end).
799
%%    [#{id => a,val => new},
800
%%     #{id => b,val => new},
801
%%     #{id => c},
802
%%     #{id => d}]
803
-spec merge_lists(list(T), list(T), KeyFunc) -> list(T) when
804
    KeyFunc :: fun((T) -> any()),
805
    T :: any().
806
merge_lists(List1, List2, KeyFunc) ->
807
    WithKeysList2 = lists:map(fun(E) -> {KeyFunc(E), E} end, List2),
52✔
808
    WithKeysList1 = lists:map(
52✔
809
        fun(E) ->
810
            K = KeyFunc(E),
76✔
811
            case lists:keyfind(K, 1, WithKeysList2) of
76✔
812
                false -> {K, E};
×
813
                WithKey1 -> WithKey1
76✔
814
            end
815
        end,
816
        List1
817
    ),
818
    NewWithKeysList2 = lists:filter(
52✔
819
        fun({K, _}) ->
820
            not lists:keymember(K, 1, WithKeysList1)
134✔
821
        end,
822
        WithKeysList2
823
    ),
824
    [E || {_, E} <- WithKeysList1 ++ NewWithKeysList2].
52✔
825

826
search(_ExpectValue, _KeyFunc, []) ->
827
    false;
81✔
828
search(ExpectValue, KeyFunc, [Item | List]) ->
829
    case KeyFunc(Item) =:= ExpectValue of
826✔
830
        true -> Item;
284✔
831
        false -> search(ExpectValue, KeyFunc, List)
542✔
832
    end.
833

834
%% @doc Maps over a list of terms and flattens the result, giving back a flat
835
%% list of terms. It's similar to `lists:flatmap/2`, but it also works on a
836
%% single term as `Fun` output (thus, the wordplay on "flatter").
837
%% The purpose of this function is to adapt to `Fun`s that return either a `[]`
838
%% or a term, and to avoid costs of list construction and flattening when
839
%% dealing with large lists.
840
-spec flattermap(Fun, [X]) -> [X] when
841
    Fun :: fun((X) -> [X] | X).
842
flattermap(_Fun, []) ->
843
    [];
×
844
flattermap(Fun, [X | Xs]) ->
845
    flatcomb(Fun(X), flattermap(Fun, Xs)).
×
846

847
flatcomb([], Zs) ->
848
    Zs;
×
849
flatcomb(Ys = [_ | _], []) ->
850
    Ys;
×
851
flatcomb(Ys = [_ | _], Zs = [_ | _]) ->
852
    Ys ++ Zs;
×
853
flatcomb(Y, Zs) ->
854
    [Y | Zs].
×
855

856
%% @doc Format IP address tuple or {IP, Port} tuple to string.
857
ntoa({IP, Port}) ->
858
    ntoa(IP) ++ ":" ++ integer_to_list(Port);
38✔
859
ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
860
    %% v6 piggyback v4
861
    inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
×
862
ntoa(IP) ->
863
    inet_parse:ntoa(IP).
441✔
864

865
%% @doc Return true if the provided string is a restricted string:
866
%% Start with a letter or a digit,
867
%% remaining characters can be '-' or '_' in addition to letters and digits
868
is_restricted_str(String) ->
869
    RE = <<"^[A-Za-z0-9]+[A-Za-z0-9-_]*$">>,
×
870
    match =:= re:run(String, RE, [{capture, none}]).
×
871

872
-ifdef(TEST).
873
-include_lib("eunit/include/eunit.hrl").
874

875
diff_lists_test() ->
876
    KeyFunc = fun(#{name := Name}) -> Name end,
1✔
877
    ?assertEqual(
1✔
878
        #{
879
            removed => [],
880
            added => [],
881
            identical => [],
882
            changed => []
883
        },
1✔
884
        diff_lists([], [], KeyFunc)
1✔
885
    ),
886
    %% test removed list
887
    ?assertEqual(
1✔
888
        #{
889
            removed => [#{name => a, value => 1}],
890
            added => [],
891
            identical => [],
892
            changed => []
893
        },
1✔
894
        diff_lists([], [#{name => a, value => 1}], KeyFunc)
1✔
895
    ),
896
    %% test added list
897
    ?assertEqual(
1✔
898
        #{
899
            removed => [],
900
            added => [#{name => a, value => 1}],
901
            identical => [],
902
            changed => []
903
        },
1✔
904
        diff_lists([#{name => a, value => 1}], [], KeyFunc)
1✔
905
    ),
906
    %% test identical list
907
    ?assertEqual(
1✔
908
        #{
909
            removed => [],
910
            added => [],
911
            identical => [#{name => a, value => 1}],
912
            changed => []
913
        },
1✔
914
        diff_lists([#{name => a, value => 1}], [#{name => a, value => 1}], KeyFunc)
1✔
915
    ),
916
    Old = [
1✔
917
        #{name => a, value => 1},
918
        #{name => b, value => 4},
919
        #{name => e, value => 2},
920
        #{name => d, value => 4}
921
    ],
922
    New = [
1✔
923
        #{name => a, value => 1},
924
        #{name => b, value => 2},
925
        #{name => e, value => 2},
926
        #{name => c, value => 3}
927
    ],
928
    Diff = diff_lists(New, Old, KeyFunc),
1✔
929
    ?assertEqual(
1✔
930
        #{
931
            added => [
932
                #{name => c, value => 3}
933
            ],
934
            identical => [
935
                #{name => a, value => 1},
936
                #{name => e, value => 2}
937
            ],
938
            removed => [
939
                #{name => d, value => 4}
940
            ],
941
            changed => [{#{name => b, value => 4}, #{name => b, value => 2}}]
942
        },
1✔
943
        Diff
1✔
944
    ),
945
    ok.
1✔
946

947
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc