• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8628139215

10 Apr 2024 08:18AM UTC coverage: 62.44% (-0.05%) from 62.489%
8628139215

push

github

web-flow
Merge pull request #12851 from zmstone/0327-feat-add-emqx_variform

emqx_variform for string substitution and transform

206 of 238 new or added lines in 3 files covered. (86.55%)

28 existing lines in 16 files now uncovered.

34895 of 55886 relevant lines covered (62.44%)

6585.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.02
/apps/emqx/src/emqx_cm.erl
1
%%-------------------------------------------------------------------
2
%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% Channel Manager
18
-module(emqx_cm).
19

20
-behaviour(gen_server).
21

22
-include("emqx_cm.hrl").
23
-include("logger.hrl").
24
-include("types.hrl").
25
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
26
-include_lib("stdlib/include/qlc.hrl").
27
-include_lib("stdlib/include/ms_transform.hrl").
28

29
-export([start_link/0]).
30

31
-export([
32
    register_channel/3,
33
    unregister_channel/1,
34
    insert_channel_info/3
35
]).
36

37
-export([
38
    get_chan_info/1,
39
    get_chan_info/2,
40
    set_chan_info/2
41
]).
42

43
-export([
44
    get_chan_stats/1,
45
    get_chan_stats/2,
46
    set_chan_stats/2
47
]).
48

49
-export([
50
    open_session/4,
51
    discard_session/1,
52
    discard_session/2,
53
    takeover_session_begin/1,
54
    takeover_session_end/1,
55
    kick_session/1,
56
    kick_session/2,
57
    takeover_kick/1
58
]).
59

60
-export([
61
    lookup_channels/1,
62
    lookup_channels/2,
63
    lookup_client/1,
64
    pick_channel/1
65
]).
66

67
%% Test/debug interface
68
-export([
69
    all_channels/0,
70
    all_client_ids/0
71
]).
72

73
%% Client management
74
-export([
75
    all_channels_table/1,
76
    live_connection_table/1
77
]).
78

79
%% gen_server callbacks
80
-export([
81
    init/1,
82
    handle_call/3,
83
    handle_cast/2,
84
    handle_info/2,
85
    terminate/2,
86
    code_change/3
87
]).
88

89
%% Internal export
90
-export([
91
    stats_fun/0,
92
    clean_down/1,
93
    mark_channel_connected/1,
94
    mark_channel_disconnected/1,
95
    is_channel_connected/1,
96
    get_connected_client_count/0
97
]).
98

99
%% RPC targets
100
-export([
101
    takeover_session/2,
102
    takeover_finish/2,
103
    do_kick_session/3,
104
    do_takeover_kick_session_v3/2,
105
    do_get_chan_info/2,
106
    do_get_chan_stats/2,
107
    do_get_chann_conn_mod/2
108
]).
109

110
-export_type([
111
    channel_info/0,
112
    chan_pid/0
113
]).
114

115
-type message() :: emqx_types:message().
116

117
-type chan_pid() :: pid().
118

119
-type channel_info() :: {
120
    _Chan :: {emqx_types:clientid(), pid()},
121
    _Info :: emqx_types:infos(),
122
    _Stats :: emqx_types:stats()
123
}.
124

125
-type takeover_state() :: {_ConnMod :: module(), _ChanPid :: pid()}.
126

127
-define(BPAPI_NAME, emqx_cm).
128

129
-define(CHAN_STATS, [
130
    {?CHAN_TAB, 'channels.count', 'channels.max'},
131
    {?CHAN_TAB, 'sessions.count', 'sessions.max'},
132
    {?CHAN_CONN_TAB, 'connections.count', 'connections.max'},
133
    {?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'},
134
    {?CHAN_REG_TAB, 'cluster_sessions.count', 'cluster_sessions.max'}
135
]).
136

137
%% Batch drain
138
-define(BATCH_SIZE, 100000).
139

140
%% Server name
141
-define(CM, ?MODULE).
142

143
-define(IS_CLIENTID(CLIENTID),
144
    (is_binary(CLIENTID) orelse (is_atom(CLIENTID) andalso CLIENTID =/= undefined))
145
).
146

147
%% linting overrides
148
-elvis([
149
    {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}},
150
    {elvis_style, god_modules, #{ignore => [emqx_cm]}}
151
]).
152

153
%% @doc Start the channel manager as a gen_server registered locally
%% under the name `?CM'.
-spec start_link() -> startlink_ret().
start_link() ->
    ServerName = {local, ?CM},
    gen_server:start_link(ServerName, ?MODULE, [], []).
157

158
%%--------------------------------------------------------------------
159
%% API
160
%%--------------------------------------------------------------------
161

162
%% @doc Insert (or overwrite) the info and stats of the calling process'
%% channel in `?CHAN_INFO_TAB'. The table key is `{ClientId, self()}'.
-spec insert_channel_info(
    emqx_types:clientid(),
    emqx_types:infos(),
    emqx_types:stats()
) -> ok.
insert_channel_info(ClientId, Info, Stats) when ?IS_CLIENTID(ClientId) ->
    Record = {{ClientId, self()}, Info, Stats},
    true = ets:insert(?CHAN_INFO_TAB, Record),
    ?tp(debug, insert_channel_info, #{clientid => ClientId}),
    ok.
173

174
%% @private
%% @doc Register a channel (client id + channel pid) together with its
%% connection module.
%%
%% Many connections may log in with the same clientid at once, on one node
%% or across the cluster. Registering the channel and recording its
%% conn_mod first is what claims the clientid for this channel.
%%
%% NOTE: this must be called inside a lock transaction.
register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when
    is_pid(ChanPid), ?IS_CLIENTID(ClientId)
->
    Chan = {ClientId, ChanPid},
    %% Ask the manager process to monitor the channel *before* any ETS row
    %% exists for it.
    cast({registered, Chan}),
    true = ets:insert(?CHAN_TAB, Chan),
    ConnRow = {Chan, ConnMod},
    true = ets:insert(?CHAN_CONN_TAB, ConnRow),
    ok = emqx_cm_registry:register_channel(Chan),
    mark_channel_connected(ChanPid),
    ok.
193

194
%% @doc Unregister the channel `{ClientId, self()}', i.e. the channel owned
%% by the calling process.
-spec unregister_channel(emqx_types:clientid()) -> ok.
unregister_channel(ClientId) when ?IS_CLIENTID(ClientId) ->
    OwnChan = {ClientId, self()},
    true = do_unregister_channel(OwnChan),
    ok.
199

200
%% @private
%% Remove every trace of the channel: the registry entry, its rows in the
%% conn/info/channel tables, and finally run the 'cm.channel.unregistered'
%% hook with the channel pid. Always returns `true'.
do_unregister_channel(Chan = {_ClientId, ChanPid}) ->
    ok = emqx_cm_registry:unregister_channel(Chan),
    true = ets:delete(?CHAN_CONN_TAB, Chan),
    true = ets:delete(?CHAN_INFO_TAB, Chan),
    %% ?CHAN_TAB stores the bare {ClientId, Pid} tuple, so delete the object.
    ets:delete_object(?CHAN_TAB, Chan),
    HookArgs = [ChanPid],
    ok = emqx_hooks:run('cm.channel.unregistered', HookArgs),
    true.
208

209
%% @doc Get the info of the channel registered for `ClientId'.
%% Returns `undefined' when no channel is found (see with_channel/2).
-spec get_chan_info(emqx_types:clientid()) -> option(emqx_types:infos()).
get_chan_info(ClientId) ->
    Fetch = fun(ChanPid) -> get_chan_info(ClientId, ChanPid) end,
    with_channel(ClientId, Fetch).
213

214
%% Read the info (element 2) of the `{ClientId, ChanPid}' row from
%% `?CHAN_INFO_TAB'. A `badarg' from the lookup yields `undefined'.
-spec do_get_chan_info(emqx_types:clientid(), chan_pid()) ->
    option(emqx_types:infos()).
do_get_chan_info(ClientId, ChanPid) ->
    try ets:lookup_element(?CHAN_INFO_TAB, {ClientId, ChanPid}, 2) of
        Info -> Info
    catch
        error:badarg -> undefined
    end.
223

224
%% Get the info of a specific channel through `emqx_cm_proto_v2'.
%% A `{badrpc, _}' reply is raised as an error by wrap_rpc/1.
-spec get_chan_info(emqx_types:clientid(), chan_pid()) ->
    option(emqx_types:infos()).
get_chan_info(ClientId, ChanPid) ->
    Reply = emqx_cm_proto_v2:get_chan_info(ClientId, ChanPid),
    wrap_rpc(Reply).
228

229
%% @doc Replace the info (element 2) of the calling process' channel row in
%% `?CHAN_INFO_TAB'. Returns the result of `ets:update_element/3', or `false'
%% when the update raises `badarg'.
-spec set_chan_info(emqx_types:clientid(), emqx_types:channel_attrs()) -> boolean().
set_chan_info(ClientId, Info) when ?IS_CLIENTID(ClientId) ->
    OwnChan = {ClientId, self()},
    try ets:update_element(?CHAN_INFO_TAB, OwnChan, {2, Info}) of
        Updated -> Updated
    catch
        error:badarg -> false
    end.
238

239
%% @doc Get the stats of the channel registered for `ClientId'.
%% Returns `undefined' when no channel is found (see with_channel/2).
-spec get_chan_stats(emqx_types:clientid()) -> option(emqx_types:stats()).
get_chan_stats(ClientId) ->
    Fetch = fun(ChanPid) -> get_chan_stats(ClientId, ChanPid) end,
    with_channel(ClientId, Fetch).
243

244
%% Read the stats (element 3) of the `{ClientId, ChanPid}' row from
%% `?CHAN_INFO_TAB'. A `badarg' from the lookup yields `undefined'.
-spec do_get_chan_stats(emqx_types:clientid(), chan_pid()) ->
    option(emqx_types:stats()).
do_get_chan_stats(ClientId, ChanPid) ->
    try ets:lookup_element(?CHAN_INFO_TAB, {ClientId, ChanPid}, 3) of
        Stats -> Stats
    catch
        error:badarg -> undefined
    end.
253

254
%% Get the stats of a specific channel through `emqx_cm_proto_v2'.
%% A `{badrpc, _}' reply is raised as an error by wrap_rpc/1.
-spec get_chan_stats(emqx_types:clientid(), chan_pid()) ->
    option(emqx_types:stats()).
get_chan_stats(ClientId, ChanPid) ->
    Reply = emqx_cm_proto_v2:get_chan_stats(ClientId, ChanPid),
    wrap_rpc(Reply).
258

259
%% @doc Set the stats of the calling process' channel.
-spec set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean().
set_chan_stats(ClientId, Stats) when ?IS_CLIENTID(ClientId) ->
    Owner = self(),
    set_chan_stats(ClientId, Owner, Stats).
263

264
%% Replace the stats (element 3) of the `{ClientId, ChanPid}' row in
%% `?CHAN_INFO_TAB'. Returns the result of `ets:update_element/3', or `false'
%% when the update raises `badarg'.
-spec set_chan_stats(emqx_types:clientid(), chan_pid(), emqx_types:stats()) ->
    boolean().
set_chan_stats(ClientId, ChanPid, Stats) when ?IS_CLIENTID(ClientId) ->
    try ets:update_element(?CHAN_INFO_TAB, {ClientId, ChanPid}, {3, Stats}) of
        Updated -> Updated
    catch
        error:badarg -> false
    end.
273

274
%% @doc Open a session for the calling channel process.
%%
%% Both variants run inside `emqx_cm_locker:trans/2' keyed by the client id.
%% With clean start, existing sessions for the client id are discarded and
%% destroyed before a new one is created and registered. Without clean
%% start, `emqx_session:open/3' decides whether a session was present; the
%% channel is registered in either case.
-spec open_session(
    _CleanStart :: boolean(),
    emqx_types:clientinfo(),
    emqx_types:conninfo(),
    emqx_maybe:t(message())
) ->
    {ok, #{
        session := emqx_session:t(),
        present := boolean(),
        replay => _ReplayContext
    }}
    | {error, Reason :: term()}.
open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg) ->
    %% Capture the caller's pid before entering the transaction fun.
    Self = self(),
    CleanStart = fun(_) ->
        ok = discard_session(ClientId),
        ok = emqx_session:destroy(ClientInfo, ConnInfo),
        create_register_session(ClientInfo, ConnInfo, MaybeWillMsg, Self)
    end,
    emqx_cm_locker:trans(ClientId, CleanStart);
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg) ->
    Self = self(),
    Resume = fun(_) ->
        %% Shape the reply first; an unexpected result from
        %% emqx_session:open/3 still fails before anything is registered.
        Reply =
            case emqx_session:open(ClientInfo, ConnInfo, MaybeWillMsg) of
                {true, Session, ReplayContext} ->
                    #{session => Session, present => true, replay => ReplayContext};
                {false, Session} ->
                    #{session => Session, present => false}
            end,
        ok = register_channel(ClientId, Self, ConnInfo),
        {ok, Reply}
    end,
    emqx_cm_locker:trans(ClientId, Resume).
306

307
%% Create a brand-new session and register `ChanPid' as its channel.
%% The reply always carries `present => false'.
create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg, ChanPid) ->
    NewSession = emqx_session:create(ClientInfo, ConnInfo, MaybeWillMsg),
    ok = register_channel(ClientId, ChanPid, ConnInfo),
    Reply = #{session => NewSession, present => false},
    {ok, Reply}.
311

312
%% @doc Try to take over a session from the channel currently picked for
%% `ClientId' (see pick_channel/1).
-spec takeover_session_begin(emqx_types:clientid()) ->
    {ok, emqx_session_mem:session(), takeover_state()} | none.
takeover_session_begin(ClientId) ->
    MaybeChanPid = pick_channel(ClientId),
    takeover_session_begin(ClientId, MaybeChanPid).
317

318
%% No channel was picked: nothing to take over.
takeover_session_begin(_ClientId, undefined) ->
    none;
%% A channel exists: only a `living' reply that names the same pid counts
%% as a successful takeover start; anything else is reported as `none'.
takeover_session_begin(ClientId, ChanPid) when is_pid(ChanPid) ->
    case takeover_session(ClientId, ChanPid) of
        {living, ConnMod, ChanPid, Session} ->
            TakeoverState = {ConnMod, ChanPid},
            {ok, Session, TakeoverState};
        _Other ->
            none
    end.
327

328
%% @doc Conclude the session takeover process started by
%% takeover_session_begin/1. The reply of the `takeover_finish' call is
%% returned as is; only `{ok, _}' and `{error, _}' shapes are accepted.
-spec takeover_session_end(takeover_state()) ->
    {ok, _ReplayContext} | {error, _Reason}.
takeover_session_end({ConnMod, ChanPid}) ->
    Reply = wrap_rpc(emqx_cm_proto_v2:takeover_finish(ConnMod, ChanPid)),
    case Reply of
        {ok, _Pendings} -> Reply;
        {error, _Reason} -> Reply
    end.
338

339
%% Pick the channel pid to use for `ClientId'.
%%
%% Returns `undefined' when no channel is known. When more than one channel
%% is found, the last one in the lookup result is kept and all the others
%% are treated as stale and discarded on a best-effort basis: a failure to
%% discard one of them is logged and does not prevent the pick.
-spec pick_channel(emqx_types:clientid()) ->
    option(pid()).
pick_channel(ClientId) ->
    case lookup_channels(ClientId) of
        [] ->
            undefined;
        [ChanPid] ->
            ChanPid;
        ChanPids ->
            [ChanPid | StalePids] = lists:reverse(ChanPids),
            ?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}),
            lists:foreach(
                fun(StalePid) ->
                    %% `try' instead of the old-style `catch' so that a
                    %% failure is reported rather than silently dropped.
                    try
                        discard_session(ClientId, StalePid)
                    catch
                        Class:Reason ->
                            ?SLOG(
                                warning,
                                #{
                                    msg => "failed_to_discard_stale_channel",
                                    stale_pid => StalePid,
                                    error => Class,
                                    reason => Reason
                                },
                                #{clientid => ClientId}
                            )
                    end
                end,
                StalePids
            ),
            ChanPid
    end.
358

359
%% Used by `emqx_persistent_session_ds'.
%% Request a takeover-style stop of every channel found for `ClientId'.
%% With no channels this is a no-op returning `ok'.
-spec takeover_kick(emqx_types:clientid()) -> ok.
takeover_kick(ClientId) ->
    KickOne = fun(Pid) -> do_takeover_session(ClientId, Pid) end,
    lists:foreach(KickOne, lookup_channels(ClientId)).
373

374
%% Used by `emqx_persistent_session_ds'.
%% Running channels are stopped with reason `takenover' so that the correct
%% reason codes and will-message processing can take place. Nodes that only
%% speak an older BPAPI version (=< 2), or whose version is unknown, get the
%% legacy `discard_session' call instead, as there is no better option.
do_takeover_session(ClientId, Pid) ->
    TargetNode = node(Pid),
    case emqx_bpapi:supported_version(TargetNode, ?BPAPI_NAME) of
        %% `undefined': race, node (re)starting? Assume v2.
        Vsn when Vsn =:= undefined; Vsn =< 2 ->
            discard_session(ClientId, Pid);
        _NewerVsn ->
            takeover_kick_session(ClientId, Pid)
    end.
389

390
%% Used only by `emqx_session_mem'.
%% Send the `{takeover, 'end'}' step-down request to the channel.
takeover_finish(ConnMod, ChanPid) ->
    Action = {takeover, 'end'},
    request_stepdown(Action, ConnMod, ChanPid).
397

398
%% @doc RPC Target @ emqx_cm_proto_v2:takeover_session/2
%% Used only by `emqx_session_mem'.
%%
%% Starts a takeover and maps the "channel is gone or unresponsive" failures
%% to `none'. Any other exception propagates to the caller.
takeover_session(ClientId, Pid) ->
    try
        do_takeover_begin(ClientId, Pid)
    catch
        %% `noproc', `timeout' and `unexpected_exception' are the error
        %% reasons produced by request_stepdown/3.
        _:Reason when
            Reason == noproc orelse
                Reason == timeout orelse
                Reason == unexpected_exception
        ->
            none;
        %% rpc_call/3
        _:{'EXIT', {noproc, _}} ->
            none
    end.
415

416
%% Begin a takeover.
%%
%% Local channel: look up its connection module and send the
%% `{takeover, 'begin'}' step-down request. An `{error, Reason}' reply is
%% raised as `error(Reason)'.
%% Remote channel: go through `emqx_cm_proto_v2' and normalise the reply.
do_takeover_begin(ClientId, ChanPid) when node(ChanPid) == node() ->
    case do_get_chann_conn_mod(ClientId, ChanPid) of
        undefined ->
            none;
        ConnMod when is_atom(ConnMod) ->
            Action = {takeover, 'begin'},
            case request_stepdown(Action, ConnMod, ChanPid) of
                {ok, Session} -> {living, ConnMod, ChanPid, Session};
                {error, Reason} -> error(Reason)
            end
    end;
do_takeover_begin(ClientId, ChanPid) ->
    RemoteReply = wrap_rpc(emqx_cm_proto_v2:takeover_session(ClientId, ChanPid)),
    case RemoteReply of
        %% NOTE: v5.3.0 replies without the channel pid; add it back.
        {living, ConnMod, Session} ->
            {living, ConnMod, ChanPid, Session};
        %% NOTE: other versions
        _ ->
            RemoteReply
    end.
437

438
%% @doc Discard all the sessions identified by the ClientId.
%%
%% Accepts every client id allowed by `?IS_CLIENTID' (binaries and atoms
%% other than `undefined'), like the registration functions in this module.
%% With no channels registered this is a no-op returning `ok'.
-spec discard_session(emqx_types:clientid()) -> ok.
discard_session(ClientId) when ?IS_CLIENTID(ClientId) ->
    lists:foreach(
        fun(Pid) -> discard_session(ClientId, Pid) end,
        lookup_channels(ClientId)
    ).
445

446
%% @private Ask a local (possibly stale) channel to step down.
%%
%% The request is sent through `ConnMod:call/3'. `kick' and `discard' use
%% the `?T_KICK' timeout and always return `ok', whatever the outcome;
%% every other action uses `?T_TAKEOVER' and returns the outcome.
%% When the call times out or fails unexpectedly the channel is killed:
%% keeping the stale pid around, returning an error or raising an exception
%% benefits nobody.
-spec request_stepdown(Action, module(), pid()) ->
    ok
    | {ok, emqx_session:t() | _ReplayContext}
    | {error, term()}
when
    Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'} | takeover_kick.
request_stepdown(Action, ConnMod, Pid) ->
    IsKickOrDiscard = (Action == kick orelse Action == discard),
    Timeout =
        case IsKickOrDiscard of
            true -> ?T_KICK;
            false -> ?T_TAKEOVER
        end,
    Outcome =
        %% This is essentially a gen_server:call implemented in
        %% emqx_connection and emqx_ws_connection; the matching handle_call
        %% lives in emqx_channel.
        try ConnMod:call(Pid, Action, Timeout) of
            ok -> ok;
            Reply -> {ok, Reply}
        catch
            %% emqx_ws_connection: call
            _:noproc ->
                ok = ?tp(debug, "session_already_gone", #{stale_pid => Pid, action => Action}),
                {error, noproc};
            %% emqx_connection: gen_server:call
            _:{noproc, _} ->
                ok = ?tp(debug, "session_already_gone", #{stale_pid => Pid, action => Action}),
                {error, noproc};
            _:{shutdown, _} ->
                ok = ?tp(debug, "session_already_shutdown", #{stale_pid => Pid, action => Action}),
                {error, noproc};
            _:{{shutdown, _}, _} ->
                ok = ?tp(debug, "session_already_shutdown", #{stale_pid => Pid, action => Action}),
                {error, noproc};
            _:{timeout, {gen_server, call, _}} ->
                ?tp(
                    warning,
                    "session_stepdown_request_timeout",
                    #{stale_pid => Pid, action => Action, stale_channel => stale_channel_info(Pid)}
                ),
                ok = force_kill(Pid),
                {error, timeout};
            _:Error:St ->
                ?tp(
                    error,
                    "session_stepdown_request_exception",
                    #{
                        stale_pid => Pid,
                        action => Action,
                        reason => Error,
                        stacktrace => St,
                        stale_channel => stale_channel_info(Pid)
                    }
                ),
                ok = force_kill(Pid),
                {error, unexpected_exception}
        end,
    %% kick/discard callers only need to know the attempt was made.
    case IsKickOrDiscard of
        true -> ok;
        false -> Outcome
    end.
511

512
%% Send an untrappable `kill' exit signal to `Pid'. Always returns `ok'.
force_kill(Pid) ->
    true = exit(Pid, kill),
    ok.
515

516
%% Collect diagnostics about a stale channel process: its status, message
%% queue length and current stacktrace, as returned by `process_info/2'.
stale_channel_info(Pid) ->
    Items = [status, message_queue_len, current_stacktrace],
    process_info(Pid, Items).
518

519
%% Discard the session held by one specific channel.
discard_session(ClientId, ChanPid) ->
    Action = discard,
    kick_session(Action, ClientId, ChanPid).
521

522
%% Kick the session held by one specific channel.
kick_session(ClientId, ChanPid) ->
    Action = kick,
    kick_session(Action, ClientId, ChanPid).
524

525
%% @doc RPC Target @ emqx_cm_proto_v2:kick_session/3
%% Only handles channels living on the local node. A channel whose
%% connection module can no longer be found is treated as already gone.
-spec do_kick_session(kick | discard, emqx_types:clientid(), chan_pid()) -> ok.
do_kick_session(Action, ClientId, ChanPid) when node(ChanPid) =:= node() ->
    MaybeConnMod = do_get_chann_conn_mod(ClientId, ChanPid),
    case MaybeConnMod of
        undefined ->
            %% already deregistered
            ok;
        _ when is_atom(MaybeConnMod) ->
            ok = request_stepdown(Action, MaybeConnMod, ChanPid)
    end.
535

536
%% @doc RPC Target for emqx_cm_proto_v3:takeover_kick_session/3
%% Only handles channels living on the local node. A channel whose
%% connection module can no longer be found is treated as already gone.
-spec do_takeover_kick_session_v3(emqx_types:clientid(), chan_pid()) -> ok.
do_takeover_kick_session_v3(ClientId, ChanPid) when node(ChanPid) =:= node() ->
    MaybeConnMod = do_get_chann_conn_mod(ClientId, ChanPid),
    case MaybeConnMod of
        undefined ->
            %% already deregistered
            ok;
        _ when is_atom(MaybeConnMod) ->
            ok = request_stepdown(takeover_kick, MaybeConnMod, ChanPid)
    end.
546

547
%% @private Shared implementation of session `kick' and `discard'; the
%% first argument, Action, selects which one.
%% Any exception raised by the call is logged and not re-raised.
kick_session(Action, ClientId, ChanPid) ->
    try
        RpcReply = emqx_cm_proto_v2:kick_session(Action, ClientId, ChanPid),
        wrap_rpc(RpcReply)
    catch
        Class:Reason ->
            %% This should mostly be RPC failures.
            %% However, if the node is still running the old version
            %% code (prior to emqx app 4.3.10) some of the RPC handler
            %% exceptions may get propagated to a new version node
            ?SLOG(
                error,
                #{
                    msg => "failed_to_kick_session_on_remote_node",
                    node => node(ChanPid),
                    action => Action,
                    error => Class,
                    reason => Reason
                },
                #{clientid => ClientId}
            )
    end.
570

571
%% Ask the node owning `ChanPid' (through `emqx_cm_proto_v3') to stop the
%% channel as part of a takeover.
%% Any exception raised by the call is logged and not re-raised.
takeover_kick_session(ClientId, ChanPid) ->
    try
        RpcReply = emqx_cm_proto_v3:takeover_kick_session(ClientId, ChanPid),
        wrap_rpc(RpcReply)
    catch
        Class:Reason ->
            %% This should mostly be RPC failures.
            %% However, if the node is still running the old version
            %% code (prior to emqx app 4.3.10) some of the RPC handler
            %% exceptions may get propagated to a new version node
            ?SLOG(
                error,
                #{
                    msg => "failed_to_kick_session_on_remote_node",
                    node => node(ChanPid),
                    action => takeover,
                    error => Class,
                    reason => Reason
                },
                #{clientid => ClientId}
            )
    end.
592

593
%% Kick every channel registered for `ClientId'.
%% Logs a warning when the client id is unknown, and another one when more
%% than one channel is found (all of them are kicked in that case).
kick_session(ClientId) ->
    case lookup_channels(ClientId) of
        [] ->
            ?SLOG(
                warning,
                #{msg => "kicked_an_unknown_session"},
                #{clientid => ClientId}
            ),
            ok;
        ChanPids ->
            case ChanPids of
                %% two or more channels
                [_, _ | _] ->
                    ?SLOG(
                        warning,
                        #{
                            msg => "more_than_one_channel_found",
                            chan_pids => ChanPids
                        },
                        #{clientid => ClientId}
                    );
                [_] ->
                    ok
            end,
            KickOne = fun(Pid) -> kick_session(ClientId, Pid) end,
            lists:foreach(KickOne, ChanPids)
    end.
618

619
%% @doc Is clean start?
620
% is_clean_start(#{clean_start := false}) -> false;
621
% is_clean_start(_Attrs) -> true.
622

623
%% Apply `Fun' to the channel pid of `ClientId'.
%% Returns `undefined' when there is no channel; when several channels are
%% found, the last one in the lookup result is used.
with_channel(ClientId, Fun) ->
    case lookup_channels(ClientId) of
        [] ->
            undefined;
        [OnlyPid] ->
            Fun(OnlyPid);
        ManyPids ->
            LastPid = lists:last(ManyPids),
            Fun(LastPid)
    end.
629

630
%% @doc Get all registered channel pids. Debug/test interface.
%% Selects the second element of every `{ClientId, ChanPid}' row in
%% `?CHAN_TAB'.
all_channels() ->
    ets:select(?CHAN_TAB, [{{'_', '$1'}, [], ['$1']}]).
634

635
%% @doc Build a QLC query handle over `?CHAN_INFO_TAB' that yields
%% `{ClientId, ConnState, ConnInfo, ClientInfo}' for every channel whose
%% `conn_mod' is one of `ConnModuleList'.
all_channels_table(ConnModuleList) ->
    AllowedMods = sets:from_list(ConnModuleList, [{version, 2}]),
    %% Project each row down to {ClientId, Info} while traversing the table.
    Projection = ets:fun2ms(
        fun({{ClientId, _ChanPid}, Info, _Stats}) ->
            {ClientId, Info}
        end
    ),
    InfoTable = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Projection}}]),
    qlc:q([
        {ClientId, ConnState, ConnInfo, ClientInfo}
     || {ClientId, #{
            conn_state := ConnState,
            clientinfo := ClientInfo,
            conninfo := #{conn_mod := ConnModule} = ConnInfo
        }} <-
            InfoTable,
        sets:is_element(ConnModule, AllowedMods)
    ]).
654

655
%% @doc Build a QLC query handle over `?CHAN_CONN_TAB' that yields
%% `{ClientId, ChanPid}' for every channel that uses one of `ConnModules'
%% and is currently marked as connected.
live_connection_table(ConnModules) ->
    MatchSpec = [live_connection_ms(Mod) || Mod <- ConnModules],
    ConnTable = ets:table(?CHAN_CONN_TAB, [{traverse, {select, MatchSpec}}]),
    qlc:q([{ClientId, ChanPid} || {ClientId, ChanPid} <- ConnTable, is_channel_connected(ChanPid)]).
660

661
%% Match-spec clause selecting `{ClientId, ChanPid}' from rows of the form
%% `{{ClientId, ChanPid}, ConnModule}'.
live_connection_ms(ConnModule) ->
    ChanPattern = {'$1', '$2'},
    Head = {ChanPattern, ConnModule},
    %% The extra tuple wrapper makes the match spec return a tuple.
    {Head, [], [{ChanPattern}]}.
663

664
%% Tell whether a channel is marked as connected.
%% Only channels on the local node are looked up in `?CHAN_LIVE_TAB';
%% anything else (a remote pid, or a term the guard cannot evaluate) is
%% reported as not connected.
is_channel_connected(ChanPid) when node() =:= node(ChanPid) ->
    ets:member(?CHAN_LIVE_TAB, ChanPid);
is_channel_connected(_NotLocal) ->
    false.
668

669
%% @doc Get all registered clientIDs. Debug/test interface.
%% Selects the first element of every `{ClientId, ChanPid}' row in
%% `?CHAN_TAB'.
all_client_ids() ->
    ets:select(?CHAN_TAB, [{{'$1', '_'}, [], ['$1']}]).
673

674
%% @doc Lookup the channels of a client id, using the `global' scope.
-spec lookup_channels(emqx_types:clientid()) -> list(chan_pid()).
lookup_channels(ClientId) ->
    Scope = global,
    lookup_channels(Scope, ClientId).
678

679
%% @doc Lookup local or global channels.
%% `local' reads `?CHAN_TAB' on this node. `global' asks the channel
%% registry when it is enabled and falls back to the local lookup otherwise.
-spec lookup_channels(local | global, emqx_types:clientid()) -> list(chan_pid()).
lookup_channels(local, ClientId) ->
    Rows = ets:lookup(?CHAN_TAB, ClientId),
    [ChanPid || {_, ChanPid} <- Rows];
lookup_channels(global, ClientId) ->
    case emqx_cm_registry:is_enabled() of
        true -> emqx_cm_registry:lookup_channels(ClientId);
        false -> lookup_channels(local, ClientId)
    end.
690

691
%% Look up the `?CHAN_INFO_TAB' rows of a client, either by username or by
%% client id.
%%
%% By username: select every row whose info map has a matching
%% `clientinfo.username'.
%% By client id: collect the info rows of every channel found in `?CHAN_TAB'
%% for that client id.
-spec lookup_client({clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
    [channel_info()].
lookup_client({username, Username}) ->
    MatchSpec = [
        %% `{const, Username}' makes the match spec treat the value as a
        %% literal term, whatever its shape, instead of as a guard expression.
        {{'_', #{clientinfo => #{username => '$1'}}, '_'}, [{'=:=', '$1', {const, Username}}], [
            '$_'
        ]}
    ],
    ets:select(?CHAN_INFO_TAB, MatchSpec);
lookup_client({clientid, ClientId}) ->
    [
        Rec
     || Key <- ets:lookup(?CHAN_TAB, ClientId),
        Rec <- ets:lookup(?CHAN_INFO_TAB, Key)
    ].
704

705
%% @private
%% Turn a `{badrpc, Reason}' reply into an `error({badrpc, Reason})'
%% exception; every other reply is returned unchanged.
%%
%% Since emqx app 4.3.10 the 'kick' and 'discard' call handlers should catch
%% all exceptions and always return 'ok', which leaves 'badrpc' possible
%% only when there is a problem calling the remote node.
wrap_rpc({badrpc, Reason}) ->
    error({badrpc, Reason});
wrap_rpc(Result) ->
    Result.
717

718
%% @private
%% Send an asynchronous message to the channel manager process (`?CM').
cast(Msg) ->
    gen_server:cast(?CM, Msg).
720

721
%%--------------------------------------------------------------------
722
%% gen_server callbacks
723
%%--------------------------------------------------------------------
724

725
%% gen_server init callback.
%% Sets up the channel, connection, info and live-connection ETS tables via
%% `emqx_utils_ets:new/2', registers the periodic stats update and starts
%% with an empty process monitor.
init([]) ->
    %% Options shared by all tables created here.
    TabOpts = [public, {write_concurrency, true}],
    ok = emqx_utils_ets:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]),
    ok = emqx_utils_ets:new(?CHAN_CONN_TAB, [bag | TabOpts]),
    ok = emqx_utils_ets:new(?CHAN_INFO_TAB, [ordered_set, compressed | TabOpts]),
    %% `write_concurrency' already comes with TabOpts; no need to repeat it.
    ok = emqx_utils_ets:new(?CHAN_LIVE_TAB, [ordered_set | TabOpts]),
    ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
    State = #{chan_pmon => emqx_pmon:new()},
    {ok, State}.
734

735
%% No synchronous requests are supported: log the call and reply `ignored'.
handle_call(Request, _From, State) ->
    ?SLOG(error, #{msg => "unexpected_call", call => Request}),
    {reply, ignored, State}.
738

739
%% `{registered, Chan}': start monitoring the newly registered channel pid,
%% remembering its client id. Any other cast is logged and dropped.
handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon0}) ->
    PMon = emqx_pmon:monitor(ChanPid, ClientId, PMon0),
    {noreply, State#{chan_pmon := PMon}};
handle_cast(Unexpected, State) ->
    ?SLOG(error, #{msg => "unexpected_cast", cast => Unexpected}),
    {noreply, State}.
745

746
%% A monitored channel went down.
%% Further pending 'DOWN' messages are drained in one go (up to the
%% configured batch size), the pids are removed from the monitor and marked
%% as disconnected, and the table clean-up of the erased items is handed
%% over to the `?CM_POOL' worker pool. Any other message is logged.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon0}) ->
    ?tp(emqx_cm_process_down, #{stale_pid => Pid, reason => _Reason}),
    BatchSize = emqx:get_config([node, channel_cleanup_batch_size], ?BATCH_SIZE),
    DownPids = [Pid | emqx_utils:drain_down(BatchSize)],
    {Items, PMon} = emqx_pmon:erase_all(DownPids, PMon0),
    lists:foreach(fun mark_channel_disconnected/1, DownPids),
    CleanArgs = [fun ?MODULE:clean_down/1, Items],
    ok = emqx_pool:async_submit_to_pool(?CM_POOL, fun lists:foreach/2, CleanArgs),
    {noreply, State#{chan_pmon := PMon}};
handle_info(Unexpected, State) ->
    ?SLOG(error, #{msg => "unexpected_info", info => Unexpected}),
    {noreply, State}.
762

763
%% Cancel the periodic `chan_stats' update registered in init/1.
terminate(_Why, _State) ->
    emqx_stats:cancel_update(chan_stats).
765

766
%% Hot code upgrade: the state is carried over unchanged.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
768

769
%%--------------------------------------------------------------------
770
%% Internal functions
771
%%--------------------------------------------------------------------
772

773
%% Clean up after a channel process that went down.
%% Receives `{ChanPid, ClientId}' and unregisters the corresponding channel.
%% A `badarg' raised while unregistering is ignored.
clean_down({ChanPid, ClientId}) ->
    DeadChan = {ClientId, ChanPid},
    try
        do_unregister_channel(DeadChan)
    catch
        error:badarg -> ok
    end,
    ok = ?tp(debug, emqx_cm_clean_down, #{client_id => ClientId}).
780

781
%% Refresh every counter listed in `?CHAN_STATS'.
stats_fun() ->
    lists:foreach(fun(Entry) -> update_stats(Entry) end, ?CHAN_STATS).
783

784
%% Publish the current size of `Tab' as `Stat' / `MaxStat'.
%% Nothing is published when `ets:info/2' reports `undefined' for the table.
update_stats({Tab, Stat, MaxStat}) ->
    TabSize = ets:info(Tab, size),
    case TabSize of
        undefined -> ok;
        _ -> emqx_stats:setstat(Stat, MaxStat, TabSize)
    end.
789

790
%% Look up the connection module recorded for `{ClientId, ChanPid}' in
%% `?CHAN_CONN_TAB'. Returns `undefined' when the lookup raises `badarg'.
-spec do_get_chann_conn_mod(emqx_types:clientid(), chan_pid()) ->
    module() | undefined.
do_get_chann_conn_mod(ClientId, ChanPid) ->
    Key = {ClientId, ChanPid},
    try
        %% The lookup result is a list; exactly one module is expected.
        [Mod] = ets:lookup_element(?CHAN_CONN_TAB, Key, 2),
        Mod
    catch
        error:badarg -> undefined
    end.
800

801
%% Record `ChanPid' in `?CHAN_LIVE_TAB' as a live connection.
%% `insert_new' leaves an already present entry untouched.
mark_channel_connected(ChanPid) ->
    LiveRow = {ChanPid, true},
    ets:insert_new(?CHAN_LIVE_TAB, LiveRow),
    ?tp(emqx_cm_connected_client_count_inc, #{chan_pid => ChanPid}),
    ok.
805

806
%% Remove `ChanPid' from `?CHAN_LIVE_TAB'.
%% A trace point is emitted before and after the deletion.
mark_channel_disconnected(ChanPid) ->
    ?tp(emqx_cm_connected_client_count_dec, #{chan_pid => ChanPid}),
    true = ets:delete(?CHAN_LIVE_TAB, ChanPid),
    ?tp(emqx_cm_connected_client_count_dec_done, #{chan_pid => ChanPid}),
    ok.
811

812
%% Number of entries in `?CHAN_LIVE_TAB'; 0 when `ets:info/2' reports
%% `undefined' for the table.
get_connected_client_count() ->
    LiveCount = ets:info(?CHAN_LIVE_TAB, size),
    case LiveCount of
        undefined -> 0;
        _ -> LiveCount
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc