• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 4896233493

pending completion
4896233493

push

github

GitHub
Merge pull request #10615 from qzhuyan/test/william/fix-master-tests

26105 of 31979 relevant lines covered (81.63%)

3476.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.06
/apps/emqx/src/emqx_listeners.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% @doc Start/Stop MQTT listeners.
18
-module(emqx_listeners).
19

20
-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 10000}}]).
21

22
-include("emqx_mqtt.hrl").
23
-include("emqx_schema.hrl").
24
-include("logger.hrl").
25
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
26

27
%% APIs
28
-export([
29
    list_raw/0,
30
    list/0,
31
    start/0,
32
    restart/0,
33
    stop/0,
34
    is_running/1,
35
    current_conns/2,
36
    max_conns/2,
37
    id_example/0,
38
    default_max_conn/0
39
]).
40

41
-export([
42
    start_listener/1,
43
    start_listener/3,
44
    stop_listener/1,
45
    stop_listener/3,
46
    restart_listener/1,
47
    restart_listener/3,
48
    has_enabled_listener_conf_by_type/1
49
]).
50

51
-export([
52
    listener_id/2,
53
    parse_listener_id/1,
54
    ensure_override_limiter_conf/2,
55
    esockd_access_rules/1
56
]).
57

58
-export([pre_config_update/3, post_config_update/5]).
59

60
-export([format_bind/1]).
61

62
-ifdef(TEST).
63
-export([certs_dir/2]).
64
-endif.
65

66
-type listener_id() :: atom() | binary().
67

68
-define(CONF_KEY_PATH, [listeners, '?', '?']).
69
-define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
70
-define(MARK_DEL, ?TOMBSTONE_CONFIG_CHANGE_REQ).
71

72
%% @doc Return a sample listener id (an atom of the `Type:Name' form).
-spec id_example() -> atom().
id_example() ->
    'tcp:default'.
380✔
74

75
%% @doc List configured listeners.
%% Each element of do_list_raw/0 is tagged with the listener id derived from
%% its type and name.
-spec list_raw() -> [{ListenerId :: atom(), Type :: binary(), ListenerConf :: map()}].
list_raw() ->
    lists:map(
        fun({Type, Name, Conf}) ->
            {listener_id(Type, Name), Type, Conf}
        end,
        do_list_raw()
    ).
82

83
%% List the listeners found under the `listeners' config root as
%% `{ListenerId, Conf}' pairs (see format_list/1 for the per-type expansion).
list() ->
    ByType = emqx:get_config([listeners], #{}),
    lists:flatmap(fun format_list/1, maps:to_list(ByType)).
851✔
86

87
%% Expand one `{Type, #{Name => Conf}}' entry into `{ListenerId, Conf}' pairs.
%% Entries whose value is not a map are dropped; every kept Conf gets a
%% `running' key holding the result of is_running/3.
format_list({Type, Conf}) ->
    lists:filtermap(
        fun
            ({Name, LConf}) when is_map(LConf) ->
                Id = listener_id(Type, Name),
                {true, {Id, LConf#{running => is_running(Type, Id, LConf)}}};
            (_NotAListenerConf) ->
                false
        end,
        maps:to_list(Conf)
    ).
96

97
%% Build the raw listener listing from the raw `listeners' config with
%% schema defaults filled in.
do_list_raw() ->
    case emqx_app:get_init_config_load_done() of
        false ->
            %% GET /listeners from other nodes returns [] when init config is not loaded.
            [];
        true ->
            RootKey = <<"listeners">>,
            RawConf = emqx_config:get_raw([RootKey], #{}),
            Schema = emqx_config:get_schema_mod(RootKey),
            #{RootKey := Filled} =
                emqx_config:fill_defaults(Schema, #{RootKey => RawConf}, #{}),
            lists:flatmap(fun format_raw_listeners/1, maps:to_list(Filled))
    end.
110

111
%% Expand one raw `{TypeBin, #{Name => RawConf}}' entry into
%% `{TypeBin, Name, RawConf1}' triples. Values that are not maps are skipped.
format_raw_listeners({TypeBin, Conf}) ->
    Type = binary_to_atom(TypeBin),
    [
        {TypeBin, Name, format_raw_listener(Type, Name, RawConf)}
     || {Name, RawConf} <- maps:to_list(Conf), is_map(RawConf)
    ].

%% Decorate one raw listener config: drop `authentication' and add the
%% `running' flag plus `current_connections' (0 when not running).
format_raw_listener(Type, Name, RawConf) ->
    Bind = parse_bind(RawConf),
    Running = is_running(Type, listener_id(Type, Name), RawConf#{bind => Bind}),
    Stripped = maps:remove(<<"authentication">>, RawConf),
    CurrConn =
        case Running of
            true -> current_conns(Type, Name, Bind);
            false -> 0
        end,
    Stripped#{<<"running">> => Running, <<"current_connections">> => CurrConn}.
132

133
%% Look up the `running' flag of the listener with the given id in list/0.
%% Returns `{error, not_found}' when no listener has that id.
-spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}.
is_running(ListenerId) ->
    Matches = lists:filtermap(
        fun
            ({Id, #{running := Running}}) when Id =:= ListenerId -> {true, Running};
            (_Other) -> false
        end,
        list()
    ),
    case Matches of
        [IsRunning] -> IsRunning;
        [] -> {error, not_found}
    end.
145

146
%% Tell whether a listener is running, per transport:
%%   tcp/ssl - esockd:listener/1 returns a pid for `{ListenerId, Bind}';
%%   ws/wss  - ranch:info/1 reports `status' as `running';
%%   quic    - quicer:listener/1 returns `{ok, Pid}'.
%% For tcp/ssl/ws/wss any exception raised by the lookup counts as "not running".
is_running(Type, ListenerId, Conf) when Type =:= tcp orelse Type =:= ssl ->
    #{bind := Bind} = Conf,
    try esockd:listener({ListenerId, Bind}) of
        Listener when is_pid(Listener) -> true
    catch
        _:_ -> false
    end;
is_running(Type, ListenerId, _Conf) when Type =:= ws orelse Type =:= wss ->
    try
        running =:= proplists:get_value(status, ranch:info(ListenerId))
    catch
        _:_ -> false
    end;
is_running(quic, ListenerId, _Conf) ->
    case quicer:listener(ListenerId) of
        {ok, Listener} when is_pid(Listener) -> true;
        _NotRunning -> false
    end.
170

171
%% Current connection count for a listener given by id; the id must parse
%% (parse_listener_id/1), otherwise this crashes with badmatch.
current_conns(ID, ListenOn) ->
    {ok, Parsed} = parse_listener_id(ID),
    #{type := Type, name := Name} = Parsed,
    current_conns(Type, Name, ListenOn).
1✔
174

175
%% Current connection count per transport:
%%   tcp/ssl - asked from esockd for `{ListenerId, ListenOn}';
%%   ws/wss  - `all_connections' from ranch:info/1;
%%   quic    - `conn_active' from quicer:perf_counters/0, or 0 if that fails.
%% Any other type yields `{error, not_support}'.
current_conns(Type, Name, ListenOn) when Type =:= tcp orelse Type =:= ssl ->
    Id = listener_id(Type, Name),
    esockd:get_current_connections({Id, ListenOn});
current_conns(Type, Name, _ListenOn) when Type =:= ws orelse Type =:= wss ->
    Info = ranch:info(listener_id(Type, Name)),
    proplists:get_value(all_connections, Info);
current_conns(quic, _Name, _ListenOn) ->
    case quicer:perf_counters() of
        {ok, Counters} -> proplists:get_value(conn_active, Counters);
        _Failed -> 0
    end;
current_conns(_Type, _Name, _ListenOn) ->
    {error, not_support}.
×
186

187
%% Connection limit for a listener given by id; the id must parse
%% (parse_listener_id/1), otherwise this crashes with badmatch.
max_conns(ID, ListenOn) ->
    {ok, Parsed} = parse_listener_id(ID),
    #{type := Type, name := Name} = Parsed,
    max_conns(Type, Name, ListenOn).
1✔
190

191
%% Connection limit per transport:
%%   tcp/ssl - asked from esockd for `{ListenerId, ListenOn}';
%%   ws/wss  - `max_connections' from ranch:info/1.
%% Any other type (including quic) yields `{error, not_support}'.
max_conns(Type, Name, ListenOn) when Type =:= tcp orelse Type =:= ssl ->
    Id = listener_id(Type, Name),
    esockd:get_max_connections({Id, ListenOn});
max_conns(Type, Name, _ListenOn) when Type =:= ws orelse Type =:= wss ->
    Info = ranch:info(listener_id(Type, Name)),
    proplists:get_value(max_connections, Info);
max_conns(_Type, _Name, _ListenOn) ->
    {error, not_support}.
×
197

198
%% @doc Start all listeners.
%% emqx_app calls this when emqx starts, which is why the config handler
%% for the listener paths is installed here before the listeners are started.
-spec start() -> ok.
start() ->
    ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
    StartFun = fun start_listener/3,
    foreach_listeners(StartFun).
371✔
205

206
%% Start the listener with the given id, using its stored configuration
%% (resolved by apply_on_listener/2).
-spec start_listener(listener_id()) -> ok | {error, term()}.
start_listener(ListenerId) ->
    StartFun = fun start_listener/3,
    apply_on_listener(ListenerId, StartFun).
384✔
209

210
%% Start one listener and report the outcome.
%%   * skipped (disabled / quicer missing): trace point + console note, `ok';
%%   * started: trace point + console note, `ok';
%%   * already started: trace point, the error tuple is returned unchanged;
%%   * any other error: trace point, error log, and
%%     `{error, {failed_to_start, Msg}}' where Msg is a flat string.
-spec start_listener(atom(), atom(), map()) -> ok | {error, term()}.
start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
    case do_start_listener(Type, ListenerName, Conf) of
        {ok, {skipped, Why}} when Why =:= listener_disabled orelse Why =:= quic_app_missing ->
            ?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Why}}),
            console_print(
                "Listener ~ts is NOT started due to: ~p.~n",
                [listener_id(Type, ListenerName), Why]
            ),
            ok;
        {ok, _Started} ->
            ?tp(listener_started, #{type => Type, bind => Bind}),
            console_print(
                "Listener ~ts on ~ts started.~n",
                [listener_id(Type, ListenerName), format_bind(Bind)]
            ),
            ok;
        {error, {already_started, Pid}} = AlreadyStarted ->
            ?tp(listener_not_started, #{
                type => Type, bind => Bind, status => {already_started, Pid}
            }),
            AlreadyStarted;
        {error, Reason} ->
            ?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}),
            Id = listener_id(Type, ListenerName),
            BindStr = format_bind(Bind),
            ?ELOG(
                "Failed to start listener ~ts on ~ts: ~0p.~n",
                [Id, BindStr, Reason]
            ),
            %% The returned message carries the reason without its stacktrace.
            Detail = io_lib:format(
                "~ts(~ts) : ~p",
                [Id, BindStr, filter_stacktrace(Reason)]
            ),
            {error, {failed_to_start, lists:flatten(Detail)}}
    end.
251

252
%% @doc Restart all listeners
-spec restart() -> ok.
restart() ->
    RestartFun = fun restart_listener/3,
    foreach_listeners(RestartFun).
2✔
256

257
%% Restart the listener with the given id, using its stored configuration
%% (resolved by apply_on_listener/2).
-spec restart_listener(listener_id()) -> ok | {error, term()}.
restart_listener(ListenerId) ->
    RestartFun = fun restart_listener/3,
    apply_on_listener(ListenerId, RestartFun).
8✔
260

261
%% Restart a listener. The third argument is either an `{OldConf, NewConf}'
%% pair (stop with the old config, start with the new one) or a single
%% config used for both steps.
-spec restart_listener(atom(), atom(), map() | {map(), map()}) -> ok | {error, term()}.
restart_listener(Type, ListenerName, Confs) ->
    {StopConf, StartConf} =
        case Confs of
            {_Old, _New} = Pair -> Pair;
            Same -> {Same, Same}
        end,
    restart_listener(Type, ListenerName, StopConf, StartConf).
8✔
266

267
%% Stop the listener using OldConf, then start it using NewConf.
%% A listener that was not running (`not_found') is simply started; any
%% other stop error is returned and the start is not attempted.
restart_listener(Type, ListenerName, OldConf, NewConf) ->
    case do_stop_listener(Type, ListenerName, OldConf) of
        {error, Reason} when Reason =/= not_found ->
            {error, Reason};
        Stopped when Stopped =:= ok; Stopped =:= {error, not_found} ->
            start_listener(Type, ListenerName, NewConf)
    end.
273

274
%% @doc Stop all listeners.
%% emqx_app calls this when emqx is shutting down, which is why the config
%% handler is uninstalled here before the listeners are stopped.
-spec stop() -> ok.
stop() ->
    _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH),
    StopFun = fun stop_listener/3,
    foreach_listeners(StopFun).
363✔
281

282
%% Stop the listener with the given id, using its stored configuration
%% (resolved by apply_on_listener/2).
-spec stop_listener(listener_id()) -> ok | {error, term()}.
stop_listener(ListenerId) ->
    StopFun = fun stop_listener/3,
    apply_on_listener(ListenerId, StopFun).
2✔
285

286
%% Stop one listener and report the outcome.
%% A listener that is not running (`not_found') is logged as
%% `already_stopped' and still counts as success; any other error is logged
%% and returned.
stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
    case do_stop_listener(Type, ListenerName, Conf) of
        ok ->
            console_print(
                "Listener ~ts on ~ts stopped.~n",
                [listener_id(Type, ListenerName), format_bind(Bind)]
            ),
            ok;
        {error, Reason} ->
            {Logged, Result} =
                case Reason of
                    not_found -> {already_stopped, ok};
                    _ -> {Reason, {error, Reason}}
                end,
            ?ELOG(
                "Failed to stop listener ~ts on ~ts: ~0p~n",
                [listener_id(Type, ListenerName), format_bind(Bind), Logged]
            ),
            Result
    end.
307

308
%% Remove the listener's limiter buckets and close it through the library
%% that serves its transport: esockd (tcp/ssl), cowboy (ws/wss) or
%% quicer (quic).
-spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}.

do_stop_listener(Type, ListenerName, #{bind := Bind} = Conf) when
    Type =:= tcp orelse Type =:= ssl
->
    ListenerId = listener_id(Type, ListenerName),
    del_limiter_bucket(ListenerId, Conf),
    esockd:close(ListenerId, Bind);
do_stop_listener(Type, ListenerName, Conf) when Type =:= ws orelse Type =:= wss ->
    ListenerId = listener_id(Type, ListenerName),
    del_limiter_bucket(ListenerId, Conf),
    cowboy:stop_listener(ListenerId);
do_stop_listener(quic, ListenerName, Conf) ->
    ListenerId = listener_id(quic, ListenerName),
    del_limiter_bucket(ListenerId, Conf),
    quicer:stop_listener(ListenerId).
16✔
322

323
%% Console output for listener start/stop notes; a no-op in TEST builds.
-ifdef(TEST).
console_print(_Fmt, _Args) -> ok.
-else.
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
-endif.
328

329
%% Open a listener of the given type.
%%   * `enabled := false' -> `{ok, {skipped, listener_disabled}}', any type;
%%   * tcp / ssl          -> esockd:open/4 with emqx_connection as handler;
%%   * ws / wss           -> cowboy:start_clear/3 or cowboy:start_tls/3;
%%   * quic               -> quicer:start_listener/3, or
%%                           `{ok, {skipped, quic_app_missing}}' when quicer
%%                           is not among the running applications.
%% The listener's limiter buckets are registered before it is opened.
-spec do_start_listener(atom(), atom(), map()) ->
    {ok, pid() | {skipped, atom()}} | {error, term()}.
do_start_listener(_Type, _ListenerName, #{enabled := false}) ->
    {ok, {skipped, listener_disabled}};
%% Start MQTT/TCP listener
do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
    Type =:= tcp orelse Type =:= ssl
->
    Id = listener_id(Type, ListenerName),
    add_limiter_bucket(Id, Opts),
    EsockdOpts = merge_default(esockd_opts(Id, Type, Opts)),
    ConnArgs = #{
        listener => {Type, ListenerName},
        zone => zone(Opts),
        limiter => limiter(Opts),
        enable_authn => enable_authn(Opts)
    },
    esockd:open(Id, ListenOn, EsockdOpts, {emqx_connection, start_link, [ConnArgs]});
%% Start MQTT/WS listener
do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
    Type =:= ws orelse Type =:= wss
->
    Id = listener_id(Type, ListenerName),
    add_limiter_bucket(Id, Opts),
    TransportOpts = ranch_opts(Type, ListenOn, Opts),
    ProtocolOpts = ws_opts(Type, ListenerName, Opts),
    case Type of
        wss -> cowboy:start_tls(Id, TransportOpts, ProtocolOpts);
        ws -> cowboy:start_clear(Id, TransportOpts, ProtocolOpts)
    end;
%% Start MQTT/QUIC listener
do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
    ListenOn = quic_listen_on(Bind),
    case lists:keymember(quicer, 1, application:which_applications()) of
        false ->
            {ok, {skipped, quic_app_missing}};
        true ->
            %% Options are assembled before the limiter bucket is added, so a
            %% missing mandatory option fails before any bucket is registered.
            ListenOpts = quic_listen_opts(Opts),
            ConnectionOpts = #{
                conn_callback => emqx_quic_connection,
                peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1),
                peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10),
                zone => zone(Opts),
                listener => {quic, ListenerName},
                limiter => limiter(Opts)
            },
            StreamOpts = #{
                stream_callback => emqx_quic_stream,
                active => 1
            },
            Id = listener_id(quic, ListenerName),
            add_limiter_bucket(Id, Opts),
            quicer:start_listener(
                Id,
                ListenOn,
                {maps:from_list(ListenOpts), ConnectionOpts, StreamOpts}
            )
    end.

%% Render the bind for quicer: "Ip:Port" for IPv4 tuples, "[Ip]:Port" for
%% IPv6 tuples; anything else (e.g. a bare port) is passed through as is.
quic_listen_on({Addr, Port}) when tuple_size(Addr) =:= 4 ->
    lists:flatten(io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]));
quic_listen_on({Addr, Port}) when tuple_size(Addr) =:= 8 ->
    lists:flatten(io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]));
quic_listen_on(Bind) ->
    Bind.

%% Assemble the QUIC listener option list. It is later turned into a map
%% with maps:from_list/1, so entries appearing later in the list (the
%% optional tunings) win over earlier ones with the same key.
quic_listen_opts(Opts) ->
    DefAcceptors = erlang:system_info(schedulers_online) * 8,
    %% `ssl_options' entries take precedence over top-level certfile/keyfile.
    SSLOpts = maps:merge(
        maps:with([certfile, keyfile], Opts),
        maps:get(ssl_options, Opts, #{})
    ),
    Mandatory = [
        {certfile, str(maps:get(certfile, SSLOpts))},
        {keyfile, str(maps:get(keyfile, SSLOpts))},
        {alpn, ["mqtt"]},
        {conn_acceptors, erlang:max(DefAcceptors, maps:get(acceptors, Opts, 0))},
        {keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)},
        {idle_timeout_ms, maps:get(idle_timeout, Opts, 0)},
        {handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)},
        {server_resumption_level, maps:get(server_resumption_level, Opts, 2)},
        {verify, maps:get(verify, SSLOpts, verify_none)}
    ],
    Mandatory ++
        quic_optional_ssl_opt(cacertfile, SSLOpts) ++
        quic_optional_ssl_opt(password, SSLOpts) ++
        optional_quic_listener_opts(Opts).

%% `[{Key, String}]' when Key is set in SSLOpts, `[]' when absent/undefined.
quic_optional_ssl_opt(Key, SSLOpts) ->
    case maps:get(Key, SSLOpts, undefined) of
        undefined -> [];
        Value -> [{Key, str(Value)}]
    end.
428

429
%% Update the listeners at runtime
%%
%% Config-handler callback run before a change under `[listeners, Type, Name]'
%% is stored. Returns `{ok, NewRawConf}' or `{error, Reason}'.
%%
%% A raw value of `undefined' or `?TOMBSTONE_VALUE' both mean "this listener
%% does not exist": `create' is allowed only in that state, while `update'
%% and `action' requests are rejected with `not_found' (they would otherwise
%% try to deep-merge the request into a value that is not a listener map).
%%   * `{create, Conf}'        - convert/store certificates, keep Conf;
%%   * `{update, Request}'     - deep-merge into the raw config, let the
%%                               request's limiter section override, then
%%                               convert/store certificates;
%%   * `{action, _, Updated}'  - deep-merge only;
%%   * `?MARK_DEL'             - replace the raw config with the tombstone;
%%   * anything else           - raw config is kept unchanged.
pre_config_update([listeners, Type, Name], {create, NewConf}, V) when
    V =:= undefined orelse V =:= ?TOMBSTONE_VALUE
->
    CertsDir = certs_dir(Type, Name),
    {ok, convert_certs(CertsDir, NewConf)};
pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) ->
    {error, already_exist};
pre_config_update([listeners, _Type, _Name], {update, _Request}, V) when
    V =:= undefined orelse V =:= ?TOMBSTONE_VALUE
->
    {error, not_found};
pre_config_update([listeners, Type, Name], {update, Request}, RawConf) ->
    NewConfT = emqx_utils_maps:deep_merge(RawConf, Request),
    NewConf = ensure_override_limiter_conf(NewConfT, Request),
    CertsDir = certs_dir(Type, Name),
    {ok, convert_certs(CertsDir, NewConf)};
pre_config_update([listeners, _Type, _Name], {action, _Action, _Updated}, V) when
    V =:= undefined orelse V =:= ?TOMBSTONE_VALUE
->
    {error, not_found};
pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) ->
    NewConf = emqx_utils_maps:deep_merge(RawConf, Updated),
    {ok, NewConf};
pre_config_update([listeners, _Type, _Name], ?MARK_DEL, _RawConf) ->
    {ok, ?TOMBSTONE_VALUE};
pre_config_update(_Path, _Request, RawConf) ->
    {ok, RawConf}.
×
451

452
%% Config-handler callback run after a change under `[listeners, Type, Name]'
%% has been stored; applies the change to the running listener.
%%   * create   - start the listener;
%%   * update   - clean up replaced SSL files, drop the OCSP refresh if
%%                stapling is now off, restart only if the listener is enabled;
%%   * ?MARK_DEL (old config is a map) - stop the listener, then delete its
%%                authentication chain and certificate files;
%%   * action   - restart / start / stop depending on the new and old
%%                `enabled' flags;
%%   * anything else - nothing to do.
post_config_update([listeners, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) ->
    start_listener(Type, Name, NewConf);
post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
    try_clear_ssl_files(certs_dir(Type, Name), NewConf, OldConf),
    ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
    case maps:get(enabled, NewConf, false) of
        true -> restart_listener(Type, Name, {OldConf, NewConf});
        _NotEnabled -> ok
    end;
post_config_update([listeners, Type, Name], ?MARK_DEL, _NewConf, OldConf, _AppEnvs) when
    is_map(OldConf)
->
    ok = unregister_ocsp_stapling_refresh(Type, Name),
    case stop_listener(Type, Name, OldConf) of
        ok ->
            _ = emqx_authentication:delete_chain(listener_id(Type, Name)),
            clear_certs(certs_dir(Type, Name), OldConf);
        StopError ->
            StopError
    end;
post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) ->
    #{enabled := NowEnabled} = NewConf,
    #{enabled := WasEnabled} = OldConf,
    case NowEnabled of
        true when is_boolean(WasEnabled) ->
            ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
            case WasEnabled of
                true -> restart_listener(Type, Name, {OldConf, NewConf});
                false -> start_listener(Type, Name, NewConf)
            end;
        false when is_boolean(WasEnabled) ->
            %% Disabled now: stop regardless of the previous state.
            ok = unregister_ocsp_stapling_refresh(Type, Name),
            stop_listener(Type, Name, OldConf)
    end;
post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
    ok.
48✔
492

493
%% Build the esockd option list for a tcp or ssl listener:
%% the pass-through keys, an optional connection limiter, access rules, the
%% overload-protection tune fun, and the socket options for the transport.
esockd_opts(ListenerId, Type, Opts0) ->
    Base = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
    WithLimiter =
        case emqx_limiter_schema:extract_with_type(connection, limiter(Opts0)) of
            undefined ->
                Base;
            BucketCfg ->
                Base#{
                    limiter => emqx_esockd_htb_limiter:new_create_options(
                        ListenerId, connection, BucketCfg
                    )
                }
        end,
    Common = WithLimiter#{
        access_rules => esockd_access_rules(maps:get(access_rules, Opts0, [])),
        tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]}
    },
    Final =
        case Type of
            tcp ->
                Common#{tcp_options => tcp_opts(Opts0)};
            ssl ->
                %% CRL settings and the OCSP SNI fun are injected into the
                %% config before it is converted to ssl server options.
                SSLOpts = ssl_opts(inject_sni_fun(ListenerId, inject_crl_config(Opts0))),
                Common#{ssl_options => SSLOpts, tcp_options => tcp_opts(Opts0)}
        end,
    maps:to_list(Final).
522

523
%% Build the cowboy protocol options for a ws/wss listener: a single route
%% (default path "/mqtt") handled by emqx_ws_connection, plus `proxy_header'.
ws_opts(Type, ListenerName, Opts) ->
    Path = emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"),
    HandlerState = #{
        zone => zone(Opts),
        listener => {Type, ListenerName},
        limiter => limiter(Opts),
        enable_authn => enable_authn(Opts)
    },
    Routes = [{'_', [{Path, emqx_ws_connection, HandlerState}]}],
    Dispatch = cowboy_router:compile(Routes),
    #{
        env => #{dispatch => Dispatch},
        proxy_header => maps:get(proxy_protocol, Opts, false)
    }.
743✔
535

536
%% Build the ranch transport options for a ws/wss listener.
%% Defaults: 4 acceptors, 1024 connections, 15000 handshake timeout.
ranch_opts(Type, ListenOn, Opts) ->
    TcpOpts = tcp_opts(Opts),
    TransportOpts =
        case Type of
            ws ->
                TcpOpts;
            wss ->
                %% handshake_timeout is given to ranch itself (below), so it
                %% is removed from the TLS socket options.
                TcpOpts ++ proplists:delete(handshake_timeout, ssl_opts(Opts))
        end,
    %% cowboy don't allow us to set 'reuseaddr'
    SocketOpts = ip_port(ListenOn) ++ proplists:delete(reuseaddr, TransportOpts),
    #{
        num_acceptors => maps:get(acceptors, Opts, 4),
        max_connections => maps:get(max_connections, Opts, 1024),
        handshake_timeout => maps:get(handshake_timeout, Opts, 15000),
        socket_opts => SocketOpts
    }.
552

553
%% Turn a bind (`Port' or `{Addr, Port}') into socket options.
ip_port({Addr, Port}) ->
    [{ip, Addr}, {port, Port}];
ip_port(Port) when is_integer(Port) ->
    [{port, Port}].
743✔
557

558
%% Convert access rules written as "Action Target" strings (e.g. "allow all")
%% into esockd `{Action, Target}' tuples, preserving their order.
%%   * Target "all" becomes the atom `all'; any other target is kept as is.
%%   * A rule whose action is not an existing atom is logged and skipped.
%%   * A rule that does not consist of exactly two space-separated words
%%     crashes with badmatch (unchanged from before).
esockd_access_rules(StrRules) ->
    Access = fun(S, Acc) ->
        %% string:lexemes/2 replaces the obsolete string:tokens/2.
        [A, CIDR] = string:lexemes(S, " "),
        %% esockd rules only use words 'allow' and 'deny', both are existing
        %% comparison of strings may be better, but there is a loss of backward compatibility
        case emqx_utils:safe_to_existing_atom(A) of
            {ok, Action} ->
                Target =
                    case CIDR of
                        "all" -> all;
                        _ -> CIDR
                    end,
                [{Action, Target} | Acc];
            _ ->
                ?SLOG(warning, #{msg => "invalid esockd access rule", rule => S}),
                Acc
        end
    end,
    lists:foldr(Access, [], StrRules).
1,034✔
581

582
%% Make sure the option list carries `tcp_options' based on ?MQTT_SOCKOPTS:
%% given tcp options are merged onto the defaults, and the defaults are used
%% alone when none were given. The entry is placed at the head of the list.
merge_default(Options) ->
    {TcpOpts, Others} =
        case lists:keytake(tcp_options, 1, Options) of
            false ->
                {?MQTT_SOCKOPTS, Options};
            {value, {tcp_options, Given}, Rest} ->
                {emqx_utils:merge_opts(?MQTT_SOCKOPTS, Given), Rest}
        end,
    [{tcp_options, TcpOpts} | Others].
589

590
%% Render a listener bind for display.
%%   * integer port             -> ":Port"
%%   * {HostString, Port}       -> "Host:Port"
%%   * {IPv4 tuple, Port}       -> "A.B.C.D:Port"
%%   * {IPv6 tuple, Port}       -> "[Addr]:Port"
%%   * string / binary          -> parsed with emqx_schema:to_ip_port/1 (or,
%%                                 failing that, as a plain integer) and
%%                                 rendered as above.
%% The spec lists the `{string(), integer()}' form, which the second clause
%% has always accepted.
-spec format_bind(
    integer() | {tuple() | string(), integer()} | string() | binary()
) -> io_lib:chars().
format_bind(Port) when is_integer(Port) ->
    %% **Note**:
    %% 'For TCP, UDP and IP networks, if the host is empty or a literal
    %% unspecified IP address, as in ":80", "0.0.0.0:80" or "[::]:80" for
    %% TCP and UDP, "", "0.0.0.0" or "::" for IP, the local system is
    %% assumed.'
    %%
    %% Quoted from: https://pkg.go.dev/net
    %% Decided to use this format to display the bind for all interfaces and
    %% IPv4/IPv6 support
    io_lib:format(":~w", [Port]);
format_bind({Addr, Port}) when is_list(Addr) ->
    io_lib:format("~ts:~w", [Addr, Port]);
format_bind({Addr, Port}) when is_tuple(Addr), tuple_size(Addr) == 4 ->
    io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]);
format_bind({Addr, Port}) when is_tuple(Addr), tuple_size(Addr) == 8 ->
    io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]);
%% Support string, binary type for Port or IP:Port
format_bind(Str) when is_list(Str) ->
    case emqx_schema:to_ip_port(Str) of
        {ok, {Ip, Port}} ->
            format_bind({Ip, Port});
        {ok, Port} ->
            format_bind(Port);
        {error, _} ->
            %% Not parseable as a bind: must then be a plain port number,
            %% otherwise list_to_integer/1 raises badarg.
            format_bind(list_to_integer(Str))
    end;
format_bind(Bin) when is_binary(Bin) ->
    format_bind(binary_to_list(Bin)).
130✔
622

623
%% Build the listener id atom `Type:Name' from a type and a name, each of
%% which may be anything str/1 accepts.
listener_id(Type, ListenerName) ->
    list_to_atom(str(Type) ++ ":" ++ str(ListenerName)).
19,834✔
625

626
%% Split a listener id at its first ":" into type and name.
%% The type must be one of ?TYPES_STRING; everything after the first ":"
%% becomes the name. Anything else is `{error, {invalid_listener_id, Id}}'.
-spec parse_listener_id(listener_id()) -> {ok, #{type => atom(), name => atom()}} | {error, term()}.
parse_listener_id(Id) ->
    Invalid = {error, {invalid_listener_id, Id}},
    case string:split(str(Id), ":", leading) of
        [TypeStr, NameStr] ->
            case lists:member(TypeStr, ?TYPES_STRING) of
                false ->
                    Invalid;
                true ->
                    Parsed = #{
                        type => list_to_existing_atom(TypeStr),
                        name => list_to_atom(NameStr)
                    },
                    {ok, Parsed}
            end;
        _NoSeparator ->
            Invalid
    end.
637

638
%% The listener's `zone' option, or `undefined' when it is not set.
zone(Opts) ->
    case maps:find(zone, Opts) of
        {ok, Zone} -> Zone;
        error -> undefined
    end.
2,668✔
640

641
%% The listener's `limiter' option, or `undefined' when it is not set.
limiter(Opts) ->
    case maps:find(limiter, Opts) of
        {ok, Limiter} -> Limiter;
        error -> undefined
    end.
2,668✔
643

644
%% Register one limiter bucket per entry of the listener's `limiter' config,
%% except the `client' entry. Does nothing when no limiter is configured.
add_limiter_bucket(Id, #{limiter := Limiter}) ->
    AddBucket = fun(Type, Cfg, _Prev) ->
        emqx_limiter_server:add_bucket(Id, Type, Cfg)
    end,
    Buckets = maps:without([client], Limiter),
    maps:fold(AddBucket, ok, Buckets);
add_limiter_bucket(_Id, _NoLimiter) ->
    ok.
1,512✔
654

655
%% Delete the limiter bucket of every type listed in the listener's
%% `limiter' config. Does nothing when no limiter is configured.
del_limiter_bucket(Id, #{limiter := Limiters}) ->
    _ = [emqx_limiter_server:del_bucket(Id, Type) || Type <- maps:keys(Limiters)],
    ok;
del_limiter_bucket(_Id, _NoLimiter) ->
    ok.
1,478✔
664

665
%% The listener's `enable_authn' option; defaults to `true' when not set.
enable_authn(Opts) ->
    case maps:find(enable_authn, Opts) of
        {ok, Enabled} -> Enabled;
        error -> true
    end.
1,515✔
667

668
%% Convert the listener's `ssl_options' (empty map when not set) into TLS
%% server options via emqx_tls_lib.
ssl_opts(Opts) ->
    SSLConf = maps:get(ssl_options, Opts, #{}),
    emqx_tls_lib:to_server_opts(tls, SSLConf).
759✔
670

671
%% The listener's `tcp_options' as a proplist, with `active_n' left out.
tcp_opts(Opts) ->
    TcpConf = maps:get(tcp_options, Opts, #{}),
    maps:to_list(maps:remove(active_n, TcpConf)).
678

679
%% Apply Do(Type, Name, Conf) to every listener returned by list/0.
%% `ok' and `already_started' results are accepted; a `failed_to_start'
%% result is raised as an error. Any other result crashes with case_clause.
foreach_listeners(Do) ->
    ApplyOne = fun({Id, LConf}) ->
        {ok, #{type := Type, name := Name}} = parse_listener_id(Id),
        case Do(Type, Name, LConf) of
            ok -> ok;
            {error, {already_started, _}} -> ok;
            {error, {failed_to_start, _} = Reason} -> error(Reason)
        end
    end,
    lists:foreach(ApplyOne, list()).
691

692
%% True when list/0 contains at least one listener of the given type whose
%% `enabled' flag is true (a missing flag counts as enabled).
has_enabled_listener_conf_by_type(Type) ->
    IsEnabledOfType = fun({Id, LConf}) when is_map(LConf) ->
        {ok, #{type := ListenerType}} = parse_listener_id(Id),
        ListenerType =:= Type andalso maps:get(enabled, LConf, true)
    end,
    lists:any(IsEnabledOfType, list()).
700

701
%% Resolve a listener id to its stored configuration and call
%% Do(Type, Name, Conf). Raises `{listener_config_not_found, Type, Name}'
%% when there is no such configuration, and badmatch when the id is invalid.
apply_on_listener(ListenerId, Do) ->
    {ok, #{type := Type, name := Name}} = parse_listener_id(ListenerId),
    case emqx_config:find_listener_conf(Type, Name, []) of
        {ok, Conf} ->
            Do(Type, Name, Conf);
        {not_found, _, _} ->
            error({listener_config_not_found, Type, Name})
    end.
707

708
%% Convert an atom, binary or list to a list (string); lists pass through.
str(Str) when is_list(Str) ->
    Str;
str(Bin) when is_binary(Bin) ->
    binary_to_list(Bin);
str(Atom) when is_atom(Atom) ->
    atom_to_list(Atom).
764✔
714

715
%% Extract the bind from a raw listener config. An integer is returned as is;
%% a binary is parsed with emqx_schema:to_ip_port/1 and, if that fails, as a
%% plain integer.
parse_bind(#{<<"bind">> := Port}) when is_integer(Port) ->
    Port;
parse_bind(#{<<"bind">> := BindBin}) ->
    BindStr = binary_to_list(BindBin),
    case emqx_schema:to_ip_port(BindStr) of
        {error, _} -> binary_to_integer(BindBin);
        {ok, Parsed} -> Parsed
    end.
721

722
%% The relative dir for ssl files: "listeners/Type/Name", as a binary.
certs_dir(Type, Name) ->
    RelPath = filename:join(["listeners", Type, Name]),
    iolist_to_binary(RelPath).
100✔
725

726
%% Run the config's SSL options through emqx_tls_lib:ensure_ssl_files/2.
%% Returns Conf unchanged when that yields `undefined', otherwise Conf with
%% its raw `ssl_options' replaced by the result. On failure the reason is
%% logged and `{bad_ssl_config, Reason}' is thrown.
convert_certs(CertsDir, Conf) ->
    SSLConf = get_ssl_options(Conf),
    case emqx_tls_lib:ensure_ssl_files(CertsDir, SSLConf) of
        {error, Reason} ->
            ?SLOG(error, Reason#{msg => "bad_ssl_config"}),
            throw({bad_ssl_config, Reason});
        {ok, undefined} ->
            Conf;
        {ok, SSL} ->
            Conf#{<<"ssl_options">> => SSL}
    end.
736

737
%% Delete the SSL files referenced by Conf; there is no new SSL config
%% (`undefined') to compare against.
clear_certs(CertsDir, Conf) ->
    emqx_tls_lib:delete_ssl_files(CertsDir, undefined, get_ssl_options(Conf)).
22✔
740

741
%% Reduce a `{Reason, Stacktrace}' pair to Reason. Note that every 2-tuple
%% is treated as such a pair; any other term is returned unchanged.
filter_stacktrace(Error) ->
    case Error of
        {Reason, _Stacktrace} -> Reason;
        _ -> Error
    end.
4✔
743

744
%% limiter config should override, not merge:
%% when the request carries a `limiter' section, it replaces the one in Conf
%% wholesale; otherwise Conf is returned unchanged.
ensure_override_limiter_conf(Conf, Request) ->
    case Request of
        #{<<"limiter">> := Limiter} -> Conf#{<<"limiter">> => Limiter};
        _ -> Conf
    end.
42✔
749

750
%% Hand the new and the old SSL options to emqx_tls_lib:delete_ssl_files/3.
try_clear_ssl_files(CertsDir, NewConf, OldConf) ->
    emqx_tls_lib:delete_ssl_files(
        CertsDir,
        get_ssl_options(NewConf),
        get_ssl_options(OldConf)
    ).
21✔
754

755
%% Fetch the SSL options from a listener config, looking first under the
%% atom key `ssl_options' and then under the binary key <<"ssl_options">>.
%% Returns `undefined' when neither is present.
get_ssl_options(#{ssl_options := SSL}) ->
    SSL;
get_ssl_options(Conf) ->
    maps:get(<<"ssl_options">>, Conf, undefined).
762

763
%% @doc Get QUIC optional settings for low level tunings.
%% Keeps only the entries of Conf whose key is listed in
%% quic_listener_optional_settings/0 and returns them as a proplist.
%% @see quicer:quic_settings()
-spec optional_quic_listener_opts(map()) -> proplists:proplist().
optional_quic_listener_opts(Conf) when is_map(Conf) ->
    Tunings = maps:with(quic_listener_optional_settings(), Conf),
    maps:to_list(Tunings).
778

779
%% Names of the optional low-level QUIC settings that are recognised in a
%% listener config.
-spec quic_listener_optional_settings() -> [atom()].
quic_listener_optional_settings() ->
    [
        max_bytes_per_key,
        %% The conf schema names this one handshake_idle_timeout
        handshake_idle_timeout_ms,
        %% The conf schema names this one idle_timeout
        idle_timeout_ms,
        %% tls_client_max_send_buffer is left out: not used since we are server
        tls_server_max_send_buffer,
        stream_recv_window_default,
        stream_recv_buffer_default,
        conn_flow_control_window,
        max_stateless_operations,
        initial_window_packets,
        send_idle_timeout_ms,
        initial_rtt_ms,
        max_ack_delay_ms,
        disconnect_timeout_ms,
        %% The conf schema names this one keep_alive_interval
        keep_alive_interval_ms,
        %% The next two are over written by conn opts
        peer_bidi_stream_count,
        peer_unidi_stream_count,
        retry_memory_limit,
        load_balancing_mode,
        max_operations_per_drain,
        send_buffering_enabled,
        pacing_enabled,
        migration_enabled,
        datagram_receive_enabled,
        server_resumption_level,
        minimum_mtu,
        maximum_mtu,
        mtu_discovery_search_complete_timeout_us,
        mtu_discovery_missing_probe_count,
        max_binding_stateless_operations,
        stateless_operation_expiration_ms
    ].
820

821
%% When OCSP stapling is enabled in the listener's ssl_options, let
%% emqx_ocsp_cache inject its SNI fun into the config; otherwise return the
%% config untouched.
inject_sni_fun(ListenerId, Conf) ->
    case Conf of
        #{ssl_options := #{ocsp := #{enable_ocsp_stapling := true}}} ->
            emqx_ocsp_cache:inject_sni_fun(ListenerId, Conf);
        _ ->
            Conf
    end.
382✔
825

826
%% When `enable_crl_check' is true in the listener's ssl_options, add the
%% `crl_check' and `crl_cache' ssl options; otherwise return the config
%% untouched. The HTTP timeout comes from `crl_cache.http_timeout'
%% (default 15 seconds).
inject_crl_config(Conf) ->
    case Conf of
        #{ssl_options := #{enable_crl_check := true} = SSLOpts} ->
            HTTPTimeout = emqx_config:get([crl_cache, http_timeout], timer:seconds(15)),
            CRLCache = {emqx_ssl_crl_cache, {internal, [{http, HTTPTimeout}]}},
            %% `crl_check => true' doesn't work, hence `peer'
            Conf#{ssl_options := SSLOpts#{crl_check => peer, crl_cache => CRLCache}};
        _ ->
            Conf
    end.
380✔
839

840
%% Unregister the OCSP stapling refresh for an `ssl' listener whose config
%% has `enable_ocsp_stapling' set to false. No-op in every other case.
maybe_unregister_ocsp_stapling_refresh(Type, Name, Conf) ->
    case {Type, Conf} of
        {ssl, #{ssl_options := #{ocsp := #{enable_ocsp_stapling := false}}}} ->
            unregister_ocsp_stapling_refresh(ssl, Name),
            ok;
        _ ->
            ok
    end.
17✔
847

848
%% Unregister the listener from emqx_ocsp_cache; always returns `ok'.
unregister_ocsp_stapling_refresh(Type, Name) ->
    emqx_ocsp_cache:unregister_listener(listener_id(Type, Name)),
    ok.
40✔
852

853
%% Default connection limit.
%% There is currently an issue with the frontend: infinity is not a good
%% value for it, so 5 million is used for now.
default_max_conn() ->
    %% TODO: <<"infinity">>
    5000000.
35,991✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc