• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8613439193

09 Apr 2024 09:25AM UTC coverage: 62.491% (-0.1%) from 62.636%
8613439193

push

github

web-flow
Merge pull request #12854 from id/0409-update-codeowners

chore: update codeowners

34606 of 55378 relevant lines covered (62.49%)

6551.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.87
/apps/emqx/src/emqx_connection.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% This module interacts with the transport layer of MQTT
18
%% Transport:
19
%%   - TCP connection
20
%%   - TCP/TLS connection
21
%%   - QUIC Stream
22
%%
23
%% for WebSocket @see emqx_ws_connection.erl
24
-module(emqx_connection).
25

26
-include("emqx.hrl").
27
-include("emqx_mqtt.hrl").
28
-include("logger.hrl").
29
-include("types.hrl").
30
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
31

32
-ifdef(TEST).
33
-compile(export_all).
34
-compile(nowarn_export_all).
35
-endif.
36

37
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_connection]}}]).
38

39
%% API
40
-export([
41
    start_link/3,
42
    stop/1
43
]).
44

45
-export([
46
    info/1,
47
    info/2,
48
    stats/1
49
]).
50

51
-export([
52
    async_set_keepalive/3,
53
    async_set_keepalive/5,
54
    async_set_socket_options/2
55
]).
56

57
-export([
58
    call/2,
59
    call/3,
60
    cast/2
61
]).
62

63
%% Callback
64
-export([init/4]).
65

66
%% Sys callbacks
67
-export([
68
    system_continue/3,
69
    system_terminate/4,
70
    system_code_change/4,
71
    system_get_state/1
72
]).
73

74
%% Internal callback
75
-export([wakeup_from_hib/2, recvloop/2, get_state/1]).
76

77
%% Export for CT
78
-export([set_field/3]).
79

80
-import(
81
    emqx_utils,
82
    [start_timer/2]
83
).
84

85
-record(state, {
86
    %% TCP/TLS Transport
87
    transport :: esockd:transport(),
88
    %% TCP/TLS Socket
89
    socket :: esockd:socket(),
90
    %% Peername of the connection
91
    peername :: emqx_types:peername(),
92
    %% Sockname of the connection
93
    sockname :: emqx_types:peername(),
94
    %% Sock State
95
    sockstate :: emqx_types:sockstate(),
96
    parse_state :: emqx_frame:parse_state(),
97
    %% Serialize options
98
    serialize :: emqx_frame:serialize_opts(),
99
    %% Channel State
100
    channel :: emqx_channel:channel(),
101
    %% GC State
102
    gc_state :: option(emqx_gc:gc_state()),
103
    %% Stats Timer
104
    stats_timer :: disabled | option(reference()),
105
    %% Idle Timeout
106
    idle_timeout :: integer() | infinity,
107
    %% Idle Timer
108
    idle_timer :: option(reference()),
109
    %% Zone name
110
    zone :: atom(),
111
    %% Listener Type and Name
112
    listener :: {Type :: atom(), Name :: atom()},
113

114
    %% Limiter
115
    limiter :: limiter(),
116

117
    %% limiter buffer for overload use
118
    limiter_buffer :: queue:queue(pending_req()),
119

120
    %% limiter timers
121
    limiter_timer :: undefined | reference(),
122

123
    %% QUIC conn owner pid if in use.
124
    quic_conn_pid :: option(pid())
125
}).
126

127
-record(retry, {
128
    types :: list(limiter_type()),
129
    data :: any(),
130
    next :: check_succ_handler()
131
}).
132

133
-record(pending_req, {
134
    need :: list({pos_integer(), limiter_type()}),
135
    data :: any(),
136
    next :: check_succ_handler()
137
}).
138

139
-type state() :: #state{}.
140
-type pending_req() :: #pending_req{}.
141

142
-define(ACTIVE_N, 100).
143

144
-define(INFO_KEYS, [
145
    socktype,
146
    peername,
147
    sockname,
148
    sockstate
149
]).
150

151
-define(SOCK_STATS, [
152
    recv_oct,
153
    recv_cnt,
154
    send_oct,
155
    send_cnt,
156
    send_pend
157
]).
158

159
-define(ENABLED(X), (X =/= undefined)).
160

161
-define(ALARM_TCP_CONGEST(Channel),
162
    list_to_binary(
163
        io_lib:format(
164
            "mqtt_conn/congested/~ts/~ts",
165
            [
166
                emqx_channel:info(clientid, Channel),
167
                emqx_channel:info(username, Channel)
168
            ]
169
        )
170
    )
171
).
172

173
-define(ALARM_CONN_INFO_KEYS, [
174
    socktype,
175
    sockname,
176
    peername,
177
    clientid,
178
    username,
179
    proto_name,
180
    proto_ver,
181
    connected_at
182
]).
183
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
184
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
185

186
-define(LIMITER_BYTES_IN, bytes).
187
-define(LIMITER_MESSAGE_IN, messages).
188

189
-define(LOG(Level, Data), ?SLOG(Level, (Data)#{tag => "MQTT"})).
190

191
-dialyzer({no_match, [info/2]}).
192
-dialyzer(
193
    {nowarn_function, [
194
        init/4,
195
        init_state/3,
196
        run_loop/2,
197
        system_terminate/4,
198
        system_code_change/4
199
    ]}
200
).
201

202
%% @doc Spawn a linked connection process through `proc_lib'.
%% The new process starts in `init/4' and receives the caller's pid as its
%% parent, followed by the transport module, the socket and the options.
-spec start_link
    (esockd:transport(), esockd:socket(), emqx_channel:opts()) ->
        {ok, pid()};
    (
        emqx_quic_stream,
        {ConnOwner :: pid(), quicer:connection_handle(), quicer:new_conn_props()},
        emqx_quic_connection:cb_state()
    ) ->
        {ok, pid()}.

start_link(Transport, Socket, Options) ->
    Parent = self(),
    {ok, proc_lib:spawn_link(?MODULE, init, [Parent, Transport, Socket, Options])}.
8,550✔
216

217
%%--------------------------------------------------------------------
218
%% API
219
%%--------------------------------------------------------------------
220

221
%% @doc Get infos of the connection/channel.
%% Given a pid, the request is forwarded to that process with `call/2'.
%% Given a state record, the channel infos are extended with a `sockinfo'
%% entry built from the keys listed in `?INFO_KEYS'.
-spec info(pid() | state()) -> emqx_types:infos().
info(CPid) when is_pid(CPid) ->
    call(CPid, info);
info(#state{channel = Channel} = State) ->
    Infos = emqx_channel:info(Channel),
    SockInfoMap = maps:from_list(info(?INFO_KEYS, State)),
    Infos#{sockinfo => SockInfoMap}.
33,962✔
229

230
%% Look up one or several info items from the connection state.
%% A list of keys yields a `{Key, Value}' list in the same order; a single
%% key yields the bare value. `{channel, Item}' delegates to
%% `emqx_channel:info/2'. Keys other than the ones below have no clause.
-spec info([atom()] | atom() | tuple(), pid() | state()) -> term().
info(Keys, State) when is_list(Keys) ->
    [{K, info(K, State)} || K <- Keys];
info(socktype, #state{transport = Mod, socket = Sock}) ->
    Mod:type(Sock);
info(peername, #state{peername = Value}) ->
    Value;
info(sockname, #state{sockname = Value}) ->
    Value;
info(sockstate, #state{sockstate = Value}) ->
    Value;
info(stats_timer, #state{stats_timer = Value}) ->
    Value;
info(limiter, #state{limiter = Value}) ->
    Value;
info(limiter_timer, #state{limiter_timer = Value}) ->
    Value;
info({channel, Item}, #state{channel = Chan}) ->
    emqx_channel:info(Item, Chan).
6✔
249

250
%% @doc Get stats of the connection/channel.
%% Given a pid, the request is forwarded to that process with `call/2'.
%% Given a state record, the result is the concatenation of socket stats
%% (empty when the transport reports an error), channel stats and the
%% stats returned by `emqx_utils:proc_stats/0', in that order.
-spec stats(pid() | state()) -> emqx_types:stats().
stats(CPid) when is_pid(CPid) ->
    call(CPid, stats);
stats(#state{transport = Mod, socket = Sock, channel = Chan}) ->
    FromSocket =
        case Mod:getstat(Sock, ?SOCK_STATS) of
            {error, _} -> [];
            {ok, Values} -> Values
        end,
    FromChannel = emqx_channel:stats(Chan),
    FromProcess = emqx_utils:proc_stats(),
    FromSocket ++ FromChannel ++ FromProcess.
31,666✔
267

268
%% @doc Set TCP keepalive socket options to override system defaults.
%% Idle: The number of seconds a connection needs to be idle before
%%       TCP begins sending out keep-alive probes (Linux default 7200).
%% Interval: The number of seconds between TCP keep-alive probes
%%           (Linux default 75).
%% Probes: The maximum number of TCP keep-alive probes to send before
%%         giving up and killing the connection if no response is
%%         obtained from the other end (Linux default 9).
%%
%% NOTE: This API sets TCP socket options, which has nothing to do with
%%       the MQTT layer's keepalive (PINGREQ and PINGRESP).
%%
%% This arity targets the calling process and uses the local `os:type()'.
async_set_keepalive(Idle, Interval, Probes) ->
    OS = os:type(),
    Self = self(),
    async_set_keepalive(OS, Self, Idle, Interval, Probes).
×
281

282
%% Translate the keepalive parameters into socket options for the given OS
%% and hand them to the connection process `Pid' asynchronously.
%% When the OS is reported as unsupported, a warning is logged and `ok'
%% is returned without touching the socket.
async_set_keepalive(OS, Pid, Idle, Interval, Probes) ->
    KeepaliveOpts = emqx_utils:tcp_keepalive_opts(OS, Idle, Interval, Probes),
    case KeepaliveOpts of
        {error, {unsupported_os, OS}} ->
            ?LOG(warning, #{
                msg => "Unsupported operation: set TCP keepalive",
                os => OS
            }),
            ok;
        {ok, SockOpts} ->
            async_set_socket_options(Pid, SockOpts)
    end.
293

294
%% @doc Set custom socket options.
%% The request is delivered as a cast because the caller may itself be
%% running inside a hookpoint callback, where a synchronous call would
%% deadlock. If setting the options fails, the error message is logged.
async_set_socket_options(Pid, Options) ->
    Request = {async_set_socket_options, Options},
    cast(Pid, Request).
1✔
300

301
%% @doc Send an asynchronous request to a connection process.
%% The request arrives in the target's mailbox as `{'$gen_cast', Req}'.
%% Always returns `ok', whether or not the target process exists.
-spec cast(pid(), term()) -> ok.
cast(Pid, Req) ->
    gen_server:cast(Pid, Req).
1✔
303

304
%% @doc Send a synchronous request to a connection process and wait for the
%% reply without a time limit.
-spec call(pid(), term()) -> term().
call(Pid, Req) ->
    call(Pid, Req, infinity).

%% @doc Same as `call/2' but with an explicit timeout. As with any
%% `gen_server:call/3', the caller exits if no reply arrives in time or
%% if the target process dies first.
-spec call(pid(), term(), timeout()) -> term().
call(Pid, Req, Timeout) ->
    gen_server:call(Pid, Req, Timeout).
883✔
308

309
%% @doc Ask a connection process to terminate with reason `normal' and wait
%% until it is gone. Delegates to `gen_server:stop/1'.
-spec stop(pid()) -> ok.
stop(Pid) ->
    gen_server:stop(Pid).
7✔
311

312
%%--------------------------------------------------------------------
313
%% callbacks
314
%%--------------------------------------------------------------------
315

316
%% Entry point of the spawned connection process.
%% Waits for the transport to hand over a usable socket, then builds the
%% initial state and enters the main loop. If the wait fails, the raw socket
%% is closed and the process exits with a reason derived from the error.
init(Parent, Transport, RawSocket, Options) ->
    case Transport:wait(RawSocket) of
        {error, Reason} ->
            ok = Transport:fast_close(RawSocket),
            exit_on_sock_error(Reason);
        {ok, Socket} ->
            InitialState = init_state(Transport, Socket, Options),
            run_loop(Parent, InitialState)
    end.
324

325
%% Build the initial #state{} for a freshly accepted socket.
%% `Opts' must contain `zone', `limiter' and `listener'; an optional
%% `conn_pid' is stored so that QUIC streams can inherit it.
%% The calls below are kept in this order on purpose: socket queries first,
%% then limiter, frame, channel, GC and stats setup, then TCP keepalive,
%% and finally the idle timer.
init_state(
    Transport,
    Socket,
    Opts = #{zone := Zone, limiter := LimiterCfg, listener := Listener}
) ->
    {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
    {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
    Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
    SockType = Transport:type(Socket),
    ConnInfo = #{
        conn_mod => ?MODULE,
        peercert => Peercert,
        peername => Peername,
        sockname => Sockname,
        socktype => SockType
    },

    Limiter = emqx_limiter_container:get_limiter_by_types(
        Listener,
        [?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN],
        LimiterCfg
    ),

    StrictMode = emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]),
    MaxPacketSize = emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size]),
    ParseState = emqx_frame:initial_parse_state(#{
        strict_mode => StrictMode,
        max_size => MaxPacketSize
    }),
    Serialize = emqx_frame:serialize_opts(),
    %% Init Channel
    Channel = emqx_channel:init(ConnInfo, Opts),
    GcState =
        case emqx_config:get_zone_conf(Zone, [force_gc]) of
            #{enable := false} -> undefined;
            GcPolicy -> emqx_gc:init(GcPolicy)
        end,
    %% `undefined' means "enabled, not armed yet"; `disabled' turns the
    %% stats timer off for the lifetime of the connection.
    StatsTimer =
        case emqx_config:get_zone_conf(Zone, [stats, enable]) of
            false -> disabled;
            true -> undefined
        end,
    IdleTimeout = emqx_channel:get_mqtt_conf(Zone, idle_timeout),

    set_tcp_keepalive(Listener),

    IdleTimer = start_timer(IdleTimeout, idle_timeout),
    #state{
        %% transport and socket
        transport = Transport,
        socket = Socket,
        peername = Peername,
        sockname = Sockname,
        sockstate = idle,
        %% framing and channel
        parse_state = ParseState,
        serialize = Serialize,
        channel = Channel,
        %% housekeeping
        gc_state = GcState,
        stats_timer = StatsTimer,
        idle_timeout = IdleTimeout,
        idle_timer = IdleTimer,
        zone = Zone,
        listener = Listener,
        %% rate limiting
        limiter = Limiter,
        limiter_buffer = queue:new(),
        limiter_timer = undefined,
        %% for quic streams to inherit
        quic_conn_pid = maps:get(conn_pid, Opts, undefined)
    }.
388

389
%% Prepare the process for serving the connection and enter the loop.
%% Sets the peername as logger metadata, tunes the heap size according to
%% the zone's `force_shutdown' policy and activates the socket. On success
%% the process hibernates until the first message arrives; on failure the
%% socket is closed and the process exits.
run_loop(
    Parent,
    #state{
        transport = Transport,
        socket = Socket,
        peername = Peername,
        channel = Channel
    } = State
) ->
    emqx_logger:set_metadata_peername(esockd:format(Peername)),
    Zone = emqx_channel:info(zone, Channel),
    ShutdownPolicy = emqx_config:get_zone_conf(Zone, [force_shutdown]),
    emqx_utils:tune_heap_size(ShutdownPolicy),
    case activate_socket(State) of
        {error, Reason} ->
            ok = Transport:fast_close(Socket),
            exit_on_sock_error(Reason);
        {ok, Activated} ->
            hibernate(Parent, Activated)
    end.
411

412
%% Terminate the calling process with an exit reason derived from a socket
%% error. `einval', `enotconn' and `closed' exit with `normal'; `timeout'
%% exits with `{shutdown, ssl_upgrade_timeout}'; any other reason is
%% wrapped as `{shutdown, Reason}'.
-spec exit_on_sock_error(any()) -> no_return().
exit_on_sock_error(einval) ->
    erlang:exit(normal);
exit_on_sock_error(enotconn) ->
    erlang:exit(normal);
exit_on_sock_error(closed) ->
    erlang:exit(normal);
exit_on_sock_error(timeout) ->
    erlang:exit({shutdown, ssl_upgrade_timeout});
exit_on_sock_error(Other) ->
    erlang:exit({shutdown, Other}).
7✔
423

424
%%--------------------------------------------------------------------
425
%% Recv Loop
426

427
%% Main receive loop: wait for the next message and dispatch it.
%% The wait is bounded by the idle timeout plus 100 ms (or unbounded when
%% the idle timeout is `infinity'). When nothing arrives in time, the
%% process either keeps looping, if `emqx_olp:backoff_hibernation/1'
%% returns true for the zone, or cancels the stats timer and hibernates.
recvloop(Parent, #state{idle_timeout = Idle, zone = Zone} = State) ->
    WaitFor =
        if
            Idle =:= infinity -> infinity;
            true -> Idle + 100
        end,
    receive
        Msg ->
            handle_recv(Msg, Parent, State)
    after WaitFor ->
        case emqx_olp:backoff_hibernation(Zone) of
            false ->
                hibernate(Parent, cancel_stats_timer(State));
            true ->
                recvloop(Parent, State)
        end
    end.
450

451
%% Dispatch one message taken from the mailbox.
%% System messages go to `sys'; an EXIT from the parent terminates the
%% connection; everything else is processed against the state (with the
%% stats timer armed) and the loop continues or terminates accordingly.
handle_recv({system, From, Request}, Parent, State) ->
    sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
handle_recv({'EXIT', Parent, Reason}, Parent, State) ->
    %% FIXME: it's not trapping exit, should never receive an EXIT
    terminate(Reason, State);
handle_recv(Msg, Parent, #state{idle_timeout = IdleTimeout} = State) ->
    Armed = ensure_stats_timer(IdleTimeout, State),
    case process_msg([Msg], Armed) of
        {stop, Reason, FinalState} ->
            terminate(Reason, FinalState);
        {ok, NextState} ->
            %% fully-qualified call, as in the other loop re-entry points
            ?MODULE:recvloop(Parent, NextState)
    end.
463

464
%% Hibernate the process; on the next message it resumes in
%% `wakeup_from_hib/2' with the same parent and state.
hibernate(Parent, State) ->
    WakeupArgs = [Parent, State],
    proc_lib:hibernate(?MODULE, wakeup_from_hib, WakeupArgs).
8,533✔
466

467
%% Continuation invoked when the process wakes up from hibernation.
%% Currently it only re-enters the receive loop; a hook point for later.
wakeup_from_hib(Parent, State) ->
    ?MODULE:recvloop(Parent, State).
8,533✔
470

471
%%--------------------------------------------------------------------
472
%% Ensure/cancel stats timer
473

474
%% Arm the `emit_stats' timer if it is not armed yet.
%% Only a state whose `stats_timer' is `undefined' gets a new timer; a
%% running timer or the value `disabled' leaves the state untouched.
-compile({inline, [ensure_stats_timer/2]}).
ensure_stats_timer(Timeout, #state{stats_timer = undefined} = State) ->
    TRef = start_timer(Timeout, emit_stats),
    State#state{stats_timer = TRef};
ensure_stats_timer(_Timeout, State) ->
    State.
99,464✔
479

480
%% Cancel a running stats timer and reset the field to `undefined'.
%% States without a timer reference (not armed, or disabled) pass through.
-compile({inline, [cancel_stats_timer/1]}).
cancel_stats_timer(#state{stats_timer = Timer} = State) when is_reference(Timer) ->
    ?tp(debug, cancel_stats_timer, #{}),
    ok = emqx_utils:cancel_timer(Timer),
    State#state{stats_timer = undefined};
cancel_stats_timer(Unchanged) ->
    Unchanged.
6✔
487

488
%%--------------------------------------------------------------------
489
%% Process next Msg
490

491
%% Process a list of pending messages one by one, threading the state.
%% Handlers may return follow-up messages, which are appended to the
%% remaining list. Returns `{ok, State}' once the list is exhausted or
%% `{stop, Reason, State}' as soon as a handler asks to stop or raises.
%% `normal', `shutdown' and `{shutdown, _}' exits become stop reasons of the
%% same shape; any other exception is captured in a map holding its class,
%% term and stacktrace.
process_msg([], State) ->
    {ok, State};
process_msg([Msg | Rest], State) ->
    try
        Outcome = handle_msg(Msg, State),
        case Outcome of
            ok ->
                process_msg(Rest, State);
            {ok, State1} ->
                process_msg(Rest, State1);
            {ok, FollowUps, State1} ->
                process_msg(append_msg(Rest, FollowUps), State1);
            {stop, Reason, State1} ->
                {stop, Reason, State1};
            {stop, Reason} ->
                {stop, Reason, State}
        end
    catch
        exit:normal ->
            {stop, normal, State};
        exit:shutdown ->
            {stop, shutdown, State};
        exit:{shutdown, _} = Shutdown ->
            {stop, Shutdown, State};
        Class:Term:Stacktrace ->
            Captured = #{
                exception => Class,
                context => Term,
                stacktrace => Stacktrace
            },
            {stop, Captured, State}
    end.
523

524
%% Append follow-up message(s) to the queue of pending messages.
%% The second argument is either a list of messages or a single non-list
%% message; the result is always a list with the new items at the end.
-compile({inline, [append_msg/2]}).
append_msg(Pending, Msgs) when is_list(Msgs) ->
    Pending ++ Msgs;
append_msg(Pending, Msg) ->
    Pending ++ [Msg].
152✔
533

534
%%--------------------------------------------------------------------
535
%% Handle a Msg
536
%% Handle a single message against the connection state.
%% Possible results, as consumed by process_msg/2: `ok', `{ok, State}',
%% `{ok, Msgs, State}', `{stop, Reason, State}' or `{stop, Reason}'.
%% Clause order matters: several clauses match tuples of the same size and
%% are told apart only by their guards.

%% gen_server-style synchronous request
handle_msg({'$gen_call', Caller, Request}, State) ->
    case handle_call(Caller, Request, State) of
        {reply, Answer, State1} ->
            gen_server:reply(Caller, Answer),
            {ok, State1};
        {stop, Why, Answer, State1} ->
            gen_server:reply(Caller, Answer),
            stop(Why, State1)
    end;
%% gen_server-style asynchronous request
handle_msg({'$gen_cast', Request}, State) ->
    {ok, handle_cast(Request, State)};
%% Raw bytes from a TCP or TLS socket
handle_msg({SockTag, _Sock, Data}, State) when SockTag =:= tcp; SockTag =:= ssl ->
    Size = iolist_size(Data),
    inc_counter(incoming_bytes, Size),
    ok = emqx_metrics:inc('bytes.received', Size),
    when_bytes_in(Size, Data, State);
%% Raw bytes from a QUIC stream; the length comes with the message
handle_msg({quic, Data, _Stream, #{len := Size}}, State) when is_binary(Data) ->
    inc_counter(incoming_bytes, Size),
    ok = emqx_metrics:inc('bytes.received', Size),
    when_bytes_in(Size, Data, State);
%% Retry the oldest request parked in the limiter buffer, if any
handle_msg(check_limiter_buffer, State = #state{limiter_buffer = Pending}) ->
    case queue:peek(Pending) of
        empty ->
            handle_info(activate_socket, State);
        {value, #pending_req{need = Needs, data = Data, next = Next}} ->
            Popped = State#state{limiter_buffer = queue:drop(Pending)},
            check_limiter(Needs, Data, Next, [check_limiter_buffer], Popped)
    end;
%% CONNECT: stop the idle timer and derive serialize options from the packet
handle_msg(
    {incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
    State = #state{idle_timer = IdleTimer}
) ->
    ok = emqx_utils:cancel_timer(IdleTimer),
    Serialize = emqx_frame:serialize_opts(ConnPkt),
    Connecting = State#state{
        idle_timer = undefined,
        serialize = Serialize
    },
    handle_incoming(Packet, Connecting);
%% Any other parsed packet (or frame error)
handle_msg({incoming, Packet}, State) ->
    ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
    handle_incoming(Packet, State);
handle_msg({outgoing, Packets}, State) ->
    handle_outgoing(Packets, State);
%% Socket error reported by the transport
handle_msg({ErrTag, _Sock, Reason}, State) when
    ErrTag =:= tcp_error; ErrTag =:= ssl_error
->
    handle_info({sock_error, Reason}, State);
%% Socket closed by the peer
handle_msg({ClosedTag, _Sock}, State) when
    ClosedTag =:= tcp_closed; ClosedTag =:= ssl_closed
->
    Closed = close_socket(State),
    handle_info({sock_closed, ClosedTag}, Closed);
%% Socket went passive: account for what came in, then re-activate
handle_msg({PassiveTag, _Sock}, State) when
    PassiveTag =:= tcp_passive; PassiveTag =:= ssl_passive; PassiveTag =:= quic_passive
->
    %% In Stats
    InPubs = emqx_pd:reset_counter(incoming_pubs),
    InBytes = emqx_pd:reset_counter(incoming_bytes),
    %% Run GC and Check OOM
    Collected = run_gc(#{cnt => InPubs, oct => InBytes}, State),
    Checked = check_oom(Collected),
    handle_info(activate_socket, Checked);
%% Message delivery: batch with further deliveries already in the mailbox
handle_msg(
    {deliver, _Topic, _Msg} = First,
    State = #state{listener = {Type, Listener}}
) ->
    ActiveN = get_active_n(Type, Listener),
    Batch = [First | emqx_utils:drain_deliver(ActiveN)],
    with_channel(handle_deliver, [Batch], State);
%% Something sent
handle_msg({inet_reply, _Sock, ok}, State = #state{listener = {Type, Listener}}) ->
    SentPubs = emqx_pd:get_counter(outgoing_pubs),
    ActiveN = get_active_n(Type, Listener),
    case SentPubs > ActiveN of
        false ->
            ok;
        true ->
            OutPubs = emqx_pd:reset_counter(outgoing_pubs),
            OutBytes = emqx_pd:reset_counter(outgoing_bytes),
            Collected = run_gc(#{cnt => OutPubs, oct => OutBytes}, State),
            {ok, check_oom(Collected)}
    end;
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
    handle_info({sock_error, Reason}, State);
handle_msg({connack, ConnAck}, State) ->
    handle_outgoing(ConnAck, State);
handle_msg({close, Reason}, State) ->
    %% @FIXME here it could be close due to appl error.
    ?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}),
    Closed = close_socket(State),
    handle_info({sock_closed, Reason}, Closed);
%% Channel events: keep the connection manager's view up to date
handle_msg(
    {event, connected},
    State = #state{
        channel = Channel,
        serialize = Serialize,
        parse_state = ParseState,
        quic_conn_pid = QuicConnPid
    }
) ->
    case QuicConnPid of
        undefined ->
            false;
        _ ->
            emqx_quic_connection:activate_data_streams(
                QuicConnPid, {ParseState, Serialize, Channel}
            )
    end,
    ClientId = emqx_channel:info(clientid, Channel),
    emqx_cm:insert_channel_info(ClientId, info(State), stats(State));
handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
    ClientId = emqx_channel:info(clientid, Channel),
    emqx_cm:set_chan_info(ClientId, info(State)),
    {ok, State};
handle_msg({event, _Other}, State = #state{channel = Channel}) ->
    case emqx_channel:info(clientid, Channel) of
        %% ClientId is yet unknown (i.e. connect packet is not received yet)
        undefined ->
            ok;
        ClientId ->
            emqx_cm:insert_channel_info(ClientId, info(State), stats(State))
    end,
    {ok, State};
handle_msg({timeout, TRef, TMsg}, State) ->
    handle_timeout(TRef, TMsg, State);
handle_msg({shutdown, _Reason} = Shutdown, State) ->
    stop(Shutdown, State);
%% Everything else is treated as a plain info message
handle_msg(Msg, State) ->
    handle_info(Msg, State).
4,921✔
655

656
%%--------------------------------------------------------------------
657
%% Terminate
658

659
%% Tear the connection down: mark the channel disconnected, cancel
%% congestion alarms, let the channel clean up and close the socket.
%% Failures during this cleanup are reported as a trace point and do not
%% prevent the final step, which re-raises or exits according to `Reason'
%% (see maybe_raise_exception/1).
-spec terminate(any(), state()) -> no_return().
terminate(
    Reason,
    #state{
        transport = Transport,
        socket = Socket,
        channel = Channel
    } = State
) ->
    try
        Disconnected = emqx_channel:set_conn_state(disconnected, Channel),
        emqx_congestion:cancel_alarms(Socket, Transport, Disconnected),
        emqx_channel:terminate(Reason, Disconnected),
        close_socket_ok(State),
        ?TRACE("SOCKET", "emqx_connection_terminated", #{reason => Reason})
    catch
        Class:Term:Stacktrace ->
            ?tp(warning, unclean_terminate, #{
                exception => Class, context => Term, stacktrace => Stacktrace
            })
    end,
    ?tp(info, terminate, #{reason => Reason}),
    maybe_raise_exception(Reason).
6,456✔
680

681
%% Close the socket for its side effect only: the updated state returned by
%% close_socket/1 is dropped and the result is always `ok'.
close_socket_ok(State) ->
    _Discarded = close_socket(State),
    ok.
6,456✔
685

686
%% Finish termination in a way that reflects the original cause.
%% A map with `exception', `context' and `stacktrace' keys (as built by
%% process_msg/2) is re-raised as-is. `normal', `shutdown' and
%% `{shutdown, normal}' return `ok'. Any other reason becomes the
%% process exit reason.
-spec maybe_raise_exception(any()) -> no_return().
maybe_raise_exception(#{
    exception := Class,
    context := Term,
    stacktrace := Trace
}) ->
    erlang:raise(Class, Term, Trace);
maybe_raise_exception(Reason) when
    Reason =:= {shutdown, normal};
    Reason =:= normal;
    Reason =:= shutdown
->
    ok;
maybe_raise_exception(Reason) ->
    exit(Reason).
5,693✔
702

703
%%--------------------------------------------------------------------
704
%% Sys callbacks
705

706
%% sys callback: resume the receive loop once a system message has been
%% handled. Debug options are not used by this module.
system_continue(Parent, _DebugOpts, State) ->
    ?MODULE:recvloop(Parent, State).
21✔
708

709
%% sys callback: the process was asked to terminate through a system
%% message; run the regular termination path with the given reason.
system_terminate(Reason, _ParentPid, _DebugOpts, State) ->
    terminate(Reason, State).
7✔
711

712
%% sys callback: code upgrade hook. The state is returned unchanged.
system_code_change(State, _Module, _OldVersion, _ExtraArgs) ->
    {ok, State}.
1✔
714

715
%% sys callback: expose the state, wrapped in an `ok' tuple.
system_get_state(State) ->
    {ok, State}.
21✔
716

717
%%--------------------------------------------------------------------
718
%% Handle call
719

720
%% Handle a synchronous request.
%% `info' and `stats' are answered from the connection state; every other
%% request is delegated to the channel. A channel `shutdown' result is
%% turned into a shutdown of the connection; when it carries a packet, the
%% packet is sent and the transport is shut down gracefully first.
handle_call(_From, info, State) ->
    {reply, info(State), State};
handle_call(_From, stats, State) ->
    {reply, stats(State), State};
handle_call(_From, Request, #state{channel = Channel} = State) ->
    case emqx_channel:handle_call(Request, Channel) of
        {reply, Answer, Channel1} ->
            {reply, Answer, State#state{channel = Channel1}};
        {shutdown, Why, Answer, Channel1} ->
            shutdown(Why, Answer, State#state{channel = Channel1});
        {shutdown, Why, Answer, LastPacket, Channel1} ->
            State1 = State#state{channel = Channel1},
            ok = handle_outgoing(LastPacket, State1),
            State2 = graceful_shutdown_transport(Why, State1),
            shutdown(Why, Answer, State2)
    end.
736

737
%%--------------------------------------------------------------------
738
%% Handle timeout
739

740
%% Handle an expired timer.
%%  - idle_timeout: shut the connection down.
%%  - limit_timeout: retry the rate limiter.
%%  - emit_stats: check congestion alarms, publish the current stats and
%%    clear the timer field so it can be re-armed.
%%  - keepalive: ignored once the channel is disconnected; otherwise
%%    re-dispatched as `{keepalive, RecvCnt}' with the `recv_pkt' counter.
%%  - anything else: forwarded to the channel.
handle_timeout(_TRef, idle_timeout, State) ->
    shutdown(idle_timeout, State);
handle_timeout(_TRef, limit_timeout, State) ->
    retry_limiter(State);
handle_timeout(
    _TRef,
    emit_stats,
    #state{
        transport = Transport,
        socket = Socket,
        channel = Channel
    } = State
) ->
    emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
    ClientId = emqx_channel:info(clientid, Channel),
    emqx_cm:set_chan_stats(ClientId, stats(State)),
    {ok, State#state{stats_timer = undefined}};
handle_timeout(TRef, keepalive, #state{channel = Channel} = State) ->
    case emqx_channel:info(conn_state, Channel) of
        disconnected ->
            {ok, State};
        _StillConnected ->
            %% recv_pkt: valid MQTT message
            Received = emqx_pd:get_counter(recv_pkt),
            handle_timeout(TRef, {keepalive, Received}, State)
    end;
handle_timeout(TRef, TMsg, State) ->
    with_channel(handle_timeout, [TRef, TMsg], State).
3,926✔
774

775
%%--------------------------------------------------------------------
776
%% Parse incoming data
777
%% Process a chunk of received bytes: log it at debug level, parse as many
%% packets as possible and pass them through the rate limiter. The limiter
%% is charged `Oct' bytes and one message per parsed packet; on success the
%% packets are turned into `incoming' messages by next_incoming_msgs/3.
-compile({inline, [when_bytes_in/3]}).
when_bytes_in(Oct, Data, State) ->
    ?LOG(debug, #{
        msg => "raw_bin_received",
        size => Oct,
        bin => binary_to_list(binary:encode_hex(Data)),
        type => "hex"
    }),
    {Parsed, State1} = parse_incoming(Data, [], State),
    Needs = [
        {Oct, ?LIMITER_BYTES_IN},
        {erlang:length(Parsed), ?LIMITER_MESSAGE_IN}
    ],
    check_limiter(Needs, Parsed, fun next_incoming_msgs/3, [], State1).
794

795
%% @doc: return a reversed Msg list
%% Each packet is wrapped as `{incoming, Packet}' and pushed onto the front
%% of `Msgs', so the packets end up in the reverse of their input order.
-compile({inline, [next_incoming_msgs/3]}).
next_incoming_msgs(Packets, Msgs, State) ->
    Wrapped = [{incoming, Packet} || Packet <- Packets],
    {ok, lists:reverse(Wrapped, Msgs), State}.
390✔
803

804
%% Parse as many packets as possible out of `Data'.
%% Packets are accumulated at the head of the list, i.e. newest first.
%% Parsing stops when the input is exhausted, when the parser needs more
%% bytes (the partial parse state is kept), or when parsing fails. A
%% failure is logged and recorded as `{frame_error, Reason}' in front of
%% the packets parsed so far, leaving the parse state as it was before the
%% failing call.
parse_incoming(<<>>, Acc, State) ->
    {Acc, State};
parse_incoming(Data, Acc, #state{parse_state = ParseState} = State) ->
    try emqx_frame:parse(Data, ParseState) of
        {ok, Packet, Rest, ParseState1} ->
            parse_incoming(Rest, [Packet | Acc], State#state{parse_state = ParseState1});
        {more, ParseState1} ->
            {Acc, State#state{parse_state = ParseState1}}
    catch
        throw:{?FRAME_PARSE_ERROR, Reason} ->
            ?LOG(info, #{
                reason => Reason,
                at_state => emqx_frame:describe_state(ParseState),
                input_bytes => Data,
                parsed_packets => Acc
            }),
            {[{frame_error, Reason} | Acc], State};
        error:Reason:Stacktrace ->
            ?LOG(error, #{
                at_state => emqx_frame:describe_state(ParseState),
                input_bytes => Data,
                parsed_packets => Acc,
                reason => Reason,
                stacktrace => Stacktrace
            }),
            {[{frame_error, Reason} | Acc], State}
    end.
832

833
%%--------------------------------------------------------------------
834
%% Handle incoming packet
835

836
%% Hand a parsed item to the channel.
%% Real MQTT packets are counted in the incoming stats first; anything else
%% (e.g. a frame error produced by parse_incoming/3) is passed on as-is.
handle_incoming(#mqtt_packet{} = Packet, State) ->
    ok = inc_incoming_stats(Packet),
    with_channel(handle_in, [Packet], State);
handle_incoming(NotAPacket, State) ->
    with_channel(handle_in, [NotAPacket], State).
6✔
841

842
%%--------------------------------------------------------------------
843
%% With Channel
844

845
%% Call `emqx_channel:Fun' with `Args' followed by the current channel and
%% fold the outcome back into the connection state. Replies from the
%% channel become follow-up messages; a `shutdown' outcome shuts the
%% connection down, after sending the accompanying packet if there is one.
with_channel(Fun, Args, #state{channel = Channel} = State) ->
    CallArgs = Args ++ [Channel],
    case erlang:apply(emqx_channel, Fun, CallArgs) of
        ok ->
            {ok, State};
        {ok, Channel1} ->
            {ok, State#state{channel = Channel1}};
        {ok, Replies, Channel1} ->
            {ok, next_msgs(Replies), State#state{channel = Channel1}};
        {shutdown, Why, Channel1} ->
            shutdown(Why, State#state{channel = Channel1});
        {shutdown, Why, LastPacket, Channel1} ->
            State1 = State#state{channel = Channel1},
            ok = handle_outgoing(LastPacket, State1),
            shutdown(Why, State1)
    end.
860

861
%%--------------------------------------------------------------------
862
%% Handle outgoing packets
863

864
%% Serialize and send the given packet(s), then notify the external trace
%% module that the send has ended. Returns whatever the send step returned.
handle_outgoing(Packets, State) ->
    SendResult = do_handle_outgoing(Packets, State),
    emqx_external_trace:end_trace_send(Packets),
    SendResult.
45,196✔
868

869
%% Accepts either a list of packets or a single packet; each packet is
%% serialized (updating the outgoing stats) and the result is sent in one call.
do_handle_outgoing(Packets, State) when is_list(Packets) ->
    Serialize = serialize_and_inc_stats_fun(State),
    IoData = lists:map(Serialize, Packets),
    send(IoData, State);
do_handle_outgoing(Packet, State) ->
    Serialize = serialize_and_inc_stats_fun(State),
    send(Serialize(Packet), State).
41,734✔
873

874
%% Build a fun that serializes a single packet with the connection's
%% serialize options and updates the outgoing counters as a side effect.
%% When the serializer yields an empty binary the packet is reported as
%% too large, counted as dropped, and <<>> is returned instead of data.
serialize_and_inc_stats_fun(#state{serialize = SerializeOpts}) ->
    fun(Pkt) ->
        try emqx_frame:serialize_pkt(Pkt, SerializeOpts) of
            <<>> ->
                ?LOG(warning, #{
                    msg => "packet_is_discarded",
                    reason => "frame_is_too_large",
                    packet => emqx_packet:format(Pkt, hidden)
                }),
                ok = emqx_metrics:inc('delivery.dropped.too_large'),
                ok = emqx_metrics:inc('delivery.dropped'),
                ok = inc_outgoing_stats({error, message_too_large}),
                <<>>;
            Serialized ->
                ?TRACE("MQTT", "mqtt_packet_sent", #{packet => Pkt}),
                ok = inc_outgoing_stats(Pkt),
                Serialized
        catch
            %% Not expected to happen in practice: log it, then re-raise
            %% the serialize error as an `error` class exception.
            throw:{?FRAME_SERIALIZE_ERROR, Why} ->
                ?LOG(info, #{
                    reason => Why,
                    input_packet => Pkt
                }),
                erlang:error({?FRAME_SERIALIZE_ERROR, Why});
            error:Why:Stack ->
                ?LOG(error, #{
                    input_packet => Pkt,
                    exception => Why,
                    stacktrace => Stack
                }),
                erlang:error(?FRAME_SERIALIZE_ERROR)
        end
    end.
908

909
%%--------------------------------------------------------------------
910
%% Send data
911

912
%% Account for the outgoing bytes, run the congestion alarm check and hand
%% the data to the transport asynchronously. Always returns ok: a send
%% error is not returned but posted back to this process.
-spec send(iodata(), state()) -> ok.
send(IoData, #state{transport = Transport, socket = Sock, channel = Chan}) ->
    NBytes = iolist_size(IoData),
    ok = emqx_metrics:inc('bytes.sent', NBytes),
    inc_counter(outgoing_bytes, NBytes),
    emqx_congestion:maybe_alarm_conn_congestion(Sock, Transport, Chan),
    case Transport:async_send(Sock, IoData, []) of
        ok ->
            ok;
        {error, _Reason} = SendError ->
            %% Postpone handling of the error by sending ourselves an inet_reply.
            %% @FIXME: why not just return error?
            self() ! {inet_reply, Sock, SendError},
            ok
    end.
927

928
%%--------------------------------------------------------------------
929
%% Handle Info
930

931
%% Re-activate the socket; emit an {event, SockState} message only when the
%% socket state actually changed. An activation error is handled as a
%% socket error.
handle_info(activate_socket, #state{sockstate = PrevSockSt} = State) ->
    case activate_socket(State) of
        {ok, #state{sockstate = PrevSockSt} = NState} ->
            %% socket state unchanged
            {ok, NState};
        {ok, #state{sockstate = CurSockSt} = NState} ->
            {ok, {event, CurSockSt}, NState};
        {error, Reason} ->
            handle_info({sock_error, Reason}, State)
    end;
%% Socket error: log it (except for `closed` and `einval`), close the
%% socket and continue as a socket-closed event.
handle_info({sock_error, Reason}, State) ->
    case Reason of
        closed -> ok;
        einval -> ok;
        _ -> ?SLOG(warning, #{msg => "socket_error", reason => Reason})
    end,
    handle_info({sock_closed, Reason}, close_socket(State));
%% QUIC control stream events are dispatched to the emqx_quic_stream
%% function named after the event.
handle_info({quic, Event, Handle, Prop}, State) when is_atom(Event) ->
    case emqx_quic_stream:Event(Handle, Prop, State) of
        {{continue, Msgs}, NewState} ->
            {ok, Msgs, NewState};
        Result ->
            Result
    end;
%% Everything else is delegated to the channel.
handle_info(Info, State) ->
    with_channel(handle_info, [Info], State).
11,610✔
957

958
%%--------------------------------------------------------------------
959
%% Handle Cast
960

961
%% Handle asynchronous requests. The state is returned unchanged in all cases.
%%
%% {async_set_socket_options, Opts}: apply Opts to the socket through the
%% transport; the outcome is only reported via trace points.
handle_cast(
    {async_set_socket_options, Opts},
    #state{transport = Transport, socket = Sock} = State
) ->
    SetResult = Transport:setopts(Sock, Opts),
    case SetResult of
        ok ->
            ?tp(debug, "custom_socket_options_successfully", #{opts => Opts});
        {error, einval} ->
            %% the socket has already been closed, so this error is ignored
            ?tp(debug, "socket already closed", #{reason => socket_already_closed}),
            ok;
        Failure ->
            %% any other error
            ?tp(error, "failed_to_set_custom_socket_option", #{reason => Failure})
    end,
    State;
%% Unknown casts are reported and otherwise ignored.
handle_cast(Unknown, State) ->
    ?tp(error, "received_unknown_cast", #{cast => Unknown}),
    State.
×
983

984
%%--------------------------------------------------------------------
985
%% rate limit
986

987
-type limiter_type() :: emqx_limiter_container:limiter_type().
-type limiter() :: emqx_limiter_container:container().
%% Continuation run after a successful limiter check; it is called as
%% Handler(Data, Msgs, State).
-type check_succ_handler() :: fun((any(), list(any()), state()) -> _).

%% Check the limiters listed in Needs and, on success, continue with
%% WhenOk(Data, Msgs, State).
%%   Data - the data to be processed
%%   Msgs - the messages to handle after Data has been processed
%% If the limiter asks for a pause, a retry context is stored and a
%% limit_timeout timer is started. While such a timer is pending, new
%% requests are queued in the limiter buffer instead of being checked.
-spec check_limiter(
    list({pos_integer(), limiter_type()}),
    any(),
    check_succ_handler(),
    list(any()),
    state()
) -> _.

%% no limiter configured: continue right away
check_limiter(_Needs, Data, WhenOk, Msgs, #state{limiter = infinity} = State) ->
    WhenOk(Data, Msgs, State);
%% no retry timer pending: consult the limiter
check_limiter(
    Needs,
    Data,
    WhenOk,
    Msgs,
    #state{limiter_timer = undefined, limiter = Limiter0} = State
) ->
    case emqx_limiter_container:check_list(Needs, Limiter0) of
        {ok, Limiter1} ->
            WhenOk(Data, Msgs, State#state{limiter = Limiter1});
        {pause, PauseMs, Limiter1} ->
            ?SLOG(debug, #{
                msg => "pause_time_due_to_rate_limit",
                needs => Needs,
                time_in_ms => PauseMs
            }),
            Types = [Type || {_, Type} <- Needs],
            RetryCtx = #retry{types = Types, data = Data, next = WhenOk},
            Limiter2 = emqx_limiter_container:set_retry_context(RetryCtx, Limiter1),
            TRef = start_timer(PauseMs, limit_timeout),
            {ok, State#state{limiter = Limiter2, limiter_timer = TRef}};
        {drop, Limiter1} ->
            {ok, State#state{limiter = Limiter1}}
    end;
%% A retry timer is pending: buffer the operation so it can be executed
%% once the retry is over. (Original note: the maximum length of the
%% buffer queue is equal to active_n.)
check_limiter(Needs, Data, WhenOk, _Msgs, #state{limiter_buffer = Buffer} = State) ->
    Pending = #pending_req{need = Needs, data = Data, next = WhenOk},
    {ok, State#state{limiter_buffer = queue:in(Pending, Buffer)}}.
×
1057

1058
%% Retry the limiter check saved in the limiter's retry context.
%% On success the saved continuation is called with [check_limiter_buffer]
%% as the follow-up messages and the retry timer is cleared; on another
%% pause a new limit_timeout timer is started.
%% NOTE(review): unlike check_limiter/5 there is no `{drop, _}` clause
%% here - confirm that retry_list/2 cannot return it.
-spec retry_limiter(state()) -> _.
retry_limiter(#state{limiter = Limiter0} = State) ->
    #retry{types = Types, data = Data, next = Continue} =
        emqx_limiter_container:get_retry_context(Limiter0),
    case emqx_limiter_container:retry_list(Types, Limiter0) of
        {ok, Limiter1} ->
            NState = State#state{limiter = Limiter1, limiter_timer = undefined},
            Continue(Data, [check_limiter_buffer], NState);
        {pause, PauseMs, Limiter1} ->
            ?SLOG(debug, #{
                msg => "pause_time_due_to_rate_limit",
                types => Types,
                time_in_ms => PauseMs
            }),
            TRef = start_timer(PauseMs, limit_timeout),
            {ok, State#state{limiter = Limiter1, limiter_timer = TRef}}
    end.
1087

1088
%%--------------------------------------------------------------------
1089
%% Run GC and Check OOM
1090

1091
%% Run emqx_gc with the given stats and store the updated GC state.
%% Nothing happens (the state is returned as is) when the GC state is not
%% enabled or when emqx_olp:backoff_gc/1 returns true for the zone.
run_gc(Stats, #state{gc_state = GcSt0, zone = Zone} = State) ->
    ShouldRun = ?ENABLED(GcSt0) andalso not emqx_olp:backoff_gc(Zone),
    case ShouldRun andalso emqx_gc:run(Stats, GcSt0) of
        false ->
            State;
        {_IsGC, GcSt1} ->
            State#state{gc_state = GcSt1}
    end.
1099

1100
%% Check the process against the zone's force_shutdown policy.
%% Exits the process with {shutdown, Reason} when the policy is violated,
%% otherwise returns the state unchanged.
check_oom(#state{channel = Channel} = State) ->
    Zone = emqx_channel:info(zone, Channel),
    ShutdownPolicy = emqx_config:get_zone_conf(Zone, [force_shutdown]),
    ?tp(debug, check_oom, #{policy => ShutdownPolicy}),
    case emqx_utils:check_oom(ShutdownPolicy) of
        {shutdown, Reason} ->
            %% triggers terminate/2 callback immediately
            erlang:exit({shutdown, Reason});
        _Other ->
            ok
    end,
    State.
10✔
1113

1114
%%--------------------------------------------------------------------
1115
%% Activate Socket
1116
%% TODO: maybe we could keep socket passive for receiving socket closed event.
1117
-compile({inline, [activate_socket/1]}).
%% While a limiter timer is pending the socket is not activated and its
%% state is marked as blocked.
activate_socket(#state{limiter_timer = TRef} = St) when TRef =/= undefined ->
    {ok, St#state{sockstate = blocked}};
%% Socket not closed: set {active, N} with the listener's active_n value
%% and mark the socket as running; a setopts failure is returned as is.
activate_socket(
    #state{
        transport = Transport,
        sockstate = SockSt,
        socket = Sock,
        listener = {Type, Listener}
    } = St
) when SockSt =/= closed ->
    N = get_active_n(Type, Listener),
    case Transport:setopts(Sock, [{active, N}]) of
        ok ->
            {ok, St#state{sockstate = running}};
        Failure ->
            Failure
    end;
%% Socket already closed: nothing to do.
activate_socket(St) ->
    {ok, St}.
1✔
1139

1140
%%--------------------------------------------------------------------
1141
%% Close Socket
1142

1143
%% Close the socket through the transport's fast_close and mark it as
%% closed in the state. A socket already marked closed is left alone.
close_socket(#state{sockstate = closed} = St) ->
    St;
close_socket(#state{transport = Transport, socket = Sock} = St) ->
    ok = Transport:fast_close(Sock),
    St#state{sockstate = closed}.
8,359✔
1148

1149
%%--------------------------------------------------------------------
1150
%% Inc incoming/outgoing stats
1151

1152
-compile({inline, [inc_incoming_stats/1]}).
%% Bump the per-connection counters for a received packet (with extra
%% message and QoS counters for PUBLISH) and then the global receive metrics.
inc_incoming_stats(?PACKET(Type) = Packet) ->
    inc_counter(recv_pkt, 1),
    case Type of
        ?PUBLISH ->
            inc_counter(recv_msg, 1),
            inc_qos_stats(recv_msg, Packet),
            inc_counter(incoming_pubs, 1);
        _ ->
            ok
    end,
    emqx_metrics:inc_recv(Packet).
47,275✔
1164

1165
-compile({inline, [inc_outgoing_stats/1]}).
%% A message dropped for being too large only bumps the dropped counters.
inc_outgoing_stats({error, message_too_large}) ->
    inc_counter('send_msg.dropped', 1),
    inc_counter('send_msg.dropped.too_large', 1);
%% A sent packet bumps the per-connection counters (with extra message and
%% QoS counters for PUBLISH) and then the global sent metrics.
inc_outgoing_stats(?PACKET(Type) = Packet) ->
    inc_counter(send_pkt, 1),
    case Type =:= ?PUBLISH of
        true ->
            inc_counter(send_msg, 1),
            inc_counter(outgoing_pubs, 1),
            inc_qos_stats(send_msg, Packet);
        false ->
            ok
    end,
    emqx_metrics:inc_sent(Packet).
45,917✔
1180

1181
%% Bump the per-QoS counter for a sent or received message.
%% Returns `ignore` when there is no counter key for the packet's QoS.
inc_qos_stats(Type, Packet) ->
    QoS = emqx_packet:qos(Packet),
    case inc_qos_stats_key(Type, QoS) of
        undefined ->
            ignore;
        CounterKey ->
            inc_counter(CounterKey, 1)
    end.
1188

1189
%% Map a direction (send_msg | recv_msg) and a QoS level to the matching
%% counter key; any other combination (e.g. a bad QoS) gives `undefined`.
inc_qos_stats_key(Direction, QoS) ->
    case {Direction, QoS} of
        {send_msg, ?QOS_0} -> 'send_msg.qos0';
        {send_msg, ?QOS_1} -> 'send_msg.qos1';
        {send_msg, ?QOS_2} -> 'send_msg.qos2';
        {recv_msg, ?QOS_0} -> 'recv_msg.qos0';
        {recv_msg, ?QOS_1} -> 'recv_msg.qos1';
        {recv_msg, ?QOS_2} -> 'recv_msg.qos2';
        {_, _} -> undefined
    end.
×
1197

1198
%%--------------------------------------------------------------------
1199
%% Helper functions
1200

1201
-compile({inline, [next_msgs/1]}).
%% Normalize channel replies: a bare MQTT packet is wrapped as
%% {outgoing, Packet}; tuples and lists are passed through as they are.
next_msgs(#mqtt_packet{} = Packet) ->
    {outgoing, Packet};
next_msgs(Event) when is_tuple(Event) ->
    Event;
next_msgs(Msgs) when is_list(Msgs) ->
    Msgs.
33,145✔
1208

1209
-compile({inline, [shutdown/2, shutdown/3]}).
%% Stop with the reason wrapped as {shutdown, Reason}.
shutdown(Reason, State) ->
    StopReason = {shutdown, Reason},
    stop(StopReason, State).

%% Same as shutdown/2, additionally carrying a reply.
shutdown(Reason, Reply, State) ->
    StopReason = {shutdown, Reason},
    stop(StopReason, Reply, State).
433✔
1215

1216
-compile({inline, [stop/2, stop/3]}).
%% Build the {stop, Reason, State} result tuple.
stop(Why, St) ->
    {stop, Why, St}.

%% Build the {stop, Reason, Reply, State} result tuple.
stop(Why, Reply, St) ->
    {stop, Why, Reply, St}.
433✔
1222

1223
%% Increment the counter Key by Delta via emqx_pd; the value returned by
%% emqx_pd is discarded and ok is returned.
inc_counter(Key, Delta) ->
    _ = emqx_pd:inc_counter(Key, Delta),
    ok.
216,096✔
1226

1227
%% Apply the listener's tcp_options.keepalive setting, if any.
%% QUIC listeners are skipped, and so is the value "none" (also the default
%% used when nothing is configured).
set_tcp_keepalive({quic, _Listener}) ->
    ok;
set_tcp_keepalive({Type, Id}) ->
    case emqx_config:get_listener_conf(Type, Id, [tcp_options, keepalive], "none") of
        "none" ->
            ok;
        KeepaliveConf ->
            %% the value is already validated by schema, so we do not validate it again.
            {Idle, Interval, Probes} = emqx_schema:parse_tcp_keepalive(KeepaliveConf),
            async_set_keepalive(Idle, Interval, Probes)
    end.
1239

1240
%% Shut the socket down in both directions through the transport and mark
%% it as closed in the state. The result of the shutdown call is not checked.
-spec graceful_shutdown_transport(atom(), state()) -> state().
graceful_shutdown_transport(_Reason, #state{transport = Transport, socket = Sock} = St) ->
    %% @TODO Reason is reserved for future use, quic transport
    Transport:shutdown(Sock, read_write),
    St#state{sockstate = closed}.
158✔
1245

1246
%%--------------------------------------------------------------------
1247
%% For CT tests
1248
%%--------------------------------------------------------------------
1249

1250
%% Test helper: set the #state{} field called Name to Value.
set_field(Name, Value, State) ->
    FieldIdx = emqx_utils:index_of(Name, record_info(fields, state)),
    %% +1 skips the record tag held in the first tuple element
    setelement(FieldIdx + 1, State, Value).
69✔
1253

1254
%% Test helper: fetch the state of process Pid via sys:get_state/1 and
%% return it as a map keyed by the #state{} field names.
get_state(Pid) ->
    StateRec = sys:get_state(Pid),
    FieldNames = record_info(fields, state),
    %% drop the record tag so the values line up with the field names
    FieldValues = tl(tuple_to_list(StateRec)),
    maps:from_list(lists:zip(FieldNames, FieldValues)).
1262

1263
%% The {active, N} value for a listener: the ?ACTIVE_N constant for QUIC,
%% otherwise the listener's configured tcp_options.active_n.
get_active_n(Type, Listener) ->
    case Type of
        quic ->
            ?ACTIVE_N;
        _ ->
            emqx_config:get_listener_conf(Type, Listener, [tcp_options, active_n])
    end.
52,662✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc