• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8628139215

10 Apr 2024 08:18AM UTC coverage: 62.44% (-0.05%) from 62.489%
8628139215

push

github

web-flow
Merge pull request #12851 from zmstone/0327-feat-add-emqx_variform

emqx_variform for string substitution and transform

206 of 238 new or added lines in 3 files covered. (86.55%)

28 existing lines in 16 files now uncovered.

34895 of 55886 relevant lines covered (62.44%)

6585.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.46
/apps/emqx/src/emqx_trace/emqx_trace.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16
-module(emqx_trace).
17

18
-behaviour(gen_server).
19

20
-include_lib("emqx/include/emqx.hrl").
21
-include_lib("emqx/include/logger.hrl").
22
-include_lib("kernel/include/file.hrl").
23
-include_lib("snabbkaffe/include/trace.hrl").
24
-include_lib("emqx/include/emqx_trace.hrl").
25

26
-export([
27
    publish/1,
28
    subscribe/3,
29
    unsubscribe/2,
30
    log/3,
31
    log/4
32
]).
33

34
-export([
35
    start_link/0,
36
    list/0,
37
    list/1,
38
    get_trace_filename/1,
39
    create/1,
40
    delete/1,
41
    clear/0,
42
    update/2,
43
    check/0,
44
    now_second/0
45
]).
46

47
-export([
48
    format/1,
49
    zip_dir/0,
50
    filename/2,
51
    trace_dir/0,
52
    trace_file/1,
53
    trace_file_detail/1,
54
    delete_files_after_send/2
55
]).
56

57
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
58

59
-ifdef(TEST).
60
-export([
61
    log_file/2,
62
    find_closest_time/2
63
]).
64
-endif.
65

66
-export_type([ip_address/0]).
67
-type ip_address() :: string().
68

69
publish(#message{topic = <<"$SYS/", _/binary>>}) ->
70
    ignore;
23,693✔
71
publish(#message{from = From, topic = Topic, payload = Payload}) when
72
    is_binary(From); is_atom(From)
73
->
74
    ?TRACE("PUBLISH", "publish_to", #{topic => Topic, payload => Payload}).
22,091✔
75

76
subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) ->
77
    ignore;
8✔
78
subscribe(Topic, SubId, SubOpts) ->
79
    ?TRACE("SUBSCRIBE", "subscribe", #{topic => Topic, sub_opts => SubOpts, sub_id => SubId}).
16,556✔
80

81
unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) ->
82
    ignore;
×
83
unsubscribe(Topic, SubOpts) ->
84
    ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
6,928✔
85

86
log(List, Msg, Meta) ->
87
    log(debug, List, Msg, Meta).
99✔
88

89
log(Level, List, Msg, Meta) ->
90
    Log = #{level => Level, meta => enrich_meta(Meta), msg => Msg},
99✔
91
    log_filter(List, Log).
99✔
92

93
enrich_meta(Meta) ->
94
    case logger:get_process_metadata() of
99✔
95
        undefined -> Meta;
4✔
96
        ProcMeta -> maps:merge(ProcMeta, Meta)
95✔
97
    end.
98

99
log_filter([], _Log) ->
100
    ok;
99✔
101
log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) ->
102
    case FilterFun(Log0, {Filter, Name}) of
162✔
103
        stop ->
104
            stop;
66✔
105
        ignore ->
106
            ignore;
×
107
        Log ->
108
            case logger_config:get(logger, Id) of
96✔
109
                {ok, #{module := Module} = HandlerConfig0} ->
110
                    HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0),
87✔
111
                    try
87✔
112
                        Module:log(Log, HandlerConfig)
87✔
113
                    catch
114
                        C:R:S ->
115
                            case logger:remove_handler(Id) of
×
116
                                ok ->
117
                                    logger:internal_log(
×
118
                                        error, {removed_failing_handler, Id, C, R, S}
119
                                    );
120
                                {error, {not_found, _}} ->
121
                                    %% Probably already removed by other client
122
                                    %% Don't report again
123
                                    ok;
×
124
                                {error, Reason} ->
125
                                    logger:internal_log(
×
126
                                        error,
127
                                        {removed_handler_failed, Id, Reason, C, R, S}
128
                                    )
129
                            end
130
                    end;
131
                {error, {not_found, Id}} ->
132
                    ok;
9✔
133
                {error, Reason} ->
134
                    logger:internal_log(error, {find_handle_id_failed, Id, Reason})
×
135
            end
136
    end,
137
    log_filter(Rest, Log0).
162✔
138

139
-spec start_link() -> emqx_types:startlink_ret().
140
start_link() ->
141
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
29✔
142

143
-spec list() -> [tuple()].
144
list() ->
145
    ets:match_object(?TRACE, #?TRACE{_ = '_'}).
97✔
146

147
-spec list(boolean()) -> [tuple()].
148
list(Enable) ->
149
    ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}).
×
150

151
-spec create([{Key :: binary(), Value :: any()}] | #{atom() => any()}) ->
152
    {ok, #?TRACE{}}
153
    | {error,
154
        {duplicate_condition, iodata()}
155
        | {already_existed, iodata()}
156
        | {bad_type, any()}
157
        | iodata()}.
158
create(Trace) ->
159
    case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of
58✔
160
        true ->
161
            case to_trace(Trace) of
57✔
162
                {ok, TraceRec} -> insert_new_trace(TraceRec);
47✔
163
                {error, Reason} -> {error, Reason}
10✔
164
            end;
165
        false ->
166
            {error,
1✔
167
                "The number of traces created has reached the maximum"
168
                " please delete the useless ones first"}
169
    end.
170

171
-spec delete(Name :: binary()) -> ok | {error, not_found}.
172
delete(Name) ->
173
    transaction(fun emqx_trace_dl:delete/1, [Name]).
4✔
174

175
-spec clear() -> ok | {error, Reason :: term()}.
176
clear() ->
177
    case mria:clear_table(?TRACE) of
16✔
178
        {atomic, ok} -> ok;
16✔
179
        {aborted, Reason} -> {error, Reason}
×
180
    end.
181

182
-spec update(Name :: binary(), Enable :: boolean()) ->
183
    ok | {error, not_found | finished}.
184
update(Name, Enable) ->
185
    transaction(fun emqx_trace_dl:update/2, [Name, Enable]).
7✔
186

187
check() ->
188
    gen_server:call(?MODULE, check).
4✔
189

190
-spec get_trace_filename(Name :: binary()) ->
191
    {ok, FileName :: string()} | {error, not_found}.
192
get_trace_filename(Name) ->
193
    transaction(fun emqx_trace_dl:get_trace_filename/1, [Name]).
4✔
194

195
-spec trace_file(File :: file:filename_all()) ->
196
    {ok, Node :: list(), Binary :: binary()}
197
    | {error, Node :: list(), Reason :: term()}.
198
trace_file(File) ->
199
    FileName = filename:join(trace_dir(), File),
1✔
200
    Node = atom_to_list(node()),
1✔
201
    case file:read_file(FileName) of
1✔
202
        {ok, Bin} -> {ok, Node, Bin};
1✔
203
        {error, Reason} -> {error, Node, Reason}
×
204
    end.
205

206
trace_file_detail(File) ->
207
    FileName = filename:join(trace_dir(), File),
2✔
208
    Node = atom_to_binary(node()),
2✔
209
    case file:read_file_info(FileName, [{'time', 'posix'}]) of
2✔
210
        {ok, #file_info{size = Size, mtime = Mtime}} ->
211
            {ok, #{size => Size, mtime => Mtime, node => Node}};
2✔
212
        {error, Reason} ->
213
            {error, #{reason => Reason, node => Node, file => File}}
×
214
    end.
215

216
delete_files_after_send(TraceLog, Zips) ->
217
    gen_server:cast(?MODULE, {delete_tag, self(), [TraceLog | Zips]}).
×
218

219
-spec format(list(#?TRACE{})) -> list(map()).
220
format(Traces) ->
221
    Fields = record_info(fields, ?TRACE),
4✔
222
    lists:map(
4✔
223
        fun(Trace0 = #?TRACE{}) ->
224
            [_ | Values] = tuple_to_list(Trace0),
7✔
225
            maps:from_list(lists:zip(Fields, Values))
7✔
226
        end,
227
        Traces
228
    ).
229

230
init([]) ->
231
    erlang:process_flag(trap_exit, true),
29✔
232
    Fields = record_info(fields, ?TRACE),
29✔
233
    ok = mria:create_table(?TRACE, [
29✔
234
        {type, set},
235
        {rlog_shard, ?SHARD},
236
        {storage, disc_copies},
237
        {record_name, ?TRACE},
238
        {attributes, Fields}
239
    ]),
240
    ok = mria:wait_for_tables([?TRACE]),
29✔
241
    maybe_migrate_trace(Fields),
29✔
242
    {ok, _} = mnesia:subscribe({table, ?TRACE, simple}),
29✔
243
    ok = filelib:ensure_dir(filename:join([trace_dir(), dummy])),
29✔
244
    ok = filelib:ensure_dir(filename:join([zip_dir(), dummy])),
29✔
245
    Traces = get_enabled_trace(),
29✔
246
    TRef = update_trace(Traces),
29✔
247
    update_trace_handler(),
29✔
248
    {ok, #{timer => TRef, monitors => #{}}}.
29✔
249

250
handle_call(check, _From, State) ->
251
    {_, NewState} = handle_info({mnesia_table_event, check}, State),
4✔
252
    {reply, ok, NewState};
4✔
253
handle_call(Req, _From, State) ->
254
    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
×
255
    {reply, ok, State}.
×
256

257
handle_cast({delete_tag, Pid, Files}, State = #{monitors := Monitors}) ->
258
    erlang:monitor(process, Pid),
×
259
    {noreply, State#{monitors => Monitors#{Pid => Files}}};
×
260
handle_cast(Msg, State) ->
261
    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
×
262
    {noreply, State}.
×
263

264
handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitors}) ->
265
    case maps:take(Pid, Monitors) of
×
266
        error ->
267
            {noreply, State};
×
268
        {Files, NewMonitors} ->
269
            lists:foreach(fun file:delete/1, Files),
×
270
            {noreply, State#{monitors => NewMonitors}}
×
271
    end;
272
handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) ->
273
    Traces = get_enabled_trace(),
94✔
274
    NextTRef = update_trace(Traces),
94✔
275
    update_trace_handler(),
94✔
276
    ?tp(update_trace_done, #{}),
94✔
277
    {noreply, State#{timer => NextTRef}};
94✔
278
handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) ->
279
    emqx_utils:cancel_timer(TRef),
91✔
280
    handle_info({timeout, TRef, update_trace}, State);
91✔
281
handle_info(Info, State) ->
UNCOV
282
    ?SLOG(error, #{msg => "unexpected_info", req => Info}),
×
UNCOV
283
    {noreply, State}.
×
284

285
terminate(_Reason, #{timer := TRef}) ->
286
    _ = mnesia:unsubscribe({table, ?TRACE, simple}),
19✔
287
    emqx_utils:cancel_timer(TRef),
19✔
288
    stop_all_trace_handler(),
19✔
289
    update_trace_handler(),
19✔
290
    _ = file:del_dir_r(zip_dir()),
19✔
291
    ok.
19✔
292

293
code_change(_, State, _Extra) ->
294
    {ok, State}.
×
295

296
insert_new_trace(Trace) ->
297
    transaction(fun emqx_trace_dl:insert_new_trace/1, [Trace]).
47✔
298

299
update_trace(Traces) ->
300
    Now = now_second(),
123✔
301
    {_Waiting, Running, Finished} = classify_by_time(Traces, Now),
123✔
302
    disable_finished(Finished),
123✔
303
    Started = emqx_trace_handler:running(),
123✔
304
    {NeedRunning, AllStarted} = start_trace(Running, Started),
123✔
305
    NeedStop = filter_cli_handler(AllStarted) -- NeedRunning,
123✔
306
    ok = stop_trace(NeedStop, Started),
123✔
307
    clean_stale_trace_files(),
123✔
308
    NextTime = find_closest_time(Traces, Now),
123✔
309
    emqx_utils:start_timer(NextTime, update_trace).
123✔
310

311
stop_all_trace_handler() ->
312
    lists:foreach(
19✔
313
        fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end,
40✔
314
        emqx_trace_handler:running()
315
    ).
316

317
get_enabled_trace() ->
318
    {atomic, Traces} =
123✔
319
        mria:ro_transaction(?SHARD, fun emqx_trace_dl:get_enabled_trace/0),
320
    Traces.
123✔
321

322
find_closest_time(Traces, Now) ->
323
    Sec =
128✔
324
        lists:foldl(
325
            fun
326
                (#?TRACE{start_at = Start, end_at = End, enable = true}, Closest) ->
327
                    min(closest(End, Now, Closest), closest(Start, Now, Closest));
1,042✔
328
                (_, Closest) ->
329
                    Closest
2✔
330
            end,
331
            60 * 15,
332
            Traces
333
        ),
334
    timer:seconds(Sec).
128✔
335

336
closest(Time, Now, Closest) when Now >= Time -> Closest;
1,039✔
337
closest(Time, Now, Closest) -> min(Time - Now, Closest).
1,045✔
338

339
disable_finished([]) ->
340
    ok;
121✔
341
disable_finished(Traces) ->
342
    transaction(fun emqx_trace_dl:delete_finished/1, [Traces]).
2✔
343

344
start_trace(Traces, Started0) ->
345
    Started = lists:map(fun(#{name := Name}) -> Name end, Started0),
123✔
346
    lists:foldl(
123✔
347
        fun(
348
            #?TRACE{name = Name} = Trace,
349
            {Running, StartedAcc}
350
        ) ->
351
            case lists:member(Name, StartedAcc) of
1,030✔
352
                true ->
353
                    {[Name | Running], StartedAcc};
947✔
354
                false ->
355
                    case start_trace(Trace) of
83✔
356
                        ok -> {[Name | Running], [Name | StartedAcc]};
83✔
357
                        {error, _Reason} -> {[Name | Running], StartedAcc}
×
358
                    end
359
            end
360
        end,
361
        {[], Started},
362
        Traces
363
    ).
364

365
start_trace(Trace) ->
366
    #?TRACE{
83✔
367
        name = Name,
368
        type = Type,
369
        filter = Filter,
370
        start_at = Start,
371
        payload_encode = PayloadEncode
372
    } = Trace,
373
    Who = #{name => Name, type => Type, filter => Filter, payload_encode => PayloadEncode},
83✔
374
    emqx_trace_handler:install(Who, debug, log_file(Name, Start)).
83✔
375

376
stop_trace(Finished, Started) ->
377
    lists:foreach(
123✔
378
        fun(#{name := Name, id := HandlerID, dst := FilePath, type := Type, filter := Filter}) ->
379
            case lists:member(Name, Finished) of
998✔
380
                true ->
381
                    _ = maybe_sync_logfile(HandlerID),
43✔
382
                    case file:read_file_info(FilePath) of
43✔
383
                        {ok, #file_info{size = Size}} when Size > 0 ->
384
                            ?TRACE("API", "trace_stopping", #{Type => Filter});
4✔
385
                        _ ->
386
                            ok
39✔
387
                    end,
388
                    emqx_trace_handler:uninstall(Type, Name);
43✔
389
                false ->
390
                    ok
955✔
391
            end
392
        end,
393
        Started
394
    ).
395

396
maybe_sync_logfile(HandlerID) ->
397
    case logger:get_handler_config(HandlerID) of
43✔
398
        {ok, #{module := Mod}} ->
399
            case erlang:function_exported(Mod, filesync, 1) of
43✔
400
                true ->
401
                    Mod:filesync(HandlerID);
43✔
402
                false ->
403
                    ok
×
404
            end;
405
        _ ->
406
            ok
×
407
    end.
408

409
clean_stale_trace_files() ->
410
    TraceDir = trace_dir(),
123✔
411
    case file:list_dir(TraceDir) of
123✔
412
        {ok, AllFiles} when AllFiles =/= ["zip"] ->
413
            FileFun = fun(#?TRACE{name = Name, start_at = StartAt}) -> filename(Name, StartAt) end,
83✔
414
            KeepFiles = lists:map(FileFun, list()),
83✔
415
            case AllFiles -- ["zip" | KeepFiles] of
83✔
416
                [] ->
417
                    ok;
74✔
418
                DeleteFiles ->
419
                    DelFun = fun(F) -> file:delete(filename:join(TraceDir, F)) end,
9✔
420
                    lists:foreach(DelFun, DeleteFiles)
9✔
421
            end;
422
        _ ->
423
            ok
40✔
424
    end.
425

426
classify_by_time(Traces, Now) ->
427
    classify_by_time(Traces, Now, [], [], []).
123✔
428

429
classify_by_time([], _Now, Wait, Run, Finish) ->
430
    {Wait, Run, Finish};
123✔
431
classify_by_time(
432
    [Trace = #?TRACE{start_at = Start} | Traces],
433
    Now,
434
    Wait,
435
    Run,
436
    Finish
437
) when Start > Now ->
438
    classify_by_time(Traces, Now, [Trace | Wait], Run, Finish);
3✔
439
classify_by_time(
440
    [Trace = #?TRACE{end_at = End} | Traces],
441
    Now,
442
    Wait,
443
    Run,
444
    Finish
445
) when End =< Now ->
446
    classify_by_time(Traces, Now, Wait, Run, [Trace | Finish]);
2✔
447
classify_by_time([Trace | Traces], Now, Wait, Run, Finish) ->
448
    classify_by_time(Traces, Now, Wait, [Trace | Run], Finish).
1,030✔
449

450
to_trace(TraceParam) ->
451
    case to_trace(ensure_map(TraceParam), #?TRACE{}) of
57✔
452
        {error, Reason} ->
453
            {error, Reason};
6✔
454
        {ok, #?TRACE{name = undefined}} ->
455
            {error, "name required"};
1✔
456
        {ok, #?TRACE{type = undefined}} ->
457
            {error, "type=[topic,clientid,ip_address] required"};
2✔
458
        {ok, TraceRec0 = #?TRACE{}} ->
459
            case fill_default(TraceRec0) of
48✔
460
                #?TRACE{start_at = Start, end_at = End} when End =< Start ->
461
                    {error, "failed by start_at >= end_at"};
1✔
462
                TraceRec ->
463
                    {ok, TraceRec}
47✔
464
            end
465
    end.
466

467
ensure_map(#{} = Trace) ->
468
    maps:fold(
4✔
469
        fun
470
            (K, V, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V};
×
471
            (K, V, Acc) when is_atom(K) -> Acc#{K => V}
20✔
472
        end,
473
        #{},
474
        Trace
475
    );
476
ensure_map(Trace) when is_list(Trace) ->
477
    lists:foldl(
53✔
478
        fun
479
            ({K, V}, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V};
83✔
480
            ({K, V}, Acc) when is_atom(K) -> Acc#{K => V};
90✔
481
            (_, Acc) -> Acc
×
482
        end,
483
        #{},
484
        Trace
485
    ).
486

487
fill_default(Trace = #?TRACE{start_at = undefined}) ->
488
    fill_default(Trace#?TRACE{start_at = now_second()});
35✔
489
fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) ->
490
    fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60});
39✔
491
fill_default(Trace) ->
492
    Trace.
48✔
493

494
-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$").
495

496
to_trace(#{name := Name} = Trace, Rec) ->
497
    case re:run(Name, ?NAME_RE) of
56✔
498
        nomatch -> {error, "Name should be " ?NAME_RE};
1✔
499
        _ -> to_trace(maps:remove(name, Trace), Rec#?TRACE{name = Name})
55✔
500
    end;
501
to_trace(#{type := clientid, clientid := Filter} = Trace, Rec) ->
502
    Trace0 = maps:without([type, clientid], Trace),
7✔
503
    to_trace(Trace0, Rec#?TRACE{type = clientid, filter = Filter});
7✔
504
to_trace(#{type := topic, topic := Filter} = Trace, Rec) ->
505
    case validate_topic(Filter) of
43✔
506
        ok ->
507
            Trace0 = maps:without([type, topic], Trace),
42✔
508
            to_trace(Trace0, Rec#?TRACE{type = topic, filter = Filter});
42✔
509
        Error ->
510
            Error
1✔
511
    end;
512
to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) ->
513
    case validate_ip_address(Filter) of
2✔
514
        ok ->
515
            Trace0 = maps:without([type, ip_address], Trace),
1✔
516
            to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = binary_to_list(Filter)});
1✔
517
        Error ->
518
            Error
1✔
519
    end;
520
to_trace(#{type := Type}, _Rec) ->
521
    {error, io_lib:format("required ~s field", [Type])};
1✔
522
to_trace(#{payload_encode := PayloadEncode} = Trace, Rec) ->
523
    to_trace(maps:remove(payload_encode, Trace), Rec#?TRACE{payload_encode = PayloadEncode});
×
524
to_trace(#{start_at := StartAt} = Trace, Rec) ->
525
    {ok, Sec} = to_system_second(StartAt),
15✔
526
    to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = Sec});
15✔
527
to_trace(#{end_at := EndAt} = Trace, Rec) ->
528
    Now = now_second(),
11✔
529
    case to_system_second(EndAt) of
11✔
530
        {ok, Sec} when Sec > Now ->
531
            to_trace(maps:remove(end_at, Trace), Rec#?TRACE{end_at = Sec});
9✔
532
        {ok, _Sec} ->
533
            {error, "end_at time has already passed"}
2✔
534
    end;
535
to_trace(_, Rec) ->
536
    {ok, Rec}.
51✔
537

538
validate_topic(TopicName) ->
539
    try emqx_topic:validate(filter, TopicName) of
43✔
540
        true -> ok
42✔
541
    catch
542
        error:Error ->
543
            {error, io_lib:format("topic: ~s invalid by ~p", [TopicName, Error])}
1✔
544
    end.
545
validate_ip_address(IP) ->
546
    case inet:parse_address(binary_to_list(IP)) of
2✔
547
        {ok, _} -> ok;
1✔
548
        {error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))}
1✔
549
    end.
550

551
to_system_second(Sec) ->
552
    {ok, erlang:max(now_second(), Sec)}.
26✔
553

554
zip_dir() ->
555
    filename:join([trace_dir(), "zip"]).
48✔
556

557
trace_dir() ->
558
    filename:join(emqx:data_dir(), "trace").
293✔
559

560
log_file(Name, Start) ->
561
    filename:join(trace_dir(), filename(Name, Start)).
89✔
562

563
filename(Name, Start) ->
564
    [Time, _] = string:split(calendar:system_time_to_rfc3339(Start), "T", leading),
1,169✔
565
    lists:flatten(["trace_", binary_to_list(Name), "_", Time, ".log"]).
1,169✔
566

567
transaction(Fun, Args) ->
568
    case mria:transaction(?COMMON_SHARD, Fun, Args) of
64✔
569
        {atomic, Res} -> Res;
58✔
570
        {aborted, Reason} -> {error, Reason}
6✔
571
    end.
572

573
update_trace_handler() ->
574
    case emqx_trace_handler:running() of
142✔
575
        [] ->
576
            persistent_term:erase(?TRACE_FILTER);
73✔
577
        Running ->
578
            List = lists:map(
69✔
579
                fun(
580
                    #{
581
                        id := Id,
582
                        filter_fun := FilterFun,
583
                        filter := Filter,
584
                        name := Name
585
                    }
586
                ) ->
587
                    {Id, FilterFun, Filter, Name}
1,038✔
588
                end,
589
                Running
590
            ),
591
            case List =/= persistent_term:get(?TRACE_FILTER, undefined) of
69✔
592
                true -> persistent_term:put(?TRACE_FILTER, List);
26✔
593
                false -> ok
43✔
594
            end
595
    end.
596

597
filter_cli_handler(Names) ->
598
    lists:filter(
123✔
599
        fun(Name) ->
600
            nomatch =:= re:run(Name, "^CLI-+.", [])
1,081✔
601
        end,
602
        Names
603
    ).
604

605
now_second() ->
606
    os:system_time(second).
200✔
607

608
maybe_migrate_trace(Fields) ->
609
    case mnesia:table_info(emqx_trace, attributes) =:= Fields of
29✔
610
        true ->
611
            ok;
28✔
612
        false ->
613
            TransFun = fun(Trace) ->
1✔
614
                case Trace of
2✔
615
                    {?TRACE, Name, Type, Filter, Enable, StartAt, EndAt} ->
616
                        #?TRACE{
1✔
617
                            name = Name,
618
                            type = Type,
619
                            filter = Filter,
620
                            enable = Enable,
621
                            start_at = StartAt,
622
                            end_at = EndAt,
623
                            payload_encode = text,
624
                            extra = #{}
625
                        };
626
                    #?TRACE{} ->
627
                        Trace
1✔
628
                end
629
            end,
630
            {atomic, ok} = mnesia:transform_table(?TRACE, TransFun, Fields, ?TRACE),
1✔
631
            ok
1✔
632
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc