• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8613439193

09 Apr 2024 09:25AM UTC coverage: 62.491% (-0.1%) from 62.636%
8613439193

push

github

web-flow
Merge pull request #12854 from id/0409-update-codeowners

chore: update codeowners

34606 of 55378 relevant lines covered (62.49%)

6551.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.68
/apps/emqx_management/src/emqx_mgmt.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_mgmt).
18

19
-include("emqx_mgmt.hrl").
20
-include_lib("emqx/include/emqx_cm.hrl").
21
-include_lib("emqx/include/logger.hrl").
22

23
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
24
-elvis([{elvis_style, god_modules, disable}]).
25

26
-include_lib("stdlib/include/qlc.hrl").
27

28
%% Nodes and Brokers API
29
-export([
30
    list_nodes/0,
31
    lookup_node/1,
32
    list_brokers/0,
33
    lookup_broker/1,
34
    node_info/0,
35
    node_info/1,
36
    broker_info/0,
37
    broker_info/1
38
]).
39

40
%% Metrics and Stats
41
-export([
42
    get_metrics/0,
43
    get_metrics/1,
44
    get_stats/0,
45
    get_stats/1
46
]).
47

48
%% Clients, Sessions
49
-export([
50
    lookup_client/2,
51
    lookup_client/3,
52
    kickout_client/1,
53
    kickout_clients/1,
54
    list_authz_cache/1,
55
    list_client_subscriptions/1,
56
    list_client_msgs/3,
57
    client_subscriptions/2,
58
    clean_authz_cache/1,
59
    clean_authz_cache/2,
60
    clean_authz_cache_all/0,
61
    clean_authz_cache_all/1,
62
    clean_pem_cache_all/0,
63
    clean_pem_cache_all/1,
64
    set_ratelimit_policy/2,
65
    set_quota_policy/2,
66
    set_keepalive/2,
67

68
    do_kickout_clients/1
69
]).
70

71
%% Internal exports
72
-export([lookup_running_client/2]).
73

74
%% Internal functions
75
-export([do_call_client/2]).
76

77
%% Subscriptions
78
-export([
79
    list_subscriptions/1,
80
    list_subscriptions_via_topic/2,
81
    list_subscriptions_via_topic/3,
82

83
    do_list_subscriptions/0
84
]).
85

86
%% PubSub
87
-export([
88
    subscribe/2,
89
    do_subscribe/2,
90
    publish/1,
91
    unsubscribe/2,
92
    do_unsubscribe/2,
93
    unsubscribe_batch/2,
94
    do_unsubscribe_batch/2
95
]).
96

97
%% Alarms
98
-export([
99
    get_alarms/1,
100
    get_alarms/2,
101
    deactivate/2,
102
    delete_all_deactivated_alarms/0,
103
    delete_all_deactivated_alarms/1
104
]).
105

106
%% Banned
107
-export([
108
    create_banned/1,
109
    delete_banned/1
110
]).
111

112
%% Common Table API
113
-export([
114
    default_row_limit/0,
115
    vm_stats/0,
116
    vm_stats/1
117
]).
118

119
-elvis([{elvis_style, god_modules, disable}]).
120

121
-define(maybe_log_node_errors(LogData, Errors),
122
    case Errors of
123
        [] -> ok;
124
        _ -> ?SLOG(error, (LogData)#{node_errors => Errors})
125
    end
126
).
127

128
%%--------------------------------------------------------------------
129
%% Node Info
130
%%--------------------------------------------------------------------
131

132
list_nodes() ->
133
    Running = emqx:cluster_nodes(running),
×
134
    %% all stopped core nodes
135
    Stopped = emqx:cluster_nodes(stopped),
×
136
    DownNodes = lists:map(fun stopped_node_info/1, Stopped),
×
137
    [{Node, Info} || #{node := Node} = Info <- node_info(Running)] ++ DownNodes.
×
138

139
lookup_node(Node) ->
140
    [Info] = node_info([Node]),
×
141
    Info.
×
142

143
node_info() ->
144
    {UsedRatio, Total} = get_sys_memory(),
×
145
    Info = maps:from_list(emqx_vm:loads()),
×
146
    BrokerInfo = emqx_sys:info(),
×
147
    Info#{
×
148
        node => node(),
149
        otp_release => otp_rel(),
150
        memory_total => Total,
151
        memory_used => erlang:round(Total * UsedRatio),
152
        process_available => erlang:system_info(process_limit),
153
        process_used => erlang:system_info(process_count),
154

155
        max_fds => proplists:get_value(
156
            max_fds, lists:usort(lists:flatten(erlang:system_info(check_io)))
157
        ),
158
        connections => ets:info(?CHAN_TAB, size),
159
        live_connections => ets:info(?CHAN_LIVE_TAB, size),
160
        cluster_sessions => ets:info(?CHAN_REG_TAB, size),
161
        node_status => 'running',
162
        uptime => proplists:get_value(uptime, BrokerInfo),
163
        version => iolist_to_binary(proplists:get_value(version, BrokerInfo)),
164
        edition => emqx_release:edition_longstr(),
165
        role => mria_rlog:role(),
166
        log_path => log_path(),
167
        sys_path => iolist_to_binary(code:root_dir())
168
    }.
169

170
log_path() ->
171
    RootDir = code:root_dir(),
×
172
    Configs = logger:get_handler_config(),
×
173
    case get_log_path(Configs) of
×
174
        undefined ->
175
            <<"log.file.enable is false, not logging to file.">>;
×
176
        Path ->
177
            iolist_to_binary(filename:join(RootDir, Path))
×
178
    end.
179

180
get_log_path([#{config := #{file := Path}} | _LoggerConfigs]) ->
181
    filename:dirname(Path);
×
182
get_log_path([_LoggerConfig | LoggerConfigs]) ->
183
    get_log_path(LoggerConfigs);
×
184
get_log_path([]) ->
185
    undefined.
×
186

187
get_sys_memory() ->
188
    case os:type() of
48✔
189
        {unix, linux} ->
190
            emqx_mgmt_cache:get_sys_memory();
48✔
191
        _ ->
192
            {0, 0}
×
193
    end.
194

195
node_info(Nodes) ->
196
    emqx_rpc:unwrap_erpc(emqx_management_proto_v5:node_info(Nodes)).
×
197

198
stopped_node_info(Node) ->
199
    {Node, #{node => Node, node_status => 'stopped', role => core}}.
×
200

201
%% Hide cpu stats if os_check is not supported.
202
vm_stats() ->
203
    {MemUsedRatio, MemTotal} = get_sys_memory(),
48✔
204
    cpu_stats() ++
48✔
205
        [
206
            {run_queue, vm_stats('run.queue')},
207
            {total_memory, MemTotal},
208
            {used_memory, erlang:round(MemTotal * MemUsedRatio)}
209
        ].
210

211
cpu_stats() ->
212
    case emqx_os_mon:is_os_check_supported() of
48✔
213
        false ->
214
            [];
×
215
        true ->
216
            vm_stats('cpu')
48✔
217
    end.
218

219
vm_stats('cpu') ->
220
    CpuUtilArg = [],
48✔
221
    case emqx_vm:cpu_util([CpuUtilArg]) of
48✔
222
        %% return 0.0 when `emqx_cpu_sup_worker` is not started
223
        {all, Use, Idle, _} ->
224
            NUse = floor(Use * 100) / 100,
48✔
225
            NIdle = ceil(Idle * 100) / 100,
48✔
226
            [{cpu_use, NUse}, {cpu_idle, NIdle}];
48✔
227
        _ ->
228
            [{cpu_use, 0}, {cpu_idle, 0}]
×
229
    end;
230
vm_stats('total.memory') ->
231
    {_, MemTotal} = get_sys_memory(),
×
232
    MemTotal;
×
233
vm_stats('used.memory') ->
234
    {MemUsedRatio, MemTotal} = get_sys_memory(),
×
235
    erlang:round(MemTotal * MemUsedRatio);
×
236
vm_stats('run.queue') ->
237
    erlang:statistics(run_queue).
48✔
238

239
%%--------------------------------------------------------------------
240
%% Brokers
241
%%--------------------------------------------------------------------
242

243
list_brokers() ->
244
    Running = emqx:running_nodes(),
×
245
    [{Node, Broker} || #{node := Node} = Broker <- broker_info(Running)].
×
246

247
lookup_broker(Node) ->
248
    [Broker] = broker_info([Node]),
×
249
    Broker.
×
250

251
broker_info() ->
252
    Info = lists:foldl(fun convert_broker_info/2, #{}, emqx_sys:info()),
×
253
    Info#{node => node(), otp_release => otp_rel(), node_status => 'running'}.
×
254

255
convert_broker_info({uptime, Uptime}, M) ->
256
    M#{uptime => emqx_utils_calendar:human_readable_duration_string(Uptime)};
×
257
convert_broker_info({K, V}, M) ->
258
    M#{K => iolist_to_binary(V)}.
×
259

260
broker_info(Nodes) ->
261
    emqx_rpc:unwrap_erpc(emqx_management_proto_v5:broker_info(Nodes)).
×
262

263
%%--------------------------------------------------------------------
264
%% Metrics and Stats
265
%%--------------------------------------------------------------------
266

267
get_metrics() ->
268
    nodes_info_count([get_metrics(Node) || Node <- emqx:running_nodes()]).
×
269

270
get_metrics(Node) ->
271
    unwrap_rpc(emqx_proto_v1:get_metrics(Node)).
10✔
272

273
get_stats() ->
274
    GlobalStatsKeys =
×
275
        [
276
            'retained.count',
277
            'retained.max',
278
            'topics.count',
279
            'topics.max',
280
            'subscriptions.shared.count',
281
            'subscriptions.shared.max'
282
        ],
283
    CountStats = nodes_info_count(
×
284
        lists:foldl(
285
            fun(Node, Acc) ->
286
                case get_stats(Node) of
×
287
                    {error, _} ->
288
                        Acc;
×
289
                    Stats ->
290
                        [delete_keys(Stats, GlobalStatsKeys) | Acc]
×
291
                end
292
            end,
293
            [],
294
            emqx:running_nodes()
295
        )
296
    ),
297
    GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))),
×
298
    maps:merge(CountStats, GlobalStats).
×
299

300
delete_keys(List, []) ->
301
    List;
×
302
delete_keys(List, [Key | Keys]) ->
303
    delete_keys(proplists:delete(Key, List), Keys).
×
304

305
get_stats(Node) ->
306
    unwrap_rpc(emqx_proto_v1:get_stats(Node)).
5✔
307

308
nodes_info_count(PropList) ->
309
    NodeCount =
×
310
        fun({Key, Value}, Result) ->
311
            Count = maps:get(Key, Result, 0),
×
312
            Result#{Key => Count + Value}
×
313
        end,
314
    AllCount =
×
315
        fun(StatsMap, Result) ->
316
            lists:foldl(NodeCount, Result, StatsMap)
×
317
        end,
318
    lists:foldl(AllCount, #{}, PropList).
×
319

320
%%--------------------------------------------------------------------
321
%% Clients
322
%%--------------------------------------------------------------------
323

324
lookup_client({clientid, ClientId}, FormatFun) ->
325
    IsPersistenceEnabled = emqx_persistent_message:is_persistence_enabled(),
×
326
    case lookup_running_client(ClientId, FormatFun) of
×
327
        [] when IsPersistenceEnabled ->
328
            case emqx_persistent_session_ds_state:print_session(ClientId) of
×
329
                undefined -> [];
×
330
                Session -> [maybe_format(FormatFun, {ClientId, Session})]
×
331
            end;
332
        Res ->
333
            Res
×
334
    end;
335
lookup_client({username, Username}, FormatFun) ->
336
    lists:append([
×
337
        lookup_client(Node, {username, Username}, FormatFun)
×
338
     || Node <- emqx:running_nodes()
×
339
    ]).
340

341
lookup_client(Node, Key, FormatFun) ->
342
    case unwrap_rpc(emqx_cm_proto_v1:lookup_client(Node, Key)) of
×
343
        {error, Err} ->
344
            {error, Err};
×
345
        L ->
346
            lists:map(
×
347
                fun({Chan, Info0, Stats}) ->
348
                    Info = Info0#{node => Node},
×
349
                    maybe_format(FormatFun, {Chan, Info, Stats})
×
350
                end,
351
                L
352
            )
353
    end.
354

355
maybe_format(undefined, A) ->
356
    A;
×
357
maybe_format({M, F}, A) ->
358
    M:F(A).
×
359

360
kickout_client(ClientId) ->
361
    case lookup_client({clientid, ClientId}, undefined) of
×
362
        [] ->
363
            {error, not_found};
×
364
        [{ClientId, _}] ->
365
            %% Offline durable session (client ID is a plain binary
366
            %% without channel pid):
367
            emqx_persistent_session_ds:kick_offline_session(ClientId);
×
368
        _ ->
369
            Results = [kickout_client(Node, ClientId) || Node <- emqx:running_nodes()],
×
370
            check_results(Results)
×
371
    end.
372

373
kickout_client(Node, ClientId) ->
374
    unwrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)).
×
375

376
kickout_clients(ClientIds) when is_list(ClientIds) ->
377
    F = fun(Node) ->
1✔
378
        emqx_management_proto_v5:kickout_clients(Node, ClientIds)
1✔
379
    end,
380
    Results = lists:map(F, emqx:running_nodes()),
1✔
381
    lists:foreach(fun emqx_persistent_session_ds:kick_offline_session/1, ClientIds),
1✔
382
    case lists:filter(fun(Res) -> Res =/= ok end, Results) of
1✔
383
        [] ->
384
            ok;
1✔
385
        [Result | _] ->
386
            unwrap_rpc(Result)
×
387
    end.
388

389
do_kickout_clients(ClientIds) when is_list(ClientIds) ->
390
    F = fun(ClientId) ->
1✔
391
        ChanPids = emqx_cm:lookup_channels(local, ClientId),
3✔
392
        lists:foreach(
3✔
393
            fun(ChanPid) -> emqx_cm:kick_session(ClientId, ChanPid) end,
3✔
394
            ChanPids
395
        )
396
    end,
397
    lists:foreach(F, ClientIds).
1✔
398

399
list_authz_cache(ClientId) ->
400
    call_client(ClientId, list_authz_cache).
×
401

402
list_client_subscriptions(ClientId) ->
403
    case emqx_persistent_session_ds:list_client_subscriptions(ClientId) of
×
404
        {error, not_found} ->
405
            list_client_subscriptions_mem(ClientId);
×
406
        Result ->
407
            Result
×
408
    end.
409

410
%% List subscriptions of an in-memory session:
411
list_client_subscriptions_mem(ClientId) ->
412
    case lookup_client({clientid, ClientId}, undefined) of
×
413
        [] ->
414
            {error, not_found};
×
415
        _ ->
416
            Results = [client_subscriptions(Node, ClientId) || Node <- emqx:running_nodes()],
×
417
            Filter =
×
418
                fun
419
                    ({error, _}) ->
420
                        false;
×
421
                    ({_Node, List}) ->
422
                        erlang:is_list(List) andalso 0 < erlang:length(List)
×
423
                end,
424
            case lists:filter(Filter, Results) of
×
425
                [] -> [];
×
426
                [Result | _] -> Result
×
427
            end
428
    end.
429

430
list_client_msgs(MsgsType, ClientId, PagerParams) when
431
    MsgsType =:= inflight_msgs;
432
    MsgsType =:= mqueue_msgs
433
->
434
    call_client(ClientId, {MsgsType, PagerParams}).
×
435

436
client_subscriptions(Node, ClientId) ->
437
    {Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}.
×
438

439
clean_authz_cache(ClientId) ->
440
    Results = [clean_authz_cache(Node, ClientId) || Node <- emqx:running_nodes()],
×
441
    check_results(Results).
×
442

443
clean_authz_cache(Node, ClientId) ->
444
    unwrap_rpc(emqx_proto_v1:clean_authz_cache(Node, ClientId)).
×
445

446
clean_authz_cache_all() ->
447
    Results = [{Node, clean_authz_cache_all(Node)} || Node <- emqx:running_nodes()],
×
448
    wrap_results(Results).
×
449

450
clean_pem_cache_all() ->
451
    Results = [{Node, clean_pem_cache_all(Node)} || Node <- emqx:running_nodes()],
×
452
    wrap_results(Results).
×
453

454
wrap_results(Results) ->
455
    case lists:filter(fun({_Node, Item}) -> Item =/= ok end, Results) of
×
456
        [] -> ok;
×
457
        BadNodes -> {error, BadNodes}
×
458
    end.
459

460
clean_authz_cache_all(Node) ->
461
    unwrap_rpc(emqx_proto_v1:clean_authz_cache(Node)).
×
462

463
clean_pem_cache_all(Node) ->
464
    unwrap_rpc(emqx_proto_v1:clean_pem_cache(Node)).
×
465

466
set_ratelimit_policy(ClientId, Policy) ->
467
    call_client(ClientId, {ratelimit, Policy}).
×
468

469
set_quota_policy(ClientId, Policy) ->
470
    call_client(ClientId, {quota, Policy}).
×
471

472
set_keepalive(ClientId, Interval) when Interval >= 0 andalso Interval =< 65535 ->
473
    call_client(ClientId, {keepalive, Interval});
×
474
set_keepalive(_ClientId, _Interval) ->
475
    {error, <<"mqtt3.1.1 specification: keepalive must between 0~65535">>}.
×
476

477
%% @private
478
call_client(ClientId, Req) ->
479
    case emqx_cm_registry:is_enabled() of
×
480
        true ->
481
            do_call_client(ClientId, Req);
×
482
        false ->
483
            call_client_on_all_nodes(ClientId, Req)
×
484
    end.
485

486
call_client_on_all_nodes(ClientId, Req) ->
487
    Nodes = emqx:running_nodes(),
×
488
    Results = call_client(Nodes, ClientId, Req),
×
489
    {Expected, Errs} = lists:foldr(
×
490
        fun
491
            ({_N, {error, not_found}}, Acc) -> Acc;
×
492
            ({_N, {error, _}} = Err, {OkAcc, ErrAcc}) -> {OkAcc, [Err | ErrAcc]};
×
493
            ({_N, OkRes}, {OkAcc, ErrAcc}) -> {[OkRes | OkAcc], ErrAcc}
×
494
        end,
495
        {[], []},
496
        lists:zip(Nodes, Results)
497
    ),
498
    ?maybe_log_node_errors(#{msg => "call_client_failed", request => Req}, Errs),
×
499
    case Expected of
×
500
        [] ->
501
            case Errs of
×
502
                [] -> {error, not_found};
×
503
                [{_Node, FirstErr} | _] -> FirstErr
×
504
            end;
505
        [Result | _] ->
506
            Result
×
507
    end.
508

509
%% @private
510
-spec do_call_client(emqx_types:clientid(), term()) -> term().
511
do_call_client(ClientId, Req) ->
512
    case emqx_cm:lookup_channels(ClientId) of
×
513
        [] ->
514
            {error, not_found};
×
515
        Pids when is_list(Pids) ->
516
            Pid = lists:last(Pids),
×
517
            case emqx_cm:get_chan_info(ClientId, Pid) of
×
518
                #{conninfo := #{conn_mod := ConnMod}} ->
519
                    call_conn(ConnMod, Pid, Req);
×
520
                undefined ->
521
                    {error, not_found}
×
522
            end
523
    end.
524

525
%% @private
526
call_client(Nodes, ClientId, Req) ->
527
    emqx_rpc:unwrap_erpc(emqx_management_proto_v5:call_client(Nodes, ClientId, Req)).
×
528

529
%%--------------------------------------------------------------------
530
%% Subscriptions
531
%%--------------------------------------------------------------------
532

533
-spec do_list_subscriptions() -> no_return().
534
do_list_subscriptions() ->
535
    %% [FIXME] Add function to `emqx_broker` that returns list of subscriptions
536
    %% and either redirect from here or bpapi directly (EMQX-8993).
537
    throw(not_implemented).
×
538

539
list_subscriptions(Node) ->
540
    unwrap_rpc(emqx_management_proto_v5:list_subscriptions(Node)).
×
541

542
list_subscriptions_via_topic(Topic, FormatFun) ->
543
    lists:append([
×
544
        list_subscriptions_via_topic(Node, Topic, FormatFun)
×
545
     || Node <- emqx:running_nodes()
×
546
    ]).
547

548
list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) ->
549
    case unwrap_rpc(emqx_broker_proto_v1:list_subscriptions_via_topic(Node, Topic)) of
×
550
        {error, Reason} -> {error, Reason};
×
551
        Result -> M:F(Result)
×
552
    end.
553

554
%%--------------------------------------------------------------------
555
%% PubSub
556
%%--------------------------------------------------------------------
557

558
subscribe(ClientId, TopicTables) ->
559
    subscribe(emqx:running_nodes(), ClientId, TopicTables).
×
560

561
subscribe([Node | Nodes], ClientId, TopicTables) ->
562
    case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of
×
563
        {error, _} -> subscribe(Nodes, ClientId, TopicTables);
×
564
        {subscribe, Res} -> {subscribe, Res, Node}
×
565
    end;
566
subscribe([], _ClientId, _TopicTables) ->
567
    {error, channel_not_found}.
×
568

569
-spec do_subscribe(emqx_types:clientid(), emqx_types:topic_filters()) ->
570
    {subscribe, _} | {error, atom()}.
571
do_subscribe(ClientId, TopicTables) ->
572
    case ets:lookup(?CHAN_TAB, ClientId) of
×
573
        [] -> {error, channel_not_found};
×
574
        [{_, Pid}] -> Pid ! {subscribe, TopicTables}
×
575
    end.
576

577
publish(Msg) ->
578
    emqx_metrics:inc_msg(Msg),
×
579
    emqx:publish(Msg).
×
580

581
-spec unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
582
    {unsubscribe, _} | {error, channel_not_found}.
583
unsubscribe(ClientId, Topic) ->
584
    unsubscribe(emqx:running_nodes(), ClientId, Topic).
×
585

586
-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
587
    {unsubscribe, _} | {error, channel_not_found}.
588
unsubscribe([Node | Nodes], ClientId, Topic) ->
589
    case unwrap_rpc(emqx_management_proto_v5:unsubscribe(Node, ClientId, Topic)) of
×
590
        {error, _} -> unsubscribe(Nodes, ClientId, Topic);
×
591
        Re -> Re
×
592
    end;
593
unsubscribe([], _ClientId, _Topic) ->
594
    {error, channel_not_found}.
×
595

596
-spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
597
    {unsubscribe, _} | {error, _}.
598
do_unsubscribe(ClientId, Topic) ->
599
    case ets:lookup(?CHAN_TAB, ClientId) of
×
600
        [] -> {error, channel_not_found};
×
601
        [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic)]}
×
602
    end.
603

604
-spec unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) ->
605
    {unsubscribe, _} | {error, channel_not_found}.
606
unsubscribe_batch(ClientId, Topics) ->
607
    unsubscribe_batch(emqx:running_nodes(), ClientId, Topics).
×
608

609
-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
610
    {unsubscribe_batch, _} | {error, channel_not_found}.
611
unsubscribe_batch([Node | Nodes], ClientId, Topics) ->
612
    case unwrap_rpc(emqx_management_proto_v5:unsubscribe_batch(Node, ClientId, Topics)) of
×
613
        {error, _} -> unsubscribe_batch(Nodes, ClientId, Topics);
×
614
        Re -> Re
×
615
    end;
616
unsubscribe_batch([], _ClientId, _Topics) ->
617
    {error, channel_not_found}.
×
618

619
-spec do_unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) ->
620
    {unsubscribe_batch, _} | {error, _}.
621
do_unsubscribe_batch(ClientId, Topics) ->
622
    case ets:lookup(?CHAN_TAB, ClientId) of
×
623
        [] -> {error, channel_not_found};
×
624
        [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic) || Topic <- Topics]}
×
625
    end.
626

627
%%--------------------------------------------------------------------
628
%% Get Alarms
629
%%--------------------------------------------------------------------
630

631
get_alarms(Type) ->
632
    [{Node, get_alarms(Node, Type)} || Node <- emqx:running_nodes()].
×
633

634
get_alarms(Node, Type) ->
635
    add_duration_field(unwrap_rpc(emqx_proto_v1:get_alarms(Node, Type))).
×
636

637
deactivate(Node, Name) ->
638
    unwrap_rpc(emqx_proto_v1:deactivate_alarm(Node, Name)).
×
639

640
delete_all_deactivated_alarms() ->
641
    [delete_all_deactivated_alarms(Node) || Node <- emqx:running_nodes()].
×
642

643
delete_all_deactivated_alarms(Node) ->
644
    unwrap_rpc(emqx_proto_v1:delete_all_deactivated_alarms(Node)).
×
645

646
add_duration_field(Alarms) ->
647
    Now = erlang:system_time(microsecond),
×
648
    add_duration_field(Alarms, Now, []).
×
649

650
add_duration_field([], _Now, Acc) ->
651
    Acc;
×
652
add_duration_field([Alarm = #{activated := true, activate_at := ActivateAt} | Rest], Now, Acc) ->
653
    add_duration_field(Rest, Now, [Alarm#{duration => Now - ActivateAt} | Acc]);
×
654
add_duration_field(
655
    [
656
        Alarm = #{
657
            activated := false,
658
            activate_at := ActivateAt,
659
            deactivate_at := DeactivateAt
660
        }
661
        | Rest
662
    ],
663
    Now,
664
    Acc
665
) ->
666
    add_duration_field(Rest, Now, [Alarm#{duration => DeactivateAt - ActivateAt} | Acc]).
×
667

668
%%--------------------------------------------------------------------
669
%% Banned API
670
%%--------------------------------------------------------------------
671

672
create_banned(Banned) ->
673
    emqx_banned:create(Banned).
×
674

675
delete_banned(Who) ->
676
    emqx_banned:delete(Who).
×
677

678
%%--------------------------------------------------------------------
679
%% Internal exports
680
%%--------------------------------------------------------------------
681

682
lookup_running_client(ClientId, FormatFun) ->
683
    lists:append([
×
684
        lookup_client(Node, {clientid, ClientId}, FormatFun)
×
685
     || Node <- emqx:running_nodes()
×
686
    ]).
687

688
%%--------------------------------------------------------------------
689
%% Internal Functions.
690
%%--------------------------------------------------------------------
691

692
unwrap_rpc({badrpc, Reason}) ->
693
    {error, Reason};
×
694
unwrap_rpc(Res) ->
695
    Res.
15✔
696

697
otp_rel() ->
698
    iolist_to_binary([emqx_vm:get_otp_version(), "/", erlang:system_info(version)]).
×
699

700
check_results(Results) ->
701
    case lists:any(fun(Item) -> Item =:= ok end, Results) of
×
702
        true -> ok;
×
703
        false -> unwrap_rpc(lists:last(Results))
×
704
    end.
705

706
default_row_limit() ->
707
    ?DEFAULT_ROW_LIMIT.
202✔
708

709
call_conn(ConnMod, Pid, Req) ->
710
    try
×
711
        erlang:apply(ConnMod, call, [Pid, Req])
×
712
    catch
713
        exit:R when R =:= shutdown; R =:= normal ->
714
            {error, shutdown};
×
715
        exit:{R, _} when R =:= shutdown; R =:= noproc ->
716
            {error, shutdown}
×
717
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc