• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8625278842

10 Apr 2024 02:52AM UTC coverage: 62.489% (-0.002%) from 62.491%
8625278842

push

github

web-flow
Merge pull request #12855 from JimMoen/fix-share-queue-format

fix(mgmt): $queue shared topics format in mgmt topics api

0 of 1 new or added line in 1 file covered. (0.0%)

49 existing lines in 10 files now uncovered.

34606 of 55379 relevant lines covered (62.49%)

6756.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.65
/apps/emqx/src/emqx_ws_connection.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% MQTT/WS|WSS Connection
18
-module(emqx_ws_connection).
19

20
-include("emqx.hrl").
21
-include("emqx_cm.hrl").
22
-include("emqx_mqtt.hrl").
23
-include("logger.hrl").
24
-include("types.hrl").
25

26
-ifdef(TEST).
27
-compile(export_all).
28
-compile(nowarn_export_all).
29
-endif.
30

31
%% API
32
-export([
33
    info/1,
34
    stats/1
35
]).
36

37
-export([
38
    call/2,
39
    call/3
40
]).
41

42
%% WebSocket callbacks
43
-export([
44
    init/2,
45
    websocket_init/1,
46
    websocket_handle/2,
47
    websocket_info/2,
48
    websocket_close/2,
49
    terminate/3
50
]).
51

52
%% Export for CT
53
-export([set_field/3]).
54

55
-import(
56
    emqx_utils,
57
    [
58
        maybe_apply/2,
59
        start_timer/2
60
    ]
61
).
62

63
-record(state, {
64
    %% Peername of the ws connection
65
    peername :: emqx_types:peername(),
66
    %% Sockname of the ws connection
67
    sockname :: emqx_types:peername(),
68
    %% Sock state
69
    sockstate :: emqx_types:sockstate(),
70
    %% MQTT Piggyback
71
    mqtt_piggyback :: single | multiple,
72
    %% Parse State
73
    parse_state :: emqx_frame:parse_state(),
74
    %% Serialize options
75
    serialize :: emqx_frame:serialize_opts(),
76
    %% Channel
77
    channel :: emqx_channel:channel(),
78
    %% GC State
79
    gc_state :: option(emqx_gc:gc_state()),
80
    %% Postponed Packets|Cmds|Events
81
    postponed :: list(emqx_types:packet() | ws_cmd() | tuple()),
82
    %% Stats Timer
83
    stats_timer :: disabled | option(reference()),
84
    %% Idle Timeout
85
    idle_timeout :: timeout(),
86
    %% Idle Timer
87
    idle_timer :: option(reference()),
88
    %% Zone name
89
    zone :: atom(),
90
    %% Listener Type and Name
91
    listener :: {Type :: atom(), Name :: atom()},
92

93
    %% Limiter
94
    limiter :: container(),
95

96
    %% cache operation when overload
97
    limiter_buffer :: queue:queue(cache()),
98

99
    %% limiter timers
100
    limiter_timer :: undefined | reference()
101
}).
102

103
-record(retry, {
104
    types :: list(limiter_type()),
105
    data :: any(),
106
    next :: check_succ_handler()
107
}).
108

109
-record(cache, {
110
    need :: list({pos_integer(), limiter_type()}),
111
    data :: any(),
112
    next :: check_succ_handler()
113
}).
114

115
-type state() :: #state{}.
116
-type cache() :: #cache{}.
117

118
-type ws_cmd() :: {active, boolean()} | close.
119

120
-define(ACTIVE_N, 100).
121
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
122
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
123

124
-define(ENABLED(X), (X =/= undefined)).
125
-define(LIMITER_BYTES_IN, bytes).
126
-define(LIMITER_MESSAGE_IN, messages).
127

128
-dialyzer({no_match, [info/2]}).
129
-dialyzer({nowarn_function, [websocket_init/1]}).
130

131
-define(LOG(Level, Data), ?SLOG(Level, (Data)#{tag => "MQTT"})).
132

133
%%--------------------------------------------------------------------
134
%% Info, Stats
135
%%--------------------------------------------------------------------
136

137
-spec info(pid() | state()) -> emqx_types:infos().
138
info(WsPid) when is_pid(WsPid) ->
139
    call(WsPid, info);
×
140
info(State = #state{channel = Channel}) ->
141
    ChanInfo = emqx_channel:info(Channel),
182✔
142
    SockInfo = maps:from_list(
182✔
143
        info(?INFO_KEYS, State)
144
    ),
145
    ChanInfo#{sockinfo => SockInfo}.
182✔
146

147
info(Keys, State) when is_list(Keys) ->
148
    [{Key, info(Key, State)} || Key <- Keys];
182✔
149
info(socktype, _State) ->
150
    ws;
182✔
151
info(peername, #state{peername = Peername}) ->
152
    Peername;
182✔
153
info(sockname, #state{sockname = Sockname}) ->
154
    Sockname;
182✔
155
info(sockstate, #state{sockstate = SockSt}) ->
156
    SockSt;
183✔
157
info(limiter, #state{limiter = Limiter}) ->
158
    Limiter;
1✔
159
info(channel, #state{channel = Channel}) ->
160
    emqx_channel:info(Channel);
1✔
161
info(gc_state, #state{gc_state = GcSt}) ->
162
    maybe_apply(fun emqx_gc:info/1, GcSt);
1✔
163
info(postponed, #state{postponed = Postponed}) ->
164
    Postponed;
5✔
165
info(stats_timer, #state{stats_timer = TRef}) ->
166
    TRef;
2✔
167
info(idle_timeout, #state{idle_timeout = Timeout}) ->
168
    Timeout;
×
169
info(idle_timer, #state{idle_timer = TRef}) ->
170
    TRef.
×
171

172
-spec stats(pid() | state()) -> emqx_types:stats().
173
stats(WsPid) when is_pid(WsPid) ->
174
    call(WsPid, stats);
×
175
stats(#state{channel = Channel}) ->
176
    SockStats = emqx_pd:get_counters(?SOCK_STATS),
158✔
177
    ChanStats = emqx_channel:stats(Channel),
158✔
178
    ProcStats = emqx_utils:proc_stats(),
158✔
179
    lists:append([SockStats, ChanStats, ProcStats]).
158✔
180

181
%% kick|discard|takeover
182
-spec call(pid(), Req :: term()) -> Reply :: term().
183
call(WsPid, Req) ->
184
    call(WsPid, Req, 5000).
4✔
185

186
call(WsPid, Req, Timeout) when is_pid(WsPid) ->
187
    Mref = erlang:monitor(process, WsPid),
36✔
188
    WsPid ! {call, {self(), Mref}, Req},
36✔
189
    receive
36✔
190
        {Mref, Reply} ->
191
            ok = emqx_pmon:demonitor(Mref),
36✔
192
            Reply;
36✔
193
        {'DOWN', Mref, _, _, Reason} ->
UNCOV
194
            exit(Reason)
×
195
    after Timeout ->
196
        ok = emqx_pmon:demonitor(Mref),
×
197
        exit(timeout)
×
198
    end.
199

200
%%--------------------------------------------------------------------
201
%% WebSocket callbacks
202
%%--------------------------------------------------------------------
203

204
init(Req, #{listener := {Type, Listener}} = Opts) ->
205
    %% WS Transport Idle Timeout
206
    WsOpts = #{
116✔
207
        compress => get_ws_opts(Type, Listener, compress),
208
        deflate_opts => get_ws_opts(Type, Listener, deflate_opts),
209
        max_frame_size => get_ws_opts(Type, Listener, max_frame_size),
210
        idle_timeout => get_ws_opts(Type, Listener, idle_timeout),
211
        validate_utf8 => get_ws_opts(Type, Listener, validate_utf8)
212
    },
213
    case check_origin_header(Req, Opts) of
116✔
214
        {error, Reason} ->
215
            ?SLOG(error, #{msg => "invalid_origin_header", reason => Reason}),
1✔
216
            {ok, cowboy_req:reply(403, Req), WsOpts};
1✔
217
        ok ->
218
            parse_sec_websocket_protocol(Req, Opts, WsOpts)
115✔
219
    end.
220

221
parse_sec_websocket_protocol(Req, #{listener := {Type, Listener}} = Opts, WsOpts) ->
222
    case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
115✔
223
        undefined ->
224
            case get_ws_opts(Type, Listener, fail_if_no_subprotocol) of
1✔
225
                true ->
226
                    {ok, cowboy_req:reply(400, Req), WsOpts};
1✔
227
                false ->
228
                    {cowboy_websocket, Req, [Req, Opts], WsOpts}
×
229
            end;
230
        Subprotocols ->
231
            SupportedSubprotocols = get_ws_opts(Type, Listener, supported_subprotocols),
114✔
232
            NSupportedSubprotocols = [
114✔
233
                list_to_binary(Subprotocol)
456✔
234
             || Subprotocol <- SupportedSubprotocols
114✔
235
            ],
236
            case pick_subprotocol(Subprotocols, NSupportedSubprotocols) of
114✔
237
                {ok, Subprotocol} ->
238
                    Resp = cowboy_req:set_resp_header(
113✔
239
                        <<"sec-websocket-protocol">>,
240
                        Subprotocol,
241
                        Req
242
                    ),
243
                    {cowboy_websocket, Resp, [Req, Opts], WsOpts};
113✔
244
                {error, no_supported_subprotocol} ->
245
                    {ok, cowboy_req:reply(400, Req), WsOpts}
1✔
246
            end
247
    end.
248

249
pick_subprotocol([], _SupportedSubprotocols) ->
250
    {error, no_supported_subprotocol};
1✔
251
pick_subprotocol([Subprotocol | Rest], SupportedSubprotocols) ->
252
    case lists:member(Subprotocol, SupportedSubprotocols) of
114✔
253
        true ->
254
            {ok, Subprotocol};
113✔
255
        false ->
256
            pick_subprotocol(Rest, SupportedSubprotocols)
1✔
257
    end.
258

259
parse_header_fun_origin(Req, #{listener := {Type, Listener}}) ->
260
    case cowboy_req:header(<<"origin">>, Req) of
2✔
261
        undefined ->
262
            case get_ws_opts(Type, Listener, allow_origin_absence) of
×
263
                true -> ok;
×
264
                false -> {error, origin_header_cannot_be_absent}
×
265
            end;
266
        Value ->
267
            case lists:member(Value, get_ws_opts(Type, Listener, check_origins)) of
2✔
268
                true -> ok;
1✔
269
                false -> {error, #{bad_origin => Value}}
1✔
270
            end
271
    end.
272

273
check_origin_header(Req, #{listener := {Type, Listener}} = Opts) ->
274
    case get_ws_opts(Type, Listener, check_origin_enable) of
116✔
275
        true -> parse_header_fun_origin(Req, Opts);
2✔
276
        false -> ok
114✔
277
    end.
278

279
websocket_init([Req, Opts]) ->
280
    #{zone := Zone, limiter := LimiterCfg, listener := {Type, Listener} = ListenerCfg} = Opts,
158✔
281
    case check_max_connection(Type, Listener) of
158✔
282
        allow ->
283
            {Peername, PeerCert} = get_peer_info(Type, Listener, Req, Opts),
158✔
284
            Sockname = cowboy_req:sock(Req),
158✔
285
            WsCookie = get_ws_cookie(Req),
158✔
286
            ConnInfo = #{
158✔
287
                socktype => ws,
288
                peername => Peername,
289
                sockname => Sockname,
290
                peercert => PeerCert,
291
                ws_cookie => WsCookie,
292
                conn_mod => ?MODULE
293
            },
294
            Limiter = emqx_limiter_container:get_limiter_by_types(
158✔
295
                ListenerCfg,
296
                [?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN],
297
                LimiterCfg
298
            ),
299
            MQTTPiggyback = get_ws_opts(Type, Listener, mqtt_piggyback),
158✔
300
            FrameOpts = #{
158✔
301
                strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]),
302
                max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size])
303
            },
304
            ParseState = emqx_frame:initial_parse_state(FrameOpts),
158✔
305
            Serialize = emqx_frame:serialize_opts(),
158✔
306
            Channel = emqx_channel:init(ConnInfo, Opts),
158✔
307
            GcState = get_force_gc(Zone),
158✔
308
            StatsTimer = get_stats_enable(Zone),
158✔
309
            %% MQTT Idle Timeout
310
            IdleTimeout = emqx_channel:get_mqtt_conf(Zone, idle_timeout),
158✔
311
            IdleTimer = start_timer(IdleTimeout, idle_timeout),
158✔
312
            tune_heap_size(Channel),
158✔
313
            emqx_logger:set_metadata_peername(esockd:format(Peername)),
158✔
314
            {ok,
158✔
315
                #state{
316
                    peername = Peername,
317
                    sockname = Sockname,
318
                    sockstate = running,
319
                    mqtt_piggyback = MQTTPiggyback,
320
                    limiter = Limiter,
321
                    parse_state = ParseState,
322
                    serialize = Serialize,
323
                    channel = Channel,
324
                    gc_state = GcState,
325
                    postponed = [],
326
                    stats_timer = StatsTimer,
327
                    idle_timeout = IdleTimeout,
328
                    idle_timer = IdleTimer,
329
                    zone = Zone,
330
                    listener = {Type, Listener},
331
                    limiter_timer = undefined,
332
                    limiter_buffer = queue:new()
333
                },
334
                hibernate};
335
        {denny, Reason} ->
336
            {stop, Reason}
×
337
    end.
338

339
tune_heap_size(Channel) ->
340
    case
158✔
341
        emqx_config:get_zone_conf(
342
            emqx_channel:info(zone, Channel),
343
            [force_shutdown]
344
        )
345
    of
346
        #{enable := false} -> ok;
×
347
        ShutdownPolicy -> emqx_utils:tune_heap_size(ShutdownPolicy)
158✔
348
    end.
349

350
get_stats_enable(Zone) ->
351
    case emqx_config:get_zone_conf(Zone, [stats, enable]) of
158✔
352
        true -> undefined;
158✔
353
        false -> disabled
×
354
    end.
355

356
get_force_gc(Zone) ->
357
    case emqx_config:get_zone_conf(Zone, [force_gc]) of
158✔
358
        #{enable := false} -> undefined;
×
359
        GcPolicy -> emqx_gc:init(GcPolicy)
158✔
360
    end.
361

362
get_ws_cookie(Req) ->
363
    try
158✔
364
        cowboy_req:parse_cookies(Req)
158✔
365
    catch
366
        error:badarg ->
367
            ?SLOG(error, #{msg => "bad_cookie"}),
46✔
368
            undefined;
46✔
369
        Error:Reason ->
370
            ?SLOG(error, #{
×
371
                msg => "failed_to_parse_cookie",
372
                exception => Error,
373
                reason => Reason
374
            }),
×
375
            undefined
×
376
    end.
377

378
get_peer_info(Type, Listener, Req, Opts) ->
379
    case
158✔
380
        emqx_config:get_listener_conf(Type, Listener, [proxy_protocol]) andalso
158✔
381
            maps:get(proxy_header, Req)
×
382
    of
383
        #{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
384
            SourceName = {SrcAddr, SrcPort},
×
385
            %% Notice: CN is only available in Proxy Protocol V2 additional info.
386
            %% `CN` is unsupported in Proxy Protocol V1
387
            %% `pp2_ssl_cn` is required by config `peer_cert_as_username` or `peer_cert_as_clientid`.
388
            %% It will be parsed by esockd.
389
            %% See also `emqx_channel:set_peercert_infos/3` and `esockd_peercert:common_name/1`
390
            SourceSSL =
×
391
                case maps:get(cn, SSL, undefined) of
392
                    undefined -> undefined;
×
393
                    CN -> [{pp2_ssl_cn, CN}]
×
394
                end,
395
            {SourceName, SourceSSL};
×
396
        #{src_address := SrcAddr, src_port := SrcPort} ->
397
            SourceName = {SrcAddr, SrcPort},
×
398
            {SourceName, nossl};
×
399
        _ ->
400
            {get_peer(Req, Opts), cowboy_req:cert(Req)}
158✔
401
    end.
402

403
websocket_handle({binary, Data}, State) when is_list(Data) ->
404
    websocket_handle({binary, iolist_to_binary(Data)}, State);
1✔
405
websocket_handle({binary, Data}, State) ->
406
    ?LOG(debug, #{
329✔
407
        msg => "raw_bin_received",
408
        size => iolist_size(Data),
409
        bin => binary_to_list(binary:encode_hex(Data)),
410
        type => "hex"
411
    }),
319✔
412
    State2 = ensure_stats_timer(State),
329✔
413
    {Packets, State3} = parse_incoming(Data, [], State2),
329✔
414
    LenMsg = erlang:length(Packets),
329✔
415
    ByteSize = erlang:iolist_size(Data),
329✔
416
    inc_recv_stats(LenMsg, ByteSize),
329✔
417
    State4 = check_limiter(
329✔
418
        [{ByteSize, ?LIMITER_BYTES_IN}, {LenMsg, ?LIMITER_MESSAGE_IN}],
419
        Packets,
420
        fun when_msg_in/3,
421
        [],
422
        State3
423
    ),
424
    return(State4);
329✔
425
%% Pings should be replied with pongs, cowboy does it automatically
426
%% Pongs can be safely ignored. Clause here simply prevents crash.
427
websocket_handle(Frame, State) when Frame =:= ping; Frame =:= pong ->
428
    return(State);
2✔
429
websocket_handle({Frame, _}, State) when Frame =:= ping; Frame =:= pong ->
430
    return(State);
2✔
431
websocket_handle({Frame, _}, State) ->
432
    %% TODO: should not close the ws connection
433
    ?LOG(error, #{msg => "unexpected_frame", frame => Frame}),
1✔
434
    shutdown(unexpected_ws_frame, State).
1✔
435
websocket_info({call, From, Req}, State) ->
436
    handle_call(From, Req, State);
33✔
437
websocket_info({cast, rate_limit}, State) ->
438
    Stats = #{
1✔
439
        cnt => emqx_pd:reset_counter(incoming_pubs),
440
        oct => emqx_pd:reset_counter(incoming_bytes)
441
    },
442
    return(postpone({check_gc, Stats}, State));
1✔
443
websocket_info({cast, Msg}, State) ->
444
    handle_info(Msg, State);
1✔
445
websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
446
    Serialize = emqx_frame:serialize_opts(ConnPkt),
105✔
447
    NState = State#state{serialize = Serialize},
105✔
448
    handle_incoming(Packet, cancel_idle_timer(NState));
105✔
449
websocket_info({incoming, Packet}, State) ->
450
    ?TRACE("WS-MQTT", "mqtt_packet_received", #{packet => Packet}),
221✔
451
    handle_incoming(Packet, State);
221✔
452
websocket_info({outgoing, Packets}, State) ->
453
    return(enqueue(Packets, State));
90✔
454
websocket_info({check_gc, Stats}, State) ->
455
    return(check_oom(run_gc(Stats, State)));
1✔
456
websocket_info(
457
    Deliver = {deliver, _Topic, _Msg},
458
    State = #state{listener = {Type, Listener}}
459
) ->
460
    ActiveN = get_active_n(Type, Listener),
44✔
461
    Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)],
44✔
462
    with_channel(handle_deliver, [Delivers], State);
44✔
463
websocket_info(
464
    {timeout, _, limit_timeout},
465
    State
466
) ->
467
    return(retry_limiter(State));
1✔
468
websocket_info(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) ->
469
    case queue:peek(Buffer) of
×
470
        empty ->
471
            return(enqueue({active, true}, State#state{sockstate = running}));
×
472
        {value, #cache{need = Needs, data = Data, next = Next}} ->
473
            State2 = State#state{limiter_buffer = queue:drop(Buffer)},
×
474
            return(check_limiter(Needs, Data, Next, [check_limiter_buffer], State2))
×
475
    end;
476
websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
477
    handle_timeout(TRef, Msg, State);
1,116✔
478
websocket_info({shutdown, Reason}, State) ->
479
    shutdown(Reason, State);
1✔
480
websocket_info({stop, Reason}, State) ->
481
    shutdown(Reason, State);
1✔
482
websocket_info(Info, State) ->
483
    handle_info(Info, State).
272✔
484

485
websocket_close({_, ReasonCode, _Payload}, State) when is_integer(ReasonCode) ->
486
    websocket_close(ReasonCode, State);
×
487
websocket_close(Reason, State) ->
488
    ?TRACE("SOCKET", "websocket_closed", #{reason => Reason}),
84✔
489
    handle_info({sock_closed, Reason}, State).
84✔
490

491
terminate(Reason, _Req, #state{channel = Channel}) ->
492
    ?TRACE("SOCKET", "websocket_terminated", #{reason => Reason}),
102✔
493
    emqx_channel:terminate(Reason, Channel);
102✔
494
terminate(_Reason, _Req, _UnExpectedState) ->
495
    ok.
2✔
496

497
%%--------------------------------------------------------------------
498
%% Handle call
499
%%--------------------------------------------------------------------
500

501
handle_call(From, info, State) ->
502
    gen_server:reply(From, info(State)),
×
503
    return(State);
×
504
handle_call(From, stats, State) ->
505
    gen_server:reply(From, stats(State)),
×
506
    return(State);
×
507
handle_call(From, Req, State = #state{channel = Channel}) ->
508
    case emqx_channel:handle_call(Req, Channel) of
33✔
509
        {reply, Reply, NChannel} ->
510
            gen_server:reply(From, Reply),
14✔
511
            return(State#state{channel = NChannel});
14✔
512
        {shutdown, Reason, Reply, NChannel} ->
513
            gen_server:reply(From, Reply),
17✔
514
            shutdown(Reason, State#state{channel = NChannel});
17✔
515
        {shutdown, Reason, Reply, Packet, NChannel} ->
516
            gen_server:reply(From, Reply),
2✔
517
            NState = State#state{channel = NChannel},
2✔
518
            shutdown(Reason, enqueue(Packet, NState))
2✔
519
    end.
520

521
%%--------------------------------------------------------------------
522
%% Handle Info
523
%%--------------------------------------------------------------------
524

525
handle_info({connack, ConnAck}, State) ->
526
    return(enqueue(ConnAck, State));
89✔
527
handle_info({close, Reason}, State) ->
528
    ?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}),
2✔
529
    return(enqueue({close, Reason}, State));
2✔
530
handle_info({event, connected}, State = #state{channel = Channel}) ->
531
    ClientId = emqx_channel:info(clientid, Channel),
89✔
532
    emqx_cm:insert_channel_info(ClientId, info(State), stats(State)),
89✔
533
    return(State);
89✔
534
handle_info({event, disconnected}, State = #state{channel = Channel}) ->
535
    ClientId = emqx_channel:info(clientid, Channel),
33✔
536
    emqx_cm:set_chan_info(ClientId, info(State)),
33✔
537
    return(State);
33✔
538
handle_info({event, _Other}, State = #state{channel = Channel}) ->
539
    case emqx_channel:info(clientid, Channel) of
57✔
540
        %% ClientId is yet unknown (i.e. connect packet is not received yet)
541
        undefined ->
542
            ok;
×
543
        ClientId ->
544
            emqx_cm:set_chan_info(ClientId, info(State)),
57✔
545
            emqx_cm:set_chan_stats(ClientId, stats(State))
57✔
546
    end,
547
    return(State);
57✔
548
handle_info(Info, State) ->
549
    with_channel(handle_info, [Info], State).
92✔
550

551
%%--------------------------------------------------------------------
552
%% Handle timeout
553
%%--------------------------------------------------------------------
554

555
handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
556
    shutdown(idle_timeout, State);
1✔
557
handle_timeout(TRef, keepalive, State) when is_reference(TRef) ->
558
    RecvOct = emqx_pd:get_counter(recv_oct),
2✔
559
    handle_timeout(TRef, {keepalive, RecvOct}, State);
2✔
560
handle_timeout(
561
    TRef,
562
    emit_stats,
563
    State = #state{
564
        channel = Channel,
565
        stats_timer = TRef
566
    }
567
) ->
568
    ClientId = emqx_channel:info(clientid, Channel),
11✔
569
    emqx_cm:set_chan_stats(ClientId, stats(State)),
11✔
570
    return(State#state{stats_timer = undefined});
11✔
571
handle_timeout(TRef, TMsg, State) ->
572
    with_channel(handle_timeout, [TRef, TMsg], State).
1,107✔
573

574
%%--------------------------------------------------------------------
575
%% Ensure rate limit
576
%%--------------------------------------------------------------------
577

578
-type limiter_type() :: emqx_limiter_container:limiter_type().
579
-type container() :: emqx_limiter_container:container().
580
-type check_succ_handler() ::
581
    fun((any(), list(any()), state()) -> state()).
582

583
-spec check_limiter(
584
    list({pos_integer(), limiter_type()}),
585
    any(),
586
    check_succ_handler(),
587
    list(any()),
588
    state()
589
) -> state().
590
check_limiter(
591
    _Needs,
592
    Data,
593
    WhenOk,
594
    Msgs,
595
    #state{limiter = infinity} = State
596
) ->
597
    WhenOk(Data, Msgs, State);
329✔
598
check_limiter(
599
    Needs,
600
    Data,
601
    WhenOk,
602
    Msgs,
603
    #state{limiter_timer = undefined, limiter = Limiter} = State
604
) ->
605
    case emqx_limiter_container:check_list(Needs, Limiter) of
1✔
606
        {ok, Limiter2} ->
607
            WhenOk(Data, Msgs, State#state{limiter = Limiter2});
×
608
        {pause, Time, Limiter2} ->
609
            ?SLOG(debug, #{
1✔
610
                msg => "pause_time_due_to_rate_limit",
611
                needs => Needs,
612
                time_in_ms => Time
613
            }),
×
614

615
            Retry = #retry{
1✔
616
                types = [Type || {_, Type} <- Needs],
1✔
617
                data = Data,
618
                next = WhenOk
619
            },
620

621
            Limiter3 = emqx_limiter_container:set_retry_context(Retry, Limiter2),
1✔
622

623
            TRef = start_timer(Time, limit_timeout),
1✔
624

625
            enqueue(
1✔
626
                {active, false},
627
                State#state{
628
                    sockstate = blocked,
629
                    limiter = Limiter3,
630
                    limiter_timer = TRef
631
                }
632
            );
633
        {drop, Limiter2} ->
634
            {ok, State#state{limiter = Limiter2}}
×
635
    end;
636
check_limiter(
637
    Needs,
638
    Data,
639
    WhenOk,
640
    _Msgs,
641
    #state{limiter_buffer = Buffer} = State
642
) ->
643
    New = #cache{need = Needs, data = Data, next = WhenOk},
×
644
    State#state{limiter_buffer = queue:in(New, Buffer)}.
×
645

646
-spec retry_limiter(state()) -> state().
647
retry_limiter(#state{limiter = Limiter} = State) ->
648
    #retry{types = Types, data = Data, next = Next} = emqx_limiter_container:get_retry_context(
1✔
649
        Limiter
650
    ),
651
    case emqx_limiter_container:retry_list(Types, Limiter) of
1✔
652
        {ok, Limiter2} ->
653
            Next(
1✔
654
                Data,
655
                [check_limiter_buffer],
656
                State#state{
657
                    limiter = Limiter2,
658
                    limiter_timer = undefined
659
                }
660
            );
661
        {pause, Time, Limiter2} ->
662
            ?SLOG(debug, #{
×
663
                msg => "pause_time_due_to_rate_limit",
664
                types => Types,
665
                time_in_ms => Time
666
            }),
×
667

668
            TRef = start_timer(Time, limit_timeout),
×
669

670
            State#state{limiter = Limiter2, limiter_timer = TRef}
×
671
    end.
672

673
when_msg_in(Packets, [], State) ->
674
    postpone(Packets, State);
329✔
675
when_msg_in(Packets, Msgs, State) ->
676
    postpone(Packets, enqueue(Msgs, State)).
1✔
677

678
%%--------------------------------------------------------------------
679
%% Run GC, Check OOM
680
%%--------------------------------------------------------------------
681

682
run_gc(Stats, State = #state{gc_state = GcSt}) ->
683
    case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
2✔
684
        false -> State;
×
685
        {_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
2✔
686
    end.
687

688
check_oom(State = #state{channel = Channel}) ->
689
    ShutdownPolicy = emqx_config:get_zone_conf(
1✔
690
        emqx_channel:info(zone, Channel), [force_shutdown]
691
    ),
692
    case ShutdownPolicy of
1✔
693
        #{enable := false} ->
694
            State;
×
695
        #{enable := true} ->
696
            case emqx_utils:check_oom(ShutdownPolicy) of
1✔
697
                Shutdown = {shutdown, _Reason} ->
698
                    postpone(Shutdown, State);
×
699
                _Other ->
700
                    State
1✔
701
            end
702
    end.
703

704
%%--------------------------------------------------------------------
705
%% Parse incoming data
706
%%--------------------------------------------------------------------
707

708
parse_incoming(<<>>, Packets, State) ->
709
    {lists:reverse(Packets), State};
331✔
710
parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
711
    try emqx_frame:parse(Data, ParseState) of
332✔
712
        {more, NParseState} ->
713
            {Packets, State#state{parse_state = NParseState}};
1✔
714
        {ok, Packet, Rest, NParseState} ->
715
            NState = State#state{parse_state = NParseState},
330✔
716
            parse_incoming(Rest, [{incoming, Packet} | Packets], NState)
330✔
717
    catch
718
        throw:{?FRAME_PARSE_ERROR, Reason} ->
719
            ?LOG(info, #{
1✔
720
                reason => Reason,
721
                at_state => emqx_frame:describe_state(ParseState),
722
                input_bytes => Data
723
            }),
×
724
            FrameError = {frame_error, Reason},
1✔
725
            {[{incoming, FrameError} | Packets], State};
1✔
726
        error:Reason:Stacktrace ->
727
            ?LOG(error, #{
×
728
                at_state => emqx_frame:describe_state(ParseState),
729
                input_bytes => Data,
730
                exception => Reason,
731
                stacktrace => Stacktrace
732
            }),
×
733
            FrameError = {frame_error, Reason},
×
734
            {[{incoming, FrameError} | Packets], State}
×
735
    end.
736

737
%%--------------------------------------------------------------------
738
%% Handle incoming packet
739
%%--------------------------------------------------------------------
740

741
handle_incoming(Packet, State = #state{listener = {Type, Listener}}) when
742
    is_record(Packet, mqtt_packet)
743
->
744
    ok = inc_incoming_stats(Packet),
326✔
745
    NState =
326✔
746
        case
747
            emqx_pd:get_counter(incoming_pubs) >
748
                get_active_n(Type, Listener)
749
        of
750
            true -> postpone({cast, rate_limit}, State);
×
751
            false -> State
326✔
752
        end,
753
    with_channel(handle_in, [Packet], NState);
326✔
754
handle_incoming(FrameError, State) ->
755
    with_channel(handle_in, [FrameError], State).
1✔
756

757
%%--------------------------------------------------------------------
758
%% With Channel
759
%%--------------------------------------------------------------------
760

761
with_channel(Fun, Args, State = #state{channel = Channel}) ->
762
    case erlang:apply(emqx_channel, Fun, Args ++ [Channel]) of
1,570✔
763
        ok ->
764
            return(State);
×
765
        {ok, NChannel} ->
766
            return(State#state{channel = NChannel});
1,156✔
767
        {ok, Replies, NChannel} ->
768
            return(postpone(Replies, State#state{channel = NChannel}));
330✔
769
        {shutdown, Reason, NChannel} ->
770
            shutdown(Reason, State#state{channel = NChannel});
64✔
771
        {shutdown, Reason, Packet, NChannel} ->
772
            NState = State#state{channel = NChannel},
16✔
773
            shutdown(Reason, postpone(Packet, NState))
16✔
774
    end.
775

776
%%--------------------------------------------------------------------
777
%% Handle outgoing packets
778
%%--------------------------------------------------------------------
779

780
handle_outgoing(
781
    Packets,
782
    State = #state{
783
        mqtt_piggyback = MQTTPiggyback,
784
        listener = {Type, Listener}
785
    }
786
) ->
787
    IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
241✔
788
    Oct = iolist_size(IoData),
241✔
789
    ok = inc_sent_stats(length(Packets), Oct),
241✔
790
    NState =
241✔
791
        case
792
            emqx_pd:get_counter(outgoing_pubs) >
793
                get_active_n(Type, Listener)
794
        of
795
            true ->
796
                Stats = #{
×
797
                    cnt => emqx_pd:reset_counter(outgoing_pubs),
798
                    oct => emqx_pd:reset_counter(outgoing_bytes)
799
                },
800
                postpone({check_gc, Stats}, State);
×
801
            false ->
802
                State
241✔
803
        end,
804

805
    {
241✔
806
        case MQTTPiggyback of
807
            single -> [{binary, IoData}];
×
808
            multiple -> lists:map(fun(Bin) -> {binary, Bin} end, IoData)
241✔
809
        end,
810
        ensure_stats_timer(NState)
811
    }.
812

813
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
814
    fun(Packet) ->
241✔
815
        try emqx_frame:serialize_pkt(Packet, Serialize) of
302✔
816
            <<>> ->
817
                ?LOG(warning, #{
×
818
                    msg => "packet_discarded",
819
                    reason => "frame_too_large",
820
                    packet => emqx_packet:format(Packet)
821
                }),
×
822
                ok = emqx_metrics:inc('delivery.dropped.too_large'),
×
823
                ok = emqx_metrics:inc('delivery.dropped'),
×
824
                ok = inc_outgoing_stats({error, message_too_large}),
×
825
                <<>>;
×
826
            Data ->
827
                ?TRACE("WS-MQTT", "mqtt_packet_sent", #{packet => Packet}),
302✔
828
                ok = inc_outgoing_stats(Packet),
302✔
829
                Data
302✔
830
        catch
831
            %% Maybe Never happen.
832
            throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
833
                ?LOG(info, #{
×
834
                    reason => Reason,
835
                    input_packet => Packet
836
                }),
×
837
                erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
×
838
            error:Reason:Stacktrace ->
839
                ?LOG(error, #{
×
840
                    input_packet => Packet,
841
                    exception => Reason,
842
                    stacktrace => Stacktrace
843
                }),
×
844
                erlang:error(?FRAME_SERIALIZE_ERROR)
×
845
        end
846
    end.
847

848
%%--------------------------------------------------------------------
849
%% Inc incoming/outgoing stats
850
%%--------------------------------------------------------------------
851

852
-compile(
853
    {inline, [
854
        inc_recv_stats/2,
855
        inc_incoming_stats/1,
856
        inc_outgoing_stats/1,
857
        inc_sent_stats/2
858
    ]}
859
).
860

861
inc_recv_stats(Cnt, Oct) ->
862
    inc_counter(incoming_bytes, Oct),
329✔
863
    inc_counter(recv_cnt, Cnt),
329✔
864
    inc_counter(recv_oct, Oct),
329✔
865
    emqx_metrics:inc('bytes.received', Oct).
329✔
866

867
inc_incoming_stats(Packet = ?PACKET(Type)) ->
868
    _ = emqx_pd:inc_counter(recv_pkt, 1),
326✔
869
    case Type of
326✔
870
        ?PUBLISH ->
871
            inc_counter(recv_msg, 1),
1✔
872
            inc_qos_stats(recv_msg, Packet),
1✔
873
            inc_counter(incoming_pubs, 1);
1✔
874
        _ ->
875
            ok
325✔
876
    end,
877
    emqx_metrics:inc_recv(Packet).
326✔
878

879
inc_outgoing_stats({error, message_too_large}) ->
880
    inc_counter('send_msg.dropped', 1),
×
881
    inc_counter('send_msg.dropped.too_large', 1);
×
882
inc_outgoing_stats(Packet = ?PACKET(Type)) ->
883
    inc_counter(send_pkt, 1),
302✔
884
    case Type of
302✔
885
        ?PUBLISH ->
886
            inc_counter(send_msg, 1),
107✔
887
            inc_counter(outgoing_pubs, 1),
107✔
888
            inc_qos_stats(send_msg, Packet);
107✔
889
        _ ->
890
            ok
195✔
891
    end,
892
    emqx_metrics:inc_sent(Packet).
302✔
893

894
inc_sent_stats(Cnt, Oct) ->
895
    inc_counter(outgoing_bytes, Oct),
241✔
896
    inc_counter(send_cnt, Cnt),
241✔
897
    inc_counter(send_oct, Oct),
241✔
898
    emqx_metrics:inc('bytes.sent', Oct).
241✔
899

900
inc_counter(Name, Value) ->
901
    _ = emqx_pd:inc_counter(Name, Value),
2,336✔
902
    ok.
2,336✔
903

904
inc_qos_stats(Type, Packet) ->
905
    case inc_qos_stats_key(Type, emqx_packet:qos(Packet)) of
108✔
906
        undefined ->
907
            ignore;
×
908
        Key ->
909
            inc_counter(Key, 1)
108✔
910
    end.
911

912
inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0';
2✔
913
inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1';
62✔
914
inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2';
43✔
915
inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0';
×
916
inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1';
1✔
917
inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2';
×
918
%% for bad qos
919
inc_qos_stats_key(_, _) -> undefined.
×
920

921
%%--------------------------------------------------------------------
922
%% Helper functions
923
%%--------------------------------------------------------------------
924

925
-compile({inline, [cancel_idle_timer/1, ensure_stats_timer/1]}).
926

927
%%--------------------------------------------------------------------
928
%% Cancel idle timer
929

930
cancel_idle_timer(State = #state{idle_timer = IdleTimer}) ->
931
    ok = emqx_utils:cancel_timer(IdleTimer),
105✔
932
    State#state{idle_timer = undefined}.
105✔
933

934
%%--------------------------------------------------------------------
935
%% Ensure stats timer
936

937
ensure_stats_timer(
938
    State = #state{
939
        idle_timeout = Timeout,
940
        stats_timer = undefined
941
    }
942
) ->
943
    State#state{stats_timer = start_timer(Timeout, emit_stats)};
113✔
944
ensure_stats_timer(State) ->
945
    State.
457✔
946

947
-compile({inline, [postpone/2, enqueue/2, return/1, shutdown/2]}).
948

949
%%--------------------------------------------------------------------
950
%% Postpone the packet, cmd or event
951

952
postpone(Packet, State) when is_record(Packet, mqtt_packet) ->
953
    enqueue(Packet, State);
59✔
954
postpone(Event, State) when is_tuple(Event) ->
955
    enqueue(Event, State);
757✔
956
postpone(More, State) when is_list(More) ->
957
    lists:foldl(fun postpone/2, State, More).
462✔
958

959
enqueue([Packet], State = #state{postponed = Postponed}) ->
960
    State#state{postponed = [Packet | Postponed]};
32✔
961
enqueue(Packets, State = #state{postponed = Postponed}) when
962
    is_list(Packets)
963
->
964
    State#state{postponed = lists:reverse(Packets) ++ Postponed};
20✔
965
enqueue(Other, State = #state{postponed = Postponed}) ->
966
    State#state{postponed = [Other | Postponed]}.
950✔
967

968
shutdown(Reason, State = #state{postponed = Postponed}) ->
969
    return(State#state{postponed = [{shutdown, Reason} | Postponed]}).
104✔
970

971
return(State = #state{postponed = []}) ->
972
    {ok, State};
1,367✔
973
return(State = #state{postponed = Postponed}) ->
974
    {Packets, Cmds, Events} = classify(Postponed, [], [], []),
944✔
975
    ok = lists:foreach(fun trigger/1, Events),
944✔
976
    State1 = State#state{postponed = []},
944✔
977
    case {Packets, Cmds} of
944✔
978
        {[], []} ->
979
            {ok, State1};
548✔
980
        {[], Cmds} ->
981
            {Cmds, State1};
156✔
982
        {Packets, Cmds} ->
983
            {Frames, State2} = handle_outgoing(Packets, State1),
240✔
984
            {Frames ++ Cmds, State2}
240✔
985
    end.
986

987
classify([], Packets, Cmds, Events) ->
988
    {Packets, Cmds, Events};
944✔
989
classify([Packet | More], Packets, Cmds, Events) when
990
    is_record(Packet, mqtt_packet)
991
->
992
    classify(More, [Packet | Packets], Cmds, Events);
300✔
993
classify([Cmd = {active, _} | More], Packets, Cmds, Events) ->
994
    classify(More, Packets, [Cmd | Cmds], Events);
×
995
classify([Cmd = {shutdown, _Reason} | More], Packets, Cmds, Events) ->
996
    classify(More, Packets, [Cmd | Cmds], Events);
104✔
997
classify([Cmd = close | More], Packets, Cmds, Events) ->
998
    classify(More, Packets, [Cmd | Cmds], Events);
×
999
classify([Cmd = {close, _Reason} | More], Packets, Cmds, Events) ->
1000
    classify(More, Packets, [Cmd | Cmds], Events);
70✔
1001
classify([Event | More], Packets, Cmds, Events) ->
1002
    classify(More, Packets, Cmds, [Event | Events]).
689✔
1003

1004
trigger(Event) -> erlang:send(self(), Event).
689✔
1005

1006
get_peer(Req, #{listener := {Type, Listener}}) ->
1007
    {PeerAddr, PeerPort} = cowboy_req:peer(Req),
158✔
1008
    AddrHeaderName = get_ws_header_opts(Type, Listener, proxy_address_header),
158✔
1009
    AddrHeader = cowboy_req:header(AddrHeaderName, Req, <<>>),
158✔
1010
    ClientAddr =
158✔
1011
        case string:tokens(binary_to_list(AddrHeader), ", ") of
1012
            [] ->
1013
                undefined;
157✔
1014
            AddrList ->
1015
                hd(AddrList)
1✔
1016
        end,
1017
    Addr =
158✔
1018
        case inet:parse_address(ClientAddr) of
1019
            {ok, A} ->
1020
                A;
1✔
1021
            _ ->
1022
                PeerAddr
157✔
1023
        end,
1024
    PortHeaderName = get_ws_header_opts(Type, Listener, proxy_port_header),
158✔
1025
    PortHeader = cowboy_req:header(PortHeaderName, Req, <<>>),
158✔
1026
    ClientPort =
158✔
1027
        case string:tokens(binary_to_list(PortHeader), ", ") of
1028
            [] ->
1029
                undefined;
157✔
1030
            PortList ->
1031
                hd(PortList)
1✔
1032
        end,
1033
    try
158✔
1034
        {Addr, list_to_integer(ClientPort)}
158✔
1035
    catch
1036
        _:_ -> {Addr, PeerPort}
157✔
1037
    end.
1038

1039
check_max_connection(Type, Listener) ->
1040
    case emqx_config:get_listener_conf(Type, Listener, [max_connections]) of
158✔
1041
        infinity ->
1042
            allow;
158✔
1043
        Max ->
1044
            MatchSpec = [{{'_', emqx_ws_connection}, [], [true]}],
×
1045
            Curr = ets:select_count(?CHAN_CONN_TAB, MatchSpec),
×
1046
            case Curr >= Max of
×
1047
                false ->
1048
                    allow;
×
1049
                true ->
1050
                    Reason = #{
×
1051
                        max => Max,
1052
                        current => Curr,
1053
                        msg => "websocket_max_connections_limited"
1054
                    },
1055
                    ?SLOG(warning, Reason),
×
1056
                    {denny, Reason}
×
1057
            end
1058
    end.
1059
%%--------------------------------------------------------------------
1060
%% For CT tests
1061
%%--------------------------------------------------------------------
1062

1063
set_field(Name, Value, State) ->
1064
    Pos = emqx_utils:index_of(Name, record_info(fields, state)),
54✔
1065
    setelement(Pos + 1, State, Value).
54✔
1066

1067
%% ensure lowercase letters in headers
1068
get_ws_header_opts(Type, Listener, Key) ->
1069
    iolist_to_binary(string:lowercase(get_ws_opts(Type, Listener, Key))).
316✔
1070

1071
get_ws_opts(Type, Listener, Key) ->
1072
    emqx_config:get_listener_conf(Type, Listener, [websocket, Key]).
1,287✔
1073

1074
get_active_n(Type, Listener) ->
1075
    emqx_config:get_listener_conf(Type, Listener, [tcp_options, active_n]).
611✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc