• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8628139215

10 Apr 2024 08:18AM UTC coverage: 62.44% (-0.05%) from 62.489%
8628139215

push

github

web-flow
Merge pull request #12851 from zmstone/0327-feat-add-emqx_variform

emqx_variform for string substitution and transform

206 of 238 new or added lines in 3 files covered. (86.55%)

28 existing lines in 16 files now uncovered.

34895 of 55886 relevant lines covered (62.44%)

6585.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/apps/emqx/src/emqx_schema.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_schema).
18

19
-dialyzer(no_return).
20
-dialyzer(no_match).
21
-dialyzer(no_contracts).
22
-dialyzer(no_unused).
23
-dialyzer(no_fail_call).
24
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
25

26
-include("emqx_schema.hrl").
27
-include("emqx_access_control.hrl").
28
-include_lib("typerefl/include/types.hrl").
29
-include_lib("hocon/include/hoconsc.hrl").
30
-include_lib("logger.hrl").
31

32
-define(MAX_INT_MQTT_PACKET_SIZE, 268435456).
33
-define(MAX_INT_TIMEOUT_MS, 4294967295).
34
%% floor(?MAX_INT_TIMEOUT_MS / 1000).
35
-define(MAX_INT_TIMEOUT_S, 4294967).
36
-define(DEFAULT_WINDOW_TIME, <<"1m">>).
37

38
-type duration() :: integer().
39
-type duration_s() :: integer().
40
-type duration_ms() :: integer().
41
%% ?MAX_INT_TIMEOUT is defined loosely in some OTP modules like
42
%% `erpc', `rpc' `gen' and `peer', despite affecting `receive' blocks
43
%% as well.  It's `2^32 - 1'.
44
-type timeout_duration() :: 0..?MAX_INT_TIMEOUT_MS.
45
-type timeout_duration_s() :: 0..?MAX_INT_TIMEOUT_S.
46
-type timeout_duration_ms() :: 0..?MAX_INT_TIMEOUT_MS.
47
-type bytesize() :: integer().
48
-type wordsize() :: bytesize().
49
-type percent() :: float().
50
-type comma_separated_list() :: list(string()).
51
-type comma_separated_binary() :: [binary()].
52
-type comma_separated_atoms() :: [atom()].
53
-type ip_port() :: tuple() | integer().
54
-type cipher() :: map().
55
-type port_number() :: 1..65535.
56
-type server_parse_option() :: #{
57
    default_port => port_number(),
58
    no_port => boolean(),
59
    supported_schemes => [string()],
60
    default_scheme => string()
61
}.
62
-type url() :: binary().
63
-type json_binary() :: binary().
64

65
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
66
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
67
-typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}).
68
-typerefl_from_string({timeout_duration/0, emqx_schema, to_timeout_duration}).
69
-typerefl_from_string({timeout_duration_s/0, emqx_schema, to_timeout_duration_s}).
70
-typerefl_from_string({timeout_duration_ms/0, emqx_schema, to_timeout_duration_ms}).
71
-typerefl_from_string({bytesize/0, emqx_schema, to_bytesize}).
72
-typerefl_from_string({wordsize/0, emqx_schema, to_wordsize}).
73
-typerefl_from_string({percent/0, emqx_schema, to_percent}).
74
-typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}).
75
-typerefl_from_string({comma_separated_binary/0, emqx_schema, to_comma_separated_binary}).
76
-typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}).
77
-typerefl_from_string({cipher/0, emqx_schema, to_erl_cipher_suite}).
78
-typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}).
79
-typerefl_from_string({url/0, emqx_schema, to_url}).
80
-typerefl_from_string({json_binary/0, emqx_schema, to_json_binary}).
81

82
-type parsed_server() :: #{
83
    hostname := string(),
84
    port => port_number(),
85
    scheme => string()
86
}.
87

88
-export([
89
    validate_heap_size/1,
90
    validate_packet_size/1,
91
    user_lookup_fun_tr/2,
92
    validate_alarm_actions/1,
93
    validate_keepalive_multiplier/1,
94
    non_empty_string/1,
95
    validations/0,
96
    naive_env_interpolation/1,
97
    ensure_unicode_path/2,
98
    validate_server_ssl_opts/1,
99
    validate_tcp_keepalive/1,
100
    parse_tcp_keepalive/1
101
]).
102

103
-export([qos/0]).
104

105
% workaround: prevent being recognized as unused functions
106
-export([
107
    to_duration/1,
108
    to_duration_s/1,
109
    to_duration_ms/1,
110
    to_timeout_duration/1,
111
    to_timeout_duration_s/1,
112
    to_timeout_duration_ms/1,
113
    mk_duration/2,
114
    to_bytesize/1,
115
    to_wordsize/1,
116
    to_percent/1,
117
    to_comma_separated_list/1,
118
    to_comma_separated_binary/1,
119
    to_ip_port/1,
120
    to_erl_cipher_suite/1,
121
    to_comma_separated_atoms/1,
122
    to_url/1,
123
    to_json_binary/1
124
]).
125

126
-export([
127
    parse_server/2,
128
    parse_servers/2,
129
    servers_validator/2,
130
    servers_sc/2,
131
    convert_servers/1,
132
    convert_servers/2,
133
    mqtt_converter/2
134
]).
135

136
%% tombstone types
137
-export([
138
    tombstone_map/2,
139
    get_tombstone_map_value_type/1
140
]).
141

142
-export([listeners/0]).
143

144
-behaviour(hocon_schema).
145

146
-reflect_type([
147
    duration/0,
148
    duration_s/0,
149
    duration_ms/0,
150
    timeout_duration/0,
151
    timeout_duration_s/0,
152
    timeout_duration_ms/0,
153
    bytesize/0,
154
    wordsize/0,
155
    percent/0,
156
    comma_separated_list/0,
157
    comma_separated_binary/0,
158
    ip_port/0,
159
    cipher/0,
160
    comma_separated_atoms/0,
161
    url/0,
162
    json_binary/0,
163
    port_number/0
164
]).
165

166
-export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]).
167
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
168
-export([
169
    server_ssl_opts_schema/2,
170
    client_ssl_opts_schema/1,
171
    ciphers_schema/1,
172
    tls_versions_schema/1,
173
    description_schema/0,
174
    tags_schema/0
175
]).
176
-export([password_converter/2, bin_str_converter/2]).
177
-export([authz_fields/0]).
178
-export([sc/2, map/2]).
179

180
-elvis([{elvis_style, god_modules, disable}]).
181

182
-define(BIT(Bits), (1 bsl (Bits))).
183
-define(MAX_UINT(Bits), (?BIT(Bits) - 1)).
184
-define(DEFAULT_MULTIPLIER, 1.5).
185
-define(DEFAULT_BACKOFF, 0.75).
186

187
namespace() -> emqx.
44,666✔
188

189
tags() ->
190
    [<<"EMQX">>].
92,353✔
191

192
roots() ->
193
    %% TODO change config importance to a field metadata
194
    roots(high) ++ roots(medium) ++ roots(low).
12,033✔
195

196
roots(high) ->
197
    [
198
        {listeners,
199
            sc(
200
                ref("listeners"),
201
                #{importance => ?IMPORTANCE_HIGH}
202
            )},
203
        {mqtt,
204
            sc(
205
                ref("mqtt"),
206
                #{
207
                    desc => ?DESC(mqtt),
208
                    converter => fun ?MODULE:mqtt_converter/2,
209
                    importance => ?IMPORTANCE_MEDIUM
210
                }
211
            )},
212
        {zones,
213
            sc(
214
                map(name, ref("zone")),
215
                #{
216
                    desc => ?DESC(zones),
217
                    importance => ?IMPORTANCE_HIDDEN
218
                }
219
            )}
220
    ] ++
32,236✔
221
        emqx_schema_hooks:injection_point(
222
            'roots.high',
223
            [
224
                %% NOTE: authorization schema here is only to keep emqx app pure
225
                %% the full schema for EMQX node is injected in emqx_conf_schema.
226
                {?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_ATOM,
227
                    sc(
228
                        ref(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME),
229
                        #{importance => ?IMPORTANCE_HIDDEN}
230
                    )}
231
            ]
232
        );
233
roots(medium) ->
234
    [
32,236✔
235
        {broker,
236
            sc(
237
                ref("broker"),
238
                #{
239
                    desc => ?DESC(broker),
240
                    importance => ?IMPORTANCE_HIDDEN
241
                }
242
            )},
243
        {sys_topics,
244
            sc(
245
                ref("sys_topics"),
246
                #{desc => ?DESC(sys_topics)}
247
            )},
248
        {force_shutdown,
249
            sc(
250
                ref("force_shutdown"),
251
                #{}
252
            )},
253
        {overload_protection,
254
            sc(
255
                ref("overload_protection"),
256
                #{importance => ?IMPORTANCE_HIDDEN}
257
            )},
258
        {durable_storage,
259
            sc(
260
                ref(durable_storage),
261
                #{
262
                    importance => ?IMPORTANCE_MEDIUM,
263
                    desc => ?DESC(durable_storage)
264
                }
265
            )}
266
    ];
267
roots(low) ->
268
    [
32,236✔
269
        {force_gc,
270
            sc(
271
                ref("force_gc"),
272
                #{}
273
            )},
274
        {conn_congestion,
275
            sc(
276
                ref("conn_congestion"),
277
                #{
278
                    importance => ?IMPORTANCE_HIDDEN
279
                }
280
            )},
281
        {stats,
282
            sc(
283
                ref("stats"),
284
                #{
285
                    importance => ?IMPORTANCE_HIDDEN
286
                }
287
            )},
288
        {sysmon,
289
            sc(
290
                ref("sysmon"),
291
                #{}
292
            )},
293
        {alarm,
294
            sc(
295
                ref("alarm"),
296
                #{}
297
            )},
298
        {flapping_detect,
299
            sc(
300
                ref("flapping_detect"),
301
                #{
302
                    importance => ?IMPORTANCE_MEDIUM,
303
                    converter => fun flapping_detect_converter/2
304
                }
305
            )},
306
        {session_persistence,
307
            sc(
308
                ref("session_persistence"),
309
                #{
310
                    importance => ?IMPORTANCE_HIDDEN
311
                }
312
            )},
313
        {trace,
314
            sc(
315
                ref("trace"),
316
                #{importance => ?IMPORTANCE_HIDDEN}
317
            )},
318
        {crl_cache,
319
            sc(
320
                ref("crl_cache"),
321
                #{importance => ?IMPORTANCE_HIDDEN}
322
            )}
323
    ].
324

325
fields("stats") ->
326
    [
1,841✔
327
        {"enable",
328
            sc(
329
                boolean(),
UNCOV
330
                #{
×
331
                    default => true,
332
                    importance => ?IMPORTANCE_HIDDEN,
333
                    desc => ?DESC(stats_enable)
334
                }
335
            )}
336
    ];
337
fields("authorization") ->
338
    authz_fields();
339
fields("authz_cache") ->
340
    [
341
        {enable,
342
            sc(
343
                boolean(),
344
                #{
345
                    default => true,
346
                    required => true,
347
                    desc => ?DESC(fields_cache_enable)
348
                }
349
            )},
350
        {max_size,
351
            sc(
352
                range(1, 1048576),
353
                #{
354
                    default => 32,
355
                    desc => ?DESC(fields_cache_max_size)
356
                }
357
            )},
358
        {ttl,
359
            sc(
360
                duration(),
361
                #{
362
                    default => <<"1m">>,
363
                    desc => ?DESC(fields_cache_ttl)
364
                }
365
            )},
366
        {excludes,
367
            sc(hoconsc:array(binary()), #{
368
                default => [],
369
                desc => ?DESC(fields_authz_cache_excludes)
370
            })}
371
    ];
372
fields("mqtt") ->
373
    mqtt_general() ++ mqtt_session();
374
fields("zone") ->
375
    emqx_zone_schema:zones_without_default();
376
fields("flapping_detect") ->
377
    [
378
        {"enable",
379
            sc(
380
                boolean(),
381
                #{
382
                    default => false,
383
                    desc => ?DESC(flapping_detect_enable)
384
                }
385
            )},
386
        {"window_time",
387
            sc(
388
                duration(),
389
                #{
390
                    default => ?DEFAULT_WINDOW_TIME,
391
                    importance => ?IMPORTANCE_HIGH,
392
                    desc => ?DESC(flapping_detect_window_time)
393
                }
394
            )},
395
        {"max_count",
396
            sc(
397
                non_neg_integer(),
398
                #{
399
                    default => 15,
400
                    desc => ?DESC(flapping_detect_max_count)
401
                }
402
            )},
403
        {"ban_time",
404
            sc(
405
                duration(),
406
                #{
407
                    default => <<"5m">>,
408
                    desc => ?DESC(flapping_detect_ban_time)
409
                }
410
            )}
411
    ];
412
fields("force_shutdown") ->
413
    [
414
        {"enable",
415
            sc(
416
                boolean(),
417
                #{
418
                    default => true,
419
                    desc => ?DESC(force_shutdown_enable)
420
                }
421
            )},
422
        {"max_mailbox_size",
423
            sc(
424
                range(0, inf),
425
                #{
426
                    default => 1000,
427
                    aliases => [max_message_queue_len],
428
                    desc => ?DESC(force_shutdown_max_mailbox_size)
429
                }
430
            )},
431
        {"max_heap_size",
432
            sc(
433
                wordsize(),
434
                #{
435
                    default => <<"32MB">>,
436
                    desc => ?DESC(force_shutdown_max_heap_size),
437
                    validator => fun ?MODULE:validate_heap_size/1
438
                }
439
            )}
440
    ];
441
fields("overload_protection") ->
442
    [
443
        {"enable",
444
            sc(
445
                boolean(),
446
                #{
447
                    desc => ?DESC(overload_protection_enable),
448
                    default => false
449
                }
450
            )},
451
        {"backoff_delay",
452
            sc(
453
                range(0, inf),
454
                #{
455
                    desc => ?DESC(overload_protection_backoff_delay),
456
                    default => 1
457
                }
458
            )},
459
        {"backoff_gc",
460
            sc(
461
                boolean(),
462
                #{
463
                    desc => ?DESC(overload_protection_backoff_gc),
464
                    default => false
465
                }
466
            )},
467
        {"backoff_hibernation",
468
            sc(
469
                boolean(),
470
                #{
471
                    desc => ?DESC(overload_protection_backoff_hibernation),
472
                    default => true
473
                }
474
            )},
475
        {"backoff_new_conn",
476
            sc(
477
                boolean(),
478
                #{
479
                    desc => ?DESC(overload_protection_backoff_new_conn),
480
                    default => true
481
                }
482
            )}
483
    ];
484
fields("conn_congestion") ->
485
    [
486
        {"enable_alarm",
487
            sc(
488
                boolean(),
489
                #{
490
                    default => true,
491
                    desc => ?DESC(conn_congestion_enable_alarm)
492
                }
493
            )},
494
        {"min_alarm_sustain_duration",
495
            sc(
496
                duration(),
497
                #{
498
                    default => <<"1m">>,
499
                    desc => ?DESC(conn_congestion_min_alarm_sustain_duration)
500
                }
501
            )}
502
    ];
503
fields("force_gc") ->
504
    [
505
        {"enable",
506
            sc(
507
                boolean(),
508
                #{default => true, desc => ?DESC(force_gc_enable)}
509
            )},
510
        {"count",
511
            sc(
512
                range(0, inf),
513
                #{
514
                    default => 16000,
515
                    desc => ?DESC(force_gc_count)
516
                }
517
            )},
518
        {"bytes",
519
            sc(
520
                bytesize(),
521
                #{
522
                    default => <<"16MB">>,
523
                    desc => ?DESC(force_gc_bytes)
524
                }
525
            )}
526
    ];
527
fields("listeners") ->
528
    listeners();
529
fields("crl_cache") ->
530
    %% Note: we make the refresh interval and HTTP timeout global (not
531
    %% per-listener) because multiple SSL listeners might point to the
532
    %% same URL.  If they had diverging timeout options, it would be
533
    %% confusing.
534
    [
535
        {refresh_interval,
536
            sc(
537
                duration(),
538
                #{
539
                    default => <<"15m">>,
540
                    desc => ?DESC("crl_cache_refresh_interval")
541
                }
542
            )},
543
        {http_timeout,
544
            sc(
545
                duration(),
546
                #{
547
                    default => <<"15s">>,
548
                    desc => ?DESC("crl_cache_refresh_http_timeout")
549
                }
550
            )},
551
        {capacity,
552
            sc(
553
                pos_integer(),
554
                #{
555
                    default => 100,
556
                    desc => ?DESC("crl_cache_capacity")
557
                }
558
            )}
559
    ];
560
fields("mqtt_tcp_listener") ->
561
    mqtt_listener(1883) ++
562
        [
563
            {"tcp_options",
564
                sc(
565
                    ref("tcp_opts"),
566
                    #{}
567
                )}
568
        ];
569
fields("mqtt_ssl_listener") ->
570
    mqtt_listener(8883) ++
571
        [
572
            {"tcp_options",
573
                sc(
574
                    ref("tcp_opts"),
575
                    #{}
576
                )},
577
            {"ssl_options",
578
                sc(
579
                    ref("listener_ssl_opts"),
580
                    #{validator => fun mqtt_ssl_listener_ssl_options_validator/1}
581
                )}
582
        ];
583
fields("mqtt_ws_listener") ->
584
    mqtt_listener(8083) ++
585
        [
586
            {"tcp_options",
587
                sc(
588
                    ref("tcp_opts"),
589
                    #{}
590
                )},
591
            {"websocket",
592
                sc(
593
                    ref("ws_opts"),
594
                    #{}
595
                )}
596
        ];
597
fields("mqtt_wss_listener") ->
598
    mqtt_listener(8084) ++
599
        [
600
            {"tcp_options",
601
                sc(
602
                    ref("tcp_opts"),
603
                    #{}
604
                )},
605
            {"ssl_options",
606
                sc(
607
                    ref("listener_wss_opts"),
608
                    #{validator => fun validate_server_ssl_opts/1}
609
                )},
610
            {"websocket",
611
                sc(
612
                    ref("ws_opts"),
613
                    #{}
614
                )}
615
        ];
616
fields("mqtt_quic_listener") ->
617
    [
618
        {"certfile",
619
            sc(
620
                string(),
621
                #{
622
                    deprecated => {since, "5.1.0"},
623
                    desc => ?DESC(fields_mqtt_quic_listener_certfile),
624
                    importance => ?IMPORTANCE_HIDDEN
625
                }
626
            )},
627
        {"keyfile",
628
            sc(
629
                string(),
630
                #{
631
                    deprecated => {since, "5.1.0"},
632
                    desc => ?DESC(fields_mqtt_quic_listener_keyfile),
633
                    importance => ?IMPORTANCE_HIDDEN
634
                }
635
            )},
636
        {"ciphers", ciphers_schema(quic)},
637

638
        {"max_bytes_per_key",
639
            quic_lowlevel_settings_uint(
640
                1,
641
                ?MAX_UINT(64),
642
                ?DESC(fields_mqtt_quic_listener_max_bytes_per_key)
643
            )},
644
        {"tls_server_max_send_buffer",
645
            quic_lowlevel_settings_uint(
646
                1,
647
                ?MAX_UINT(32),
648
                ?DESC(fields_mqtt_quic_listener_tls_server_max_send_buffer)
649
            )},
650
        {"stream_recv_window_default",
651
            quic_lowlevel_settings_uint(
652
                1,
653
                ?MAX_UINT(32),
654
                ?DESC(fields_mqtt_quic_listener_stream_recv_window_default)
655
            )},
656
        {"stream_recv_buffer_default",
657
            quic_lowlevel_settings_uint(
658
                1,
659
                ?MAX_UINT(32),
660
                ?DESC(fields_mqtt_quic_listener_stream_recv_buffer_default)
661
            )},
662
        {"conn_flow_control_window",
663
            quic_lowlevel_settings_uint(
664
                1,
665
                ?MAX_UINT(32),
666
                ?DESC(fields_mqtt_quic_listener_conn_flow_control_window)
667
            )},
668
        {"max_stateless_operations",
669
            quic_lowlevel_settings_uint(
670
                1,
671
                ?MAX_UINT(32),
672
                ?DESC(fields_mqtt_quic_listener_max_stateless_operations)
673
            )},
674
        {"initial_window_packets",
675
            quic_lowlevel_settings_uint(
676
                1,
677
                ?MAX_UINT(32),
678
                ?DESC(fields_mqtt_quic_listener_initial_window_packets)
679
            )},
680
        {"send_idle_timeout_ms",
681
            quic_lowlevel_settings_uint(
682
                1,
683
                ?MAX_UINT(32),
684
                ?DESC(fields_mqtt_quic_listener_send_idle_timeout_ms)
685
            )},
686
        {"initial_rtt_ms",
687
            quic_lowlevel_settings_uint(
688
                1,
689
                ?MAX_UINT(32),
690
                ?DESC(fields_mqtt_quic_listener_initial_rtt_ms)
691
            )},
692
        {"max_ack_delay_ms",
693
            quic_lowlevel_settings_uint(
694
                1,
695
                ?MAX_UINT(32),
696
                ?DESC(fields_mqtt_quic_listener_max_ack_delay_ms)
697
            )},
698
        {"disconnect_timeout_ms",
699
            quic_lowlevel_settings_uint(
700
                1,
701
                ?MAX_UINT(32),
702
                ?DESC(fields_mqtt_quic_listener_disconnect_timeout_ms)
703
            )},
704
        {"idle_timeout",
705
            sc(
706
                timeout_duration_ms(),
707
                #{
708
                    default => 0,
709
                    desc => ?DESC(fields_mqtt_quic_listener_idle_timeout),
710
                    deprecated => {since, "5.1.0"},
711
                    %% deprecated, use idle_timeout_ms instead
712
                    importance => ?IMPORTANCE_HIDDEN
713
                }
714
            )},
715
        {"idle_timeout_ms",
716
            quic_lowlevel_settings_uint(
717
                0,
718
                ?MAX_UINT(64),
719
                ?DESC(fields_mqtt_quic_listener_idle_timeout_ms)
720
            )},
721
        {"handshake_idle_timeout",
722
            sc(
723
                timeout_duration_ms(),
724
                #{
725
                    default => <<"10s">>,
726
                    desc => ?DESC(fields_mqtt_quic_listener_handshake_idle_timeout),
727
                    deprecated => {since, "5.1.0"},
728
                    %% use handshake_idle_timeout_ms
729
                    importance => ?IMPORTANCE_HIDDEN
730
                }
731
            )},
732
        {"handshake_idle_timeout_ms",
733
            quic_lowlevel_settings_uint(
734
                1,
735
                ?MAX_UINT(64),
736
                ?DESC(fields_mqtt_quic_listener_handshake_idle_timeout_ms)
737
            )},
738
        {"keep_alive_interval",
739
            sc(
740
                timeout_duration_ms(),
741
                #{
742
                    default => 0,
743
                    desc => ?DESC(fields_mqtt_quic_listener_keep_alive_interval),
744
                    %% TODO: deprecated => {since, "5.1.0"}
745
                    %% use keep_alive_interval_ms instead
746
                    importance => ?IMPORTANCE_HIDDEN
747
                }
748
            )},
749
        {"keep_alive_interval_ms",
750
            quic_lowlevel_settings_uint(
751
                0,
752
                ?MAX_UINT(32),
753
                ?DESC(fields_mqtt_quic_listener_keep_alive_interval_ms)
754
            )},
755
        {"peer_bidi_stream_count",
756
            quic_lowlevel_settings_uint(
757
                1,
758
                ?MAX_UINT(16),
759
                ?DESC(fields_mqtt_quic_listener_peer_bidi_stream_count)
760
            )},
761
        {"peer_unidi_stream_count",
762
            quic_lowlevel_settings_uint(
763
                0,
764
                ?MAX_UINT(16),
765
                ?DESC(fields_mqtt_quic_listener_peer_unidi_stream_count)
766
            )},
767
        {"retry_memory_limit",
768
            quic_lowlevel_settings_uint(
769
                0,
770
                ?MAX_UINT(16),
771
                ?DESC(fields_mqtt_quic_listener_retry_memory_limit)
772
            )},
773
        {"load_balancing_mode",
774
            quic_lowlevel_settings_uint(
775
                0,
776
                ?MAX_UINT(16),
777
                ?DESC(fields_mqtt_quic_listener_load_balancing_mode)
778
            )},
779
        {"max_operations_per_drain",
780
            quic_lowlevel_settings_uint(
781
                0,
782
                ?MAX_UINT(8),
783
                ?DESC(fields_mqtt_quic_listener_max_operations_per_drain)
784
            )},
785
        {"send_buffering_enabled",
786
            quic_feature_toggle(
787
                ?DESC(fields_mqtt_quic_listener_send_buffering_enabled)
788
            )},
789
        {"pacing_enabled",
790
            quic_feature_toggle(
791
                ?DESC(fields_mqtt_quic_listener_pacing_enabled)
792
            )},
793
        {"migration_enabled",
794
            quic_feature_toggle(
795
                ?DESC(fields_mqtt_quic_listener_migration_enabled)
796
            )},
797
        {"datagram_receive_enabled",
798
            quic_feature_toggle(
799
                ?DESC(fields_mqtt_quic_listener_datagram_receive_enabled)
800
            )},
801
        {"server_resumption_level",
802
            quic_lowlevel_settings_uint(
803
                0,
804
                ?MAX_UINT(8),
805
                ?DESC(fields_mqtt_quic_listener_server_resumption_level)
806
            )},
807
        {"minimum_mtu",
808
            quic_lowlevel_settings_uint(
809
                1,
810
                ?MAX_UINT(16),
811
                ?DESC(fields_mqtt_quic_listener_minimum_mtu)
812
            )},
813
        {"maximum_mtu",
814
            quic_lowlevel_settings_uint(
815
                1,
816
                ?MAX_UINT(16),
817
                ?DESC(fields_mqtt_quic_listener_maximum_mtu)
818
            )},
819
        {"mtu_discovery_search_complete_timeout_us",
820
            quic_lowlevel_settings_uint(
821
                0,
822
                ?MAX_UINT(64),
823
                ?DESC(fields_mqtt_quic_listener_mtu_discovery_search_complete_timeout_us)
824
            )},
825
        {"mtu_discovery_missing_probe_count",
826
            quic_lowlevel_settings_uint(
827
                1,
828
                ?MAX_UINT(8),
829
                ?DESC(fields_mqtt_quic_listener_mtu_discovery_missing_probe_count)
830
            )},
831
        {"max_binding_stateless_operations",
832
            quic_lowlevel_settings_uint(
833
                0,
834
                ?MAX_UINT(16),
835
                ?DESC(fields_mqtt_quic_listener_max_binding_stateless_operations)
836
            )},
837
        {"stateless_operation_expiration_ms",
838
            quic_lowlevel_settings_uint(
839
                0,
840
                ?MAX_UINT(16),
841
                ?DESC(fields_mqtt_quic_listener_stateless_operation_expiration_ms)
842
            )},
843
        {"ssl_options",
844
            sc(
845
                ref("listener_quic_ssl_opts"),
846
                #{
847
                    required => false,
848
                    desc => ?DESC(fields_mqtt_quic_listener_ssl_options)
849
                }
850
            )}
851
    ] ++ base_listener(14567);
852
fields("ws_opts") ->
853
    [
854
        {"mqtt_path",
855
            sc(
856
                string(),
857
                #{
858
                    default => <<"/mqtt">>,
859
                    desc => ?DESC(fields_ws_opts_mqtt_path)
860
                }
861
            )},
862
        {"mqtt_piggyback",
863
            sc(
864
                hoconsc:enum([single, multiple]),
865
                #{
866
                    default => multiple,
867
                    desc => ?DESC(fields_ws_opts_mqtt_piggyback)
868
                }
869
            )},
870
        {"compress",
871
            sc(
872
                boolean(),
873
                #{
874
                    default => false,
875
                    desc => ?DESC(fields_ws_opts_compress)
876
                }
877
            )},
878
        {"idle_timeout",
879
            sc(
880
                duration(),
881
                #{
882
                    default => <<"7200s">>,
883
                    desc => ?DESC(fields_ws_opts_idle_timeout)
884
                }
885
            )},
886
        {"max_frame_size",
887
            sc(
888
                hoconsc:union([infinity, integer()]),
889
                #{
890
                    default => infinity,
891
                    desc => ?DESC(fields_ws_opts_max_frame_size)
892
                }
893
            )},
894
        {"fail_if_no_subprotocol",
895
            sc(
896
                boolean(),
897
                #{
898
                    default => true,
899
                    desc => ?DESC(fields_ws_opts_fail_if_no_subprotocol)
900
                }
901
            )},
902
        {"supported_subprotocols",
903
            sc(
904
                comma_separated_list(),
905
                #{
906
                    default => <<"mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5">>,
907
                    desc => ?DESC(fields_ws_opts_supported_subprotocols)
908
                }
909
            )},
910
        {"check_origin_enable",
911
            sc(
912
                boolean(),
913
                #{
914
                    default => false,
915
                    desc => ?DESC(fields_ws_opts_check_origin_enable)
916
                }
917
            )},
918
        {"allow_origin_absence",
919
            sc(
920
                boolean(),
921
                #{
922
                    default => true,
923
                    desc => ?DESC(fields_ws_opts_allow_origin_absence)
924
                }
925
            )},
926
        {"check_origins",
927
            sc(
928
                comma_separated_binary(),
929
                #{
930
                    default => <<"http://localhost:18083, http://127.0.0.1:18083">>,
931
                    desc => ?DESC(fields_ws_opts_check_origins)
932
                }
933
            )},
934
        {"proxy_address_header",
935
            sc(
936
                string(),
937
                #{
938
                    default => <<"x-forwarded-for">>,
939
                    desc => ?DESC(fields_ws_opts_proxy_address_header)
940
                }
941
            )},
942
        {"proxy_port_header",
943
            sc(
944
                string(),
945
                #{
946
                    default => <<"x-forwarded-port">>,
947
                    desc => ?DESC(fields_ws_opts_proxy_port_header)
948
                }
949
            )},
950
        {"deflate_opts",
951
            sc(
952
                ref("deflate_opts"),
953
                #{}
954
            )},
955
        {"validate_utf8",
956
            sc(
957
                boolean(),
958
                #{
959
                    default => true,
960
                    desc => ?DESC(fields_ws_opts_validate_utf8)
961
                }
962
            )}
963
    ];
964
fields("tcp_opts") ->
965
    [
966
        {"active_n",
967
            sc(
968
                integer(),
969
                #{
970
                    default => 100,
971
                    desc => ?DESC(fields_tcp_opts_active_n)
972
                }
973
            )},
974
        {"backlog",
975
            sc(
976
                pos_integer(),
977
                #{
978
                    default => 1024,
979
                    desc => ?DESC(fields_tcp_opts_backlog)
980
                }
981
            )},
982
        {"send_timeout",
983
            sc(
984
                duration(),
985
                #{
986
                    default => <<"15s">>,
987
                    desc => ?DESC(fields_tcp_opts_send_timeout)
988
                }
989
            )},
990
        {"send_timeout_close",
991
            sc(
992
                boolean(),
993
                #{
994
                    default => true,
995
                    desc => ?DESC(fields_tcp_opts_send_timeout_close)
996
                }
997
            )},
998
        {"recbuf",
999
            sc(
1000
                bytesize(),
1001
                #{
1002
                    example => <<"2KB">>,
1003
                    desc => ?DESC(fields_tcp_opts_recbuf)
1004
                }
1005
            )},
1006
        {"sndbuf",
1007
            sc(
1008
                bytesize(),
1009
                #{
1010
                    example => <<"4KB">>,
1011
                    desc => ?DESC(fields_tcp_opts_sndbuf)
1012
                }
1013
            )},
1014
        {"buffer",
1015
            sc(
1016
                bytesize(),
1017
                #{
1018
                    default => <<"4KB">>,
1019
                    example => <<"4KB">>,
1020
                    desc => ?DESC(fields_tcp_opts_buffer)
1021
                }
1022
            )},
1023
        {"high_watermark",
1024
            sc(
1025
                bytesize(),
1026
                #{
1027
                    default => <<"1MB">>,
1028
                    desc => ?DESC(fields_tcp_opts_high_watermark)
1029
                }
1030
            )},
1031
        {"nodelay",
1032
            sc(
1033
                boolean(),
1034
                #{
1035
                    default => true,
1036
                    desc => ?DESC(fields_tcp_opts_nodelay)
1037
                }
1038
            )},
1039
        {"reuseaddr",
1040
            sc(
1041
                boolean(),
1042
                #{
1043
                    default => true,
1044
                    desc => ?DESC(fields_tcp_opts_reuseaddr)
1045
                }
1046
            )},
1047
        {"keepalive",
1048
            sc(
1049
                string(),
1050
                #{
1051
                    default => <<"none">>,
1052
                    desc => ?DESC(fields_tcp_opts_keepalive),
1053
                    validator => fun validate_tcp_keepalive/1
1054
                }
1055
            )}
1056
    ];
1057
fields("listener_ssl_opts") ->
1058
    server_ssl_opts_schema(
1059
        #{
1060
            depth => 10,
1061
            reuse_sessions => true,
1062
            versions => tls_all_available,
1063
            ciphers => tls_all_available
1064
        },
1065
        false
1066
    );
1067
fields("listener_wss_opts") ->
1068
    server_ssl_opts_schema(
1069
        #{
1070
            depth => 10,
1071
            reuse_sessions => true,
1072
            versions => tls_all_available,
1073
            ciphers => tls_all_available
1074
        },
1075
        true
1076
    );
1077
fields("listener_quic_ssl_opts") ->
1078
    %% Mark unsupported TLS options deprecated.
1079
    Schema0 = server_ssl_opts_schema(#{}, false),
1080
    Schema1 = lists:keydelete("ocsp", 1, Schema0),
1081
    lists:map(
1082
        fun({Name, Schema}) ->
1083
            case is_quic_ssl_opts(Name) of
1084
                true ->
1085
                    {Name, Schema};
1086
                false ->
1087
                    {Name, Schema#{
1088
                        deprecated => {since, "5.0.20"}, importance => ?IMPORTANCE_HIDDEN
1089
                    }}
1090
            end
1091
        end,
1092
        Schema1
1093
    );
1094
fields("ssl_client_opts") ->
1095
    client_ssl_opts_schema(#{});
1096
fields("ocsp") ->
1097
    [
1098
        {enable_ocsp_stapling,
1099
            sc(
1100
                boolean(),
1101
                #{
1102
                    default => false,
1103
                    desc => ?DESC("server_ssl_opts_schema_enable_ocsp_stapling")
1104
                }
1105
            )},
1106
        {responder_url,
1107
            sc(
1108
                url(),
1109
                #{
1110
                    required => false,
1111
                    desc => ?DESC("server_ssl_opts_schema_ocsp_responder_url")
1112
                }
1113
            )},
1114
        {issuer_pem,
1115
            sc(
1116
                binary(),
1117
                #{
1118
                    required => false,
1119
                    desc => ?DESC("server_ssl_opts_schema_ocsp_issuer_pem")
1120
                }
1121
            )},
1122
        {refresh_interval,
1123
            sc(
1124
                duration(),
1125
                #{
1126
                    default => <<"5m">>,
1127
                    desc => ?DESC("server_ssl_opts_schema_ocsp_refresh_interval")
1128
                }
1129
            )},
1130
        {refresh_http_timeout,
1131
            sc(
1132
                duration(),
1133
                #{
1134
                    default => <<"15s">>,
1135
                    desc => ?DESC("server_ssl_opts_schema_ocsp_refresh_http_timeout")
1136
                }
1137
            )}
1138
    ];
1139
fields("deflate_opts") ->
1140
    [
1141
        {"level",
1142
            sc(
1143
                hoconsc:enum([none, default, best_compression, best_speed]),
1144
                #{desc => ?DESC(fields_deflate_opts_level)}
1145
            )},
1146
        {"mem_level",
1147
            sc(
1148
                range(1, 9),
1149
                #{
1150
                    default => 8,
1151
                    desc => ?DESC(fields_deflate_opts_mem_level)
1152
                }
1153
            )},
1154
        {"strategy",
1155
            sc(
1156
                hoconsc:enum([default, filtered, huffman_only, rle]),
1157
                #{
1158
                    default => default,
1159
                    desc => ?DESC(fields_deflate_opts_strategy)
1160
                }
1161
            )},
1162
        {"server_context_takeover",
1163
            sc(
1164
                hoconsc:enum([takeover, no_takeover]),
1165
                #{
1166
                    default => takeover,
1167
                    desc => ?DESC(fields_deflate_opts_server_context_takeover)
1168
                }
1169
            )},
1170
        {"client_context_takeover",
1171
            sc(
1172
                hoconsc:enum([takeover, no_takeover]),
1173
                #{
1174
                    default => takeover,
1175
                    desc => ?DESC(fields_deflate_opts_client_context_takeover)
1176
                }
1177
            )},
1178
        {"server_max_window_bits",
1179
            sc(
1180
                range(8, 15),
1181
                #{
1182
                    default => 15,
1183
                    desc => ?DESC(fields_deflate_opts_server_max_window_bits)
1184
                }
1185
            )},
1186
        {"client_max_window_bits",
1187
            sc(
1188
                range(8, 15),
1189
                #{
1190
                    default => 15,
1191
                    desc => ?DESC(fields_deflate_opts_client_max_window_bits)
1192
                }
1193
            )}
1194
    ];
1195
fields("broker") ->
1196
    [
1197
        {enable_session_registry,
1198
            sc(
1199
                boolean(),
1200
                #{
1201
                    default => true,
1202
                    importance => ?IMPORTANCE_HIGH,
1203
                    desc => ?DESC(broker_enable_session_registry)
1204
                }
1205
            )},
1206
        {session_history_retain,
1207
            sc(
1208
                duration_s(),
1209
                #{
1210
                    default => <<"0s">>,
1211
                    importance => ?IMPORTANCE_LOW,
1212
                    desc => ?DESC("broker_session_history_retain")
1213
                }
1214
            )},
1215
        {session_locking_strategy,
1216
            sc(
1217
                hoconsc:enum([local, leader, quorum, all]),
1218
                #{
1219
                    default => quorum,
1220
                    importance => ?IMPORTANCE_HIDDEN,
1221
                    desc => ?DESC(broker_session_locking_strategy)
1222
                }
1223
            )},
1224
        %% moved to under mqtt root
1225
        {shared_subscription_strategy,
1226
            sc(
1227
                string(),
1228
                #{
1229
                    deprecated => {since, "5.1.0"},
1230
                    importance => ?IMPORTANCE_HIDDEN
1231
                }
1232
            )},
1233
        {shared_dispatch_ack_enabled,
1234
            sc(
1235
                boolean(),
1236
                #{
1237
                    deprecated => {since, "5.1.0"},
1238
                    importance => ?IMPORTANCE_HIDDEN,
1239
                    default => false,
1240
                    desc => ?DESC(broker_shared_dispatch_ack_enabled)
1241
                }
1242
            )},
1243
        {route_batch_clean,
1244
            sc(
1245
                boolean(),
1246
                #{
1247
                    default => true,
1248
                    desc => "This config is stale since 4.3",
1249
                    importance => ?IMPORTANCE_HIDDEN
1250
                }
1251
            )},
1252
        {perf,
1253
            sc(
1254
                ref("broker_perf"),
1255
                #{importance => ?IMPORTANCE_HIDDEN}
1256
            )},
1257
        {routing,
1258
            sc(
1259
                ref("broker_routing"),
1260
                #{importance => ?IMPORTANCE_HIDDEN}
1261
            )},
1262
        %% FIXME: Need new design for shared subscription group
1263
        {shared_subscription_group,
1264
            sc(
1265
                map(name, ref("shared_subscription_group")),
1266
                #{
1267
                    example => #{<<"example_group">> => #{<<"strategy">> => <<"random">>}},
1268
                    desc => ?DESC(shared_subscription_group_strategy),
1269
                    importance => ?IMPORTANCE_HIDDEN
1270
                }
1271
            )}
1272
    ];
1273
fields("broker_routing") ->
1274
    [
1275
        {"storage_schema",
1276
            sc(
1277
                hoconsc:enum([v1, v2]),
1278
                #{
1279
                    default => v2,
1280
                    'readOnly' => true,
1281
                    desc => ?DESC(broker_routing_storage_schema)
1282
                }
1283
            )},
1284
        {"batch_sync",
1285
            sc(
1286
                ref("broker_routing_batch_sync"),
1287
                #{importance => ?IMPORTANCE_HIDDEN}
1288
            )}
1289
    ];
1290
fields("broker_routing_batch_sync") ->
1291
    [
1292
        {"enable_on",
1293
            sc(
1294
                hoconsc:enum([none, core, replicant, all]),
1295
                #{
1296
                    %% TODO
1297
                    %% Make `replicant` the default value after initial release.
1298
                    default => none,
1299
                    desc => ?DESC(broker_routing_batch_sync_enable_on)
1300
                }
1301
            )}
1302
    ];
1303
fields("shared_subscription_group") ->
1304
    [
1305
        {"strategy",
1306
            sc(
1307
                hoconsc:enum([
1308
                    random,
1309
                    round_robin,
1310
                    round_robin_per_group,
1311
                    sticky,
1312
                    local,
1313
                    hash_topic,
1314
                    hash_clientid
1315
                ]),
1316
                #{
1317
                    default => random,
1318
                    desc => ?DESC(shared_subscription_strategy_enum)
1319
                }
1320
            )}
1321
    ];
1322
fields("broker_perf") ->
1323
    [
1324
        {"route_lock_type",
1325
            sc(
1326
                hoconsc:enum([key, tab, global]),
1327
                #{
1328
                    default => key,
1329
                    desc => ?DESC(broker_perf_route_lock_type)
1330
                }
1331
            )},
1332
        {"trie_compaction",
1333
            sc(
1334
                boolean(),
1335
                #{
1336
                    default => true,
1337
                    desc => ?DESC(broker_perf_trie_compaction)
1338
                }
1339
            )}
1340
    ];
1341
fields("sys_topics") ->
1342
    [
1343
        {"sys_msg_interval",
1344
            sc(
1345
                hoconsc:union([disabled, duration()]),
1346
                #{
1347
                    default => <<"1m">>,
1348
                    desc => ?DESC(sys_msg_interval)
1349
                }
1350
            )},
1351
        {"sys_heartbeat_interval",
1352
            sc(
1353
                hoconsc:union([disabled, duration()]),
1354
                #{
1355
                    default => <<"30s">>,
1356
                    desc => ?DESC(sys_heartbeat_interval)
1357
                }
1358
            )},
1359
        {"sys_event_messages",
1360
            sc(
1361
                ref("event_names"),
1362
                #{desc => ?DESC(sys_event_messages)}
1363
            )}
1364
    ];
1365
fields("event_names") ->
1366
    [
1367
        {"client_connected",
1368
            sc(
1369
                boolean(),
1370
                #{
1371
                    default => true,
1372
                    desc => ?DESC(sys_event_client_connected)
1373
                }
1374
            )},
1375
        {"client_disconnected",
1376
            sc(
1377
                boolean(),
1378
                #{
1379
                    default => true,
1380
                    desc => ?DESC(sys_event_client_disconnected)
1381
                }
1382
            )},
1383
        {"client_subscribed",
1384
            sc(
1385
                boolean(),
1386
                #{
1387
                    default => false,
1388
                    desc => ?DESC(sys_event_client_subscribed)
1389
                }
1390
            )},
1391
        {"client_unsubscribed",
1392
            sc(
1393
                boolean(),
1394
                #{
1395
                    default => false,
1396
                    desc => ?DESC(sys_event_client_unsubscribed)
1397
                }
1398
            )}
1399
    ];
1400
fields("sysmon") ->
1401
    [
1402
        {"vm",
1403
            sc(
1404
                ref("sysmon_vm"),
1405
                #{}
1406
            )},
1407
        {"os",
1408
            sc(
1409
                ref("sysmon_os"),
1410
                #{}
1411
            )},
1412
        {"top",
1413
            sc(
1414
                ref("sysmon_top"),
1415
                %% Userful monitoring solution when benchmarking,
1416
                %% but hardly common enough for regular users.
1417
                #{importance => ?IMPORTANCE_HIDDEN}
1418
            )}
1419
    ];
1420
fields("sysmon_vm") ->
1421
    [
1422
        {"process_check_interval",
1423
            sc(
1424
                duration(),
1425
                #{
1426
                    default => <<"30s">>,
1427
                    desc => ?DESC(sysmon_vm_process_check_interval)
1428
                }
1429
            )},
1430
        {"process_high_watermark",
1431
            sc(
1432
                percent(),
1433
                #{
1434
                    default => <<"80%">>,
1435
                    desc => ?DESC(sysmon_vm_process_high_watermark)
1436
                }
1437
            )},
1438
        {"process_low_watermark",
1439
            sc(
1440
                percent(),
1441
                #{
1442
                    default => <<"60%">>,
1443
                    desc => ?DESC(sysmon_vm_process_low_watermark)
1444
                }
1445
            )},
1446
        {"long_gc",
1447
            sc(
1448
                hoconsc:union([disabled, duration()]),
1449
                #{
1450
                    default => disabled,
1451
                    desc => ?DESC(sysmon_vm_long_gc)
1452
                }
1453
            )},
1454
        {"long_schedule",
1455
            sc(
1456
                hoconsc:union([disabled, duration()]),
1457
                #{
1458
                    default => <<"240ms">>,
1459
                    desc => ?DESC(sysmon_vm_long_schedule)
1460
                }
1461
            )},
1462
        {"large_heap",
1463
            sc(
1464
                hoconsc:union([disabled, bytesize()]),
1465
                #{
1466
                    default => <<"32MB">>,
1467
                    desc => ?DESC(sysmon_vm_large_heap)
1468
                }
1469
            )},
1470
        {"busy_dist_port",
1471
            sc(
1472
                boolean(),
1473
                #{
1474
                    default => true,
1475
                    desc => ?DESC(sysmon_vm_busy_dist_port)
1476
                }
1477
            )},
1478
        {"busy_port",
1479
            sc(
1480
                boolean(),
1481
                #{
1482
                    default => true,
1483
                    desc => ?DESC(sysmon_vm_busy_port)
1484
                }
1485
            )}
1486
    ];
1487
fields("sysmon_os") ->
1488
    [
1489
        {"cpu_check_interval",
1490
            sc(
1491
                duration(),
1492
                #{
1493
                    default => <<"60s">>,
1494
                    desc => ?DESC(sysmon_os_cpu_check_interval)
1495
                }
1496
            )},
1497
        {"cpu_high_watermark",
1498
            sc(
1499
                percent(),
1500
                #{
1501
                    default => <<"80%">>,
1502
                    desc => ?DESC(sysmon_os_cpu_high_watermark)
1503
                }
1504
            )},
1505
        {"cpu_low_watermark",
1506
            sc(
1507
                percent(),
1508
                #{
1509
                    default => <<"60%">>,
1510
                    desc => ?DESC(sysmon_os_cpu_low_watermark)
1511
                }
1512
            )},
1513
        {"mem_check_interval",
1514
            sc(
1515
                hoconsc:union([disabled, duration()]),
1516
                #{
1517
                    default => default_mem_check_interval(),
1518
                    desc => ?DESC(sysmon_os_mem_check_interval)
1519
                }
1520
            )},
1521
        {"sysmem_high_watermark",
1522
            sc(
1523
                percent(),
1524
                #{
1525
                    default => <<"70%">>,
1526
                    desc => ?DESC(sysmon_os_sysmem_high_watermark)
1527
                }
1528
            )},
1529
        {"procmem_high_watermark",
1530
            sc(
1531
                percent(),
1532
                #{
1533
                    default => <<"5%">>,
1534
                    desc => ?DESC(sysmon_os_procmem_high_watermark)
1535
                }
1536
            )}
1537
    ];
1538
fields("sysmon_top") ->
1539
    [
1540
        {"num_items",
1541
            sc(
1542
                non_neg_integer(),
1543
                #{
1544
                    mapping => "system_monitor.top_num_items",
1545
                    default => 10,
1546
                    desc => ?DESC(sysmon_top_num_items)
1547
                }
1548
            )},
1549
        {"sample_interval",
1550
            sc(
1551
                emqx_schema:duration(),
1552
                #{
1553
                    mapping => "system_monitor.top_sample_interval",
1554
                    default => <<"2s">>,
1555
                    desc => ?DESC(sysmon_top_sample_interval)
1556
                }
1557
            )},
1558
        {"max_procs",
1559
            sc(
1560
                non_neg_integer(),
1561
                #{
1562
                    mapping => "system_monitor.top_max_procs",
1563
                    default => 1_000_000,
1564
                    desc => ?DESC(sysmon_top_max_procs)
1565
                }
1566
            )},
1567
        {"db_hostname",
1568
            sc(
1569
                string(),
1570
                #{
1571
                    mapping => "system_monitor.db_hostname",
1572
                    desc => ?DESC(sysmon_top_db_hostname),
1573
                    default => <<>>
1574
                }
1575
            )},
1576
        {"db_port",
1577
            sc(
1578
                integer(),
1579
                #{
1580
                    mapping => "system_monitor.db_port",
1581
                    default => 5432,
1582
                    desc => ?DESC(sysmon_top_db_port)
1583
                }
1584
            )},
1585
        {"db_username",
1586
            sc(
1587
                string(),
1588
                #{
1589
                    mapping => "system_monitor.db_username",
1590
                    default => <<"system_monitor">>,
1591
                    desc => ?DESC(sysmon_top_db_username)
1592
                }
1593
            )},
1594
        {"db_password",
1595
            sc(
1596
                binary(),
1597
                #{
1598
                    mapping => "system_monitor.db_password",
1599
                    default => <<"system_monitor_password">>,
1600
                    desc => ?DESC(sysmon_top_db_password),
1601
                    converter => fun password_converter/2,
1602
                    sensitive => true
1603
                }
1604
            )},
1605
        {"db_name",
1606
            sc(
1607
                string(),
1608
                #{
1609
                    mapping => "system_monitor.db_name",
1610
                    default => <<"postgres">>,
1611
                    desc => ?DESC(sysmon_top_db_name)
1612
                }
1613
            )}
1614
    ];
1615
fields("alarm") ->
1616
    [
1617
        {"actions",
1618
            sc(
1619
                hoconsc:array(atom()),
1620
                #{
1621
                    default => [log, publish],
1622
                    validator => fun ?MODULE:validate_alarm_actions/1,
1623
                    example => [log, publish],
1624
                    desc => ?DESC(alarm_actions)
1625
                }
1626
            )},
1627
        {"size_limit",
1628
            sc(
1629
                range(1, 3000),
1630
                #{
1631
                    default => 1000,
1632
                    example => 1000,
1633
                    desc => ?DESC(alarm_size_limit)
1634
                }
1635
            )},
1636
        {"validity_period",
1637
            sc(
1638
                duration(),
1639
                #{
1640
                    default => <<"24h">>,
1641
                    example => "24h",
1642
                    desc => ?DESC(alarm_validity_period)
1643
                }
1644
            )}
1645
    ];
1646
fields("trace") ->
1647
    [
1648
        {"payload_encode",
1649
            sc(hoconsc:enum([hex, text, hidden]), #{
1650
                default => text,
1651
                deprecated => {since, "5.0.22"},
1652
                importance => ?IMPORTANCE_HIDDEN,
1653
                desc => ?DESC(fields_trace_payload_encode)
1654
            })}
1655
    ];
1656
fields("session_persistence") ->
1657
    [
1658
        {"enable",
1659
            sc(
1660
                boolean(), #{
1661
                    desc => ?DESC(session_persistence_enable),
1662
                    default => false
1663
                }
1664
            )},
1665
        {"batch_size",
1666
            sc(
1667
                pos_integer(),
1668
                #{
1669
                    default => 100,
1670
                    desc => ?DESC(session_ds_batch_size),
1671
                    importance => ?IMPORTANCE_MEDIUM
1672
                }
1673
            )},
1674
        {"idle_poll_interval",
1675
            sc(
1676
                timeout_duration(),
1677
                #{
1678
                    default => <<"100ms">>,
1679
                    desc => ?DESC(session_ds_idle_poll_interval)
1680
                }
1681
            )},
1682
        {"last_alive_update_interval",
1683
            sc(
1684
                timeout_duration(),
1685
                #{
1686
                    default => <<"5000ms">>,
1687
                    desc => ?DESC(session_ds_last_alive_update_interval)
1688
                }
1689
            )},
1690
        {"renew_streams_interval",
1691
            sc(
1692
                timeout_duration(),
1693
                #{
1694
                    default => <<"5000ms">>,
1695
                    importance => ?IMPORTANCE_HIDDEN
1696
                }
1697
            )},
1698
        {"session_gc_interval",
1699
            sc(
1700
                timeout_duration(),
1701
                #{
1702
                    default => <<"10m">>,
1703
                    desc => ?DESC(session_ds_session_gc_interval)
1704
                }
1705
            )},
1706
        {"session_gc_batch_size",
1707
            sc(
1708
                pos_integer(),
1709
                #{
1710
                    default => 100,
1711
                    importance => ?IMPORTANCE_LOW,
1712
                    desc => ?DESC(session_ds_session_gc_batch_size)
1713
                }
1714
            )},
1715
        {"message_retention_period",
1716
            sc(
1717
                timeout_duration(),
1718
                #{
1719
                    default => <<"1d">>,
1720
                    desc => ?DESC(session_ds_message_retention_period)
1721
                }
1722
            )},
1723
        {"force_persistence",
1724
            sc(
1725
                boolean(),
1726
                #{
1727
                    default => false,
1728
                    %% Only for testing, shall remain hidden
1729
                    importance => ?IMPORTANCE_HIDDEN
1730
                }
1731
            )}
1732
    ];
1733
fields(durable_storage) ->
1734
    emqx_ds_schema:schema();
1735
fields("client_attrs_init") ->
1736
    [
1737
        {extract_from,
1738
            sc(
1739
                hoconsc:enum([clientid, username, cn, dn, user_property]),
1740
                #{desc => ?DESC("client_attrs_init_extract_from")}
1741
            )},
1742
        {extract_regexp, sc(binary(), #{desc => ?DESC("client_attrs_init_extract_regexp")})},
1743
        {extract_as,
1744
            sc(binary(), #{
1745
                default => <<"alias">>,
1746
                desc => ?DESC("client_attrs_init_extract_as"),
1747
                validator => fun restricted_string/1
1748
            })}
1749
    ].
1750

1751
restricted_string(Str) ->
1752
    case emqx_utils:is_restricted_str(Str) of
1753
        true -> ok;
1754
        false -> {error, <<"Invalid string for attribute name">>}
1755
    end.
1756

1757
mqtt_listener(Bind) ->
1758
    base_listener(Bind) ++
1759
        [
1760
            {"access_rules",
1761
                sc(
1762
                    hoconsc:array(string()),
1763
                    #{
1764
                        desc => ?DESC(mqtt_listener_access_rules),
1765
                        default => [<<"allow all">>]
1766
                    }
1767
                )},
1768
            {"proxy_protocol",
1769
                sc(
1770
                    boolean(),
1771
                    #{
1772
                        desc => ?DESC(mqtt_listener_proxy_protocol),
1773
                        default => false
1774
                    }
1775
                )},
1776
            {"proxy_protocol_timeout",
1777
                sc(
1778
                    duration(),
1779
                    #{
1780
                        desc => ?DESC(mqtt_listener_proxy_protocol_timeout),
1781
                        default => <<"3s">>
1782
                    }
1783
                )}
1784
        ] ++ emqx_schema_hooks:injection_point('mqtt.listener').
1785

1786
base_listener(Bind) ->
1787
    [
1788
        {"enable",
1789
            sc(
1790
                boolean(),
1791
                #{
1792
                    default => true,
1793
                    aliases => [enabled],
1794
                    desc => ?DESC(fields_listener_enabled)
1795
                }
1796
            )},
1797
        {"bind",
1798
            sc(
1799
                ip_port(),
1800
                #{
1801
                    default => Bind,
1802
                    required => true,
1803
                    desc => ?DESC(base_listener_bind)
1804
                }
1805
            )},
1806
        {"acceptors",
1807
            sc(
1808
                pos_integer(),
1809
                #{
1810
                    default => 16,
1811
                    desc => ?DESC(base_listener_acceptors)
1812
                }
1813
            )},
1814
        {"max_connections",
1815
            sc(
1816
                hoconsc:union([infinity, pos_integer()]),
1817
                #{
1818
                    default => emqx_listeners:default_max_conn(),
1819
                    desc => ?DESC(base_listener_max_connections)
1820
                }
1821
            )},
1822
        {"mountpoint",
1823
            sc(
1824
                binary(),
1825
                #{
1826
                    default => <<>>,
1827
                    desc => ?DESC(base_listener_mountpoint)
1828
                }
1829
            )},
1830
        {"zone",
1831
            sc(
1832
                atom(),
1833
                #{
1834
                    desc => ?DESC(base_listener_zone),
1835
                    default => 'default',
1836
                    importance => ?IMPORTANCE_HIDDEN
1837
                }
1838
            )},
1839
        {"limiter",
1840
            sc(
1841
                ?R_REF(
1842
                    emqx_limiter_schema,
1843
                    listener_fields
1844
                ),
1845
                #{
1846
                    desc => ?DESC(base_listener_limiter),
1847
                    importance => ?IMPORTANCE_HIDDEN
1848
                }
1849
            )},
1850
        {"enable_authn",
1851
            sc(
1852
                hoconsc:enum([true, false, quick_deny_anonymous]),
1853
                #{
1854
                    desc => ?DESC(base_listener_enable_authn),
1855
                    default => true
1856
                }
1857
            )}
1858
    ] ++ emqx_limiter_schema:short_paths_fields().
1859

1860
desc("persistent_session_store") ->
1861
    "Settings for message persistence.";
1862
desc("persistent_session_builtin") ->
1863
    "Settings for the built-in storage engine of persistent messages.";
1864
desc("persistent_table_mria_opts") ->
1865
    "Tuning options for the mria table.";
1866
desc("stats") ->
1867
    "Enable/disable statistic data collection.\n"
1868
    "Statistic data such as message receive/send count/rate etc. "
1869
    "It provides insights of system performance and helps to diagnose issues. "
1870
    "You can find statistic data from the dashboard, or from the '/stats' API.";
1871
desc("authorization") ->
1872
    "Settings for client authorization.";
1873
desc("mqtt") ->
1874
    "Global MQTT configuration.";
1875
desc("authz_cache") ->
1876
    "Settings for the authorization cache.";
1877
desc("zone") ->
1878
    "A `Zone` defines a set of configuration items (such as the maximum number of connections)"
1879
    " that can be shared between multiple listeners.\n\n"
1880
    "`Listener` can refer to a `Zone` through the configuration item"
1881
    " <code>listener.\\<Protocol>.\\<Listener Name>.zone</code>.\n\n"
1882
    "The configs defined in the zones will override the global configs with the same key.\n\n"
1883
    "For example, given the following config:\n"
1884
    "```\n"
1885
    "a {\n"
1886
    "    b: 1, c: 1\n"
1887
    "}\n"
1888
    "zone.my_zone {\n"
1889
    "  a {\n"
1890
    "    b:2\n"
1891
    "  }\n"
1892
    "}\n"
1893
    "```\n\n"
1894
    "The global config `a` is overridden by the configs `a` inside the zone `my_zone`.\n\n"
1895
    "If there is a listener using the zone `my_zone`, the value of config `a` will be: "
1896
    "`{b:2, c: 1}`.\n"
1897
    "Note that although the default value of `a.c` is `0`, the global value is used,"
1898
    " i.e. configs in the zone have no default values. To override `a.c` one must configure"
1899
    " it explicitly in the zone.\n\n"
1900
    "All the global configs that can be overridden in zones are:\n"
1901
    " - `stats.*`\n"
1902
    " - `mqtt.*`\n"
1903
    " - `authorization.*`\n"
1904
    " - `flapping_detect.*`\n"
1905
    " - `force_shutdown.*`\n"
1906
    " - `conn_congestion.*`\n"
1907
    " - `force_gc.*`\n\n";
1908
desc("flapping_detect") ->
1909
    "This config controls the allowed maximum number of `CONNECT` packets received\n"
1910
    "from the same clientid in a time frame defined by `window_time`.\n"
1911
    "After the limit is reached, successive `CONNECT` requests are forbidden\n"
1912
    "(banned) until the end of the time period defined by `ban_time`.";
1913
desc("force_shutdown") ->
1914
    "When the process message queue length, or the memory bytes\n"
1915
    "reaches a certain value, the process is forced to close.\n\n"
1916
    "Note: \"message queue\" here refers to the \"message mailbox\"\n"
1917
    "of the Erlang process, not the `mqueue` of QoS 1 and QoS 2.";
1918
desc("overload_protection") ->
1919
    "Overload protection mechanism monitors the load of the system and temporarily\n"
1920
    "disables some features (such as accepting new connections) when the load is high.";
1921
desc("conn_congestion") ->
1922
    "Settings for `conn_congestion` alarm.\n\n"
1923
    "Sometimes the MQTT connection (usually an MQTT subscriber) may\n"
1924
    "get \"congested\", because there are too many packets to be sent.\n"
1925
    "The socket tries to buffer the packets until the buffer is\n"
1926
    "full. If more packets arrive after that, the packets will be\n"
1927
    "\"pending\" in the queue, and we consider the connection\n"
1928
    "congested.\n\n"
1929
    "Note: `sndbuf` can be set to larger value if the\n"
1930
    "alarm is triggered too often.\n"
1931
    "The name of the alarm is of format `conn_congestion/<ClientID>/<Username>`,\n"
1932
    "where the `<ClientID>` is the client ID of the congested MQTT connection,\n"
1933
    "and `<Username>` is the username or `unknown_user`.";
1934
desc("force_gc") ->
1935
    "Force garbage collection in MQTT connection process after\n"
1936
    " they process certain number of messages or bytes of data.";
1937
desc("listeners") ->
1938
    "MQTT listeners identified by their protocol type and assigned names";
1939
desc("mqtt_tcp_listener") ->
1940
    "Settings for the MQTT over TCP listener.";
1941
desc("mqtt_ssl_listener") ->
1942
    "Settings for the MQTT over SSL listener.";
1943
desc("mqtt_ws_listener") ->
1944
    "Settings for the MQTT over WebSocket listener.";
1945
desc("mqtt_wss_listener") ->
1946
    "Settings for the MQTT over WebSocket/SSL listener.";
1947
desc("mqtt_quic_listener") ->
1948
    "Settings for the MQTT over QUIC listener.";
1949
desc("ws_opts") ->
1950
    "WebSocket listener options.";
1951
desc("tcp_opts") ->
1952
    "TCP listener options.";
1953
desc("listener_ssl_opts") ->
1954
    "Socket options for SSL connections.";
1955
desc("listener_wss_opts") ->
1956
    "Socket options for WebSocket/SSL connections.";
1957
desc("fields_mqtt_quic_listener_certfile") ->
1958
    "Path to the certificate file. Will be deprecated in 5.1, use '.ssl_options.certfile' instead.";
1959
desc("fields_mqtt_quic_listener_keyfile") ->
1960
    "Path to the secret key file. Will be deprecated in 5.1, use '.ssl_options.keyfile' instead.";
1961
desc("listener_quic_ssl_opts") ->
1962
    "TLS options for QUIC transport.";
1963
desc("ssl_client_opts") ->
1964
    "Socket options for SSL clients.";
1965
desc("deflate_opts") ->
1966
    "Compression options.";
1967
desc("broker") ->
1968
    "Message broker options.";
1969
desc("broker_perf") ->
1970
    "Broker performance tuning parameters.";
1971
desc("sys_topics") ->
1972
    "The EMQX Broker periodically publishes its own status, message statistics,\n"
1973
    "client online and offline events to the system topic starting with `$SYS/`.\n\n"
1974
    "The following options control the behavior of `$SYS` topics.";
1975
desc("event_names") ->
1976
    "Enable or disable client lifecycle event publishing.\n\n"
1977
    "The following options affect MQTT clients as well as\n"
1978
    "gateway clients. The types of the clients\n"
1979
    "are distinguished by the topic prefix:\n\n"
1980
    "- For the MQTT clients, the format is:\n"
1981
    "`$SYS/broker/<node>/clients/<clientid>/<event>`\n"
1982
    "- For the Gateway clients, it is\n"
1983
    "`$SYS/broker/<node>/gateway/<gateway-name>/clients/<clientid>/<event>`\n";
1984
desc("sysmon") ->
1985
    "Features related to system monitoring and introspection.";
1986
desc("sysmon_vm") ->
1987
    "This part of the configuration is responsible for collecting\n"
1988
    " BEAM VM events, such as long garbage collection, traffic congestion in the inter-broker\n"
1989
    " communication, etc.";
1990
desc("sysmon_os") ->
1991
    "This part of the configuration is responsible for monitoring\n"
1992
    " the host OS health, such as free memory, disk space, CPU load, etc.";
1993
desc("sysmon_top") ->
1994
    "This part of the configuration is responsible for monitoring\n"
1995
    " the Erlang processes in the VM. This information can be sent to an external\n"
1996
    " PostgreSQL database. This feature is inactive unless the PostgreSQL sink is configured.";
1997
desc("alarm") ->
1998
    "Settings for the alarms.";
1999
desc("trace") ->
2000
    "Real-time filtering logs for the ClientID or Topic or IP for debugging.";
2001
desc("shared_subscription_group") ->
2002
    "Per group dispatch strategy for shared subscription";
2003
desc("ocsp") ->
2004
    "Per listener OCSP Stapling configuration.";
2005
desc("crl_cache") ->
2006
    "Global CRL cache options.";
2007
desc("session_persistence") ->
2008
    "Settings governing durable sessions persistence.";
2009
desc(durable_storage) ->
2010
    ?DESC(durable_storage);
2011
desc("client_attrs_init") ->
2012
    ?DESC(client_attrs_init);
2013
desc(_) ->
2014
    undefined.
2015

2016
%% utils
2017
-spec conf_get(string() | [string()], hocon:config()) -> term().
2018
conf_get(Key, Conf) ->
2019
    ensure_list(hocon_maps:get(Key, Conf)).
2020

2021
conf_get(Key, Conf, Default) ->
2022
    ensure_list(hocon_maps:get(Key, Conf, Default)).
2023

2024
ensure_list(V) ->
2025
    case is_binary(V) of
2026
        true ->
2027
            binary_to_list(V);
2028
        false ->
2029
            V
2030
    end.
2031

2032
filter(Opts) ->
2033
    [{K, V} || {K, V} <- Opts, V =/= undefined].
2034

2035
%% @private This function defines the SSL opts which are commonly used by
2036
%% SSL listener and client.
2037
-spec common_ssl_opts_schema(map(), server | client) -> hocon_schema:field_schema().
2038
common_ssl_opts_schema(Defaults, Type) ->
2039
    D = fun(Field) -> maps:get(Field, Defaults, undefined) end,
2040
    Df = fun(Field, Default) -> maps:get(Field, Defaults, Default) end,
2041
    Collection = maps:get(versions, Defaults, tls_all_available),
2042
    [
2043
        {"cacertfile",
2044
            sc(
2045
                binary(),
2046
                #{
2047
                    default => cert_file("cacert.pem", Type),
2048
                    required => false,
2049
                    desc => ?DESC(common_ssl_opts_schema_cacertfile)
2050
                }
2051
            )},
2052
        {"cacerts",
2053
            sc(
2054
                boolean(),
2055
                #{
2056
                    default => false,
2057
                    deprecated => {since, "5.1.4"}
2058
                }
2059
            )},
2060
        {"certfile",
2061
            sc(
2062
                binary(),
2063
                #{
2064
                    default => cert_file("cert.pem", Type),
2065
                    required => false,
2066
                    desc => ?DESC(common_ssl_opts_schema_certfile)
2067
                }
2068
            )},
2069
        {"keyfile",
2070
            sc(
2071
                binary(),
2072
                #{
2073
                    default => cert_file("key.pem", Type),
2074
                    required => false,
2075
                    desc => ?DESC(common_ssl_opts_schema_keyfile)
2076
                }
2077
            )},
2078
        {"verify",
2079
            sc(
2080
                hoconsc:enum([verify_peer, verify_none]),
2081
                #{
2082
                    default => Df(verify, verify_none),
2083
                    desc => ?DESC(common_ssl_opts_schema_verify)
2084
                }
2085
            )},
2086
        {"reuse_sessions",
2087
            sc(
2088
                boolean(),
2089
                #{
2090
                    default => Df(reuse_sessions, true),
2091
                    desc => ?DESC(common_ssl_opts_schema_reuse_sessions)
2092
                }
2093
            )},
2094
        {"depth",
2095
            sc(
2096
                non_neg_integer(),
2097
                #{
2098
                    default => Df(depth, 10),
2099
                    desc => ?DESC(common_ssl_opts_schema_depth)
2100
                }
2101
            )},
2102
        {"password",
2103
            sc(
2104
                string(),
2105
                #{
2106
                    sensitive => true,
2107
                    required => false,
2108
                    example => <<"">>,
2109
                    format => <<"password">>,
2110
                    desc => ?DESC(common_ssl_opts_schema_password),
2111
                    importance => ?IMPORTANCE_LOW,
2112
                    converter => fun password_converter/2
2113
                }
2114
            )},
2115
        {"versions", tls_versions_schema(Collection)},
2116
        {"ciphers", ciphers_schema(D(ciphers))},
2117
        {"user_lookup_fun",
2118
            sc(
2119
                typerefl:alias("string", any()),
2120
                #{
2121
                    default => <<"emqx_tls_psk:lookup">>,
2122
                    converter => fun ?MODULE:user_lookup_fun_tr/2,
2123
                    importance => ?IMPORTANCE_HIDDEN,
2124
                    desc => ?DESC(common_ssl_opts_schema_user_lookup_fun)
2125
                }
2126
            )},
2127
        {"secure_renegotiate",
2128
            sc(
2129
                boolean(),
2130
                #{
2131
                    default => Df(secure_renegotiate, true),
2132
                    desc => ?DESC(common_ssl_opts_schema_secure_renegotiate)
2133
                }
2134
            )},
2135
        {"log_level",
2136
            sc(
2137
                hoconsc:enum([
2138
                    emergency, alert, critical, error, warning, notice, info, debug, none, all
2139
                ]),
2140
                #{
2141
                    default => notice,
2142
                    desc => ?DESC(common_ssl_opts_schema_log_level),
2143
                    importance => ?IMPORTANCE_LOW
2144
                }
2145
            )},
2146

2147
        {"hibernate_after",
2148
            sc(
2149
                duration(),
2150
                #{
2151
                    default => Df(hibernate_after, <<"5s">>),
2152
                    desc => ?DESC(common_ssl_opts_schema_hibernate_after)
2153
                }
2154
            )}
2155
    ].
2156

2157
%% @doc Make schema for SSL listener options.
2158
-spec server_ssl_opts_schema(map(), boolean()) -> hocon_schema:field_schema().
2159
server_ssl_opts_schema(Defaults, IsRanchListener) ->
2160
    D = fun(Field) -> maps:get(Field, Defaults, undefined) end,
2161
    Df = fun(Field, Default) -> maps:get(Field, Defaults, Default) end,
2162
    common_ssl_opts_schema(Defaults, server) ++
2163
        [
2164
            {"dhfile",
2165
                sc(
2166
                    string(),
2167
                    #{
2168
                        default => D(dhfile),
2169
                        required => false,
2170
                        desc => ?DESC(server_ssl_opts_schema_dhfile)
2171
                    }
2172
                )},
2173
            {"fail_if_no_peer_cert",
2174
                sc(
2175
                    boolean(),
2176
                    #{
2177
                        default => Df(fail_if_no_peer_cert, false),
2178
                        desc => ?DESC(server_ssl_opts_schema_fail_if_no_peer_cert)
2179
                    }
2180
                )},
2181
            {"honor_cipher_order",
2182
                sc(
2183
                    boolean(),
2184
                    #{
2185
                        default => Df(honor_cipher_order, true),
2186
                        desc => ?DESC(server_ssl_opts_schema_honor_cipher_order)
2187
                    }
2188
                )},
2189
            {"client_renegotiation",
2190
                sc(
2191
                    boolean(),
2192
                    #{
2193
                        default => Df(client_renegotiation, true),
2194
                        desc => ?DESC(server_ssl_opts_schema_client_renegotiation)
2195
                    }
2196
                )},
2197
            {"handshake_timeout",
2198
                sc(
2199
                    duration(),
2200
                    #{
2201
                        default => Df(handshake_timeout, <<"15s">>),
2202
                        desc => ?DESC(server_ssl_opts_schema_handshake_timeout)
2203
                    }
2204
                )}
2205
        ] ++
2206
        [
2207
            Field
2208
         || not IsRanchListener,
2209
            Field <- [
2210
                {gc_after_handshake,
2211
                    sc(boolean(), #{
2212
                        default => false,
2213
                        desc => ?DESC(server_ssl_opts_schema_gc_after_handshake)
2214
                    })},
2215
                {ocsp,
2216
                    sc(
2217
                        ref("ocsp"),
2218
                        #{
2219
                            required => false,
2220
                            validator => fun ocsp_inner_validator/1
2221
                        }
2222
                    )},
2223
                {enable_crl_check,
2224
                    sc(
2225
                        boolean(),
2226
                        #{
2227
                            default => false,
2228
                            importance => ?IMPORTANCE_MEDIUM,
2229
                            desc => ?DESC("server_ssl_opts_schema_enable_crl_check")
2230
                        }
2231
                    )}
2232
            ]
2233
        ].
2234

2235
validate_server_ssl_opts(#{<<"fail_if_no_peer_cert">> := true, <<"verify">> := Verify}) ->
2236
    validate_verify(Verify);
2237
validate_server_ssl_opts(#{fail_if_no_peer_cert := true, verify := Verify}) ->
2238
    validate_verify(Verify);
2239
validate_server_ssl_opts(_SSLOpts) ->
2240
    ok.
2241

2242
validate_verify(verify_peer) ->
2243
    ok;
2244
validate_verify(_) ->
2245
    {error, "verify must be verify_peer when fail_if_no_peer_cert is true"}.
2246

2247
mqtt_ssl_listener_ssl_options_validator(Conf) ->
2248
    Checks = [
2249
        fun validate_server_ssl_opts/1,
2250
        fun ocsp_outer_validator/1,
2251
        fun crl_outer_validator/1
2252
    ],
2253
    case emqx_utils:pipeline(Checks, Conf, not_used) of
2254
        {ok, _, _} ->
2255
            ok;
2256
        {error, Reason, _NotUsed} ->
2257
            {error, Reason}
2258
    end.
2259

2260
ocsp_outer_validator(#{<<"ocsp">> := #{<<"enable_ocsp_stapling">> := true}} = Conf) ->
2261
    %% outer mqtt listener ssl server config
2262
    ServerCertPemPath = maps:get(<<"certfile">>, Conf, undefined),
2263
    case ServerCertPemPath of
2264
        undefined ->
2265
            {error, "Server certificate must be defined when using OCSP stapling"};
2266
        _ ->
2267
            %% check if issuer pem is readable and/or valid?
2268
            ok
2269
    end;
2270
ocsp_outer_validator(_Conf) ->
2271
    ok.
2272

2273
ocsp_inner_validator(#{enable_ocsp_stapling := _} = Conf) ->
2274
    ocsp_inner_validator(emqx_utils_maps:binary_key_map(Conf));
2275
ocsp_inner_validator(#{<<"enable_ocsp_stapling">> := false} = _Conf) ->
2276
    ok;
2277
ocsp_inner_validator(#{<<"enable_ocsp_stapling">> := true} = Conf) ->
2278
    assert_required_field(
2279
        Conf, <<"responder_url">>, "The responder URL is required for OCSP stapling"
2280
    ),
2281
    assert_required_field(
2282
        Conf, <<"issuer_pem">>, "The issuer PEM path is required for OCSP stapling"
2283
    ),
2284
    ok.
2285

2286
crl_outer_validator(
2287
    #{<<"enable_crl_check">> := true} = SSLOpts
2288
) ->
2289
    case maps:get(<<"verify">>, SSLOpts) of
2290
        verify_peer ->
2291
            ok;
2292
        _ ->
2293
            {error, "verify must be verify_peer when CRL check is enabled"}
2294
    end;
2295
crl_outer_validator(_SSLOpts) ->
2296
    ok.
2297

2298
%% @doc Make schema for SSL client.
2299
-spec client_ssl_opts_schema(map()) -> hocon_schema:field_schema().
2300
client_ssl_opts_schema(Defaults) ->
2301
    common_ssl_opts_schema(Defaults, client) ++
2302
        [
2303
            {"enable",
2304
                sc(
2305
                    boolean(),
2306
                    #{
2307
                        default => false,
2308
                        desc => ?DESC(client_ssl_opts_schema_enable)
2309
                    }
2310
                )},
2311
            {"server_name_indication",
2312
                sc(
2313
                    hoconsc:union([disable, string()]),
2314
                    #{
2315
                        required => false,
2316
                        example => disable,
2317
                        validator => fun emqx_schema:non_empty_string/1,
2318
                        desc => ?DESC(client_ssl_opts_schema_server_name_indication)
2319
                    }
2320
                )}
2321
        ].
2322

2323
available_tls_vsns(dtls_all_available) -> emqx_tls_lib:available_versions(dtls);
2324
available_tls_vsns(tls_all_available) -> emqx_tls_lib:available_versions(tls).
2325

2326
outdated_tls_vsn(dtls_all_available) -> [dtlsv1];
2327
outdated_tls_vsn(tls_all_available) -> ['tlsv1.1', tlsv1].
2328

2329
default_tls_vsns(Key) ->
2330
    available_tls_vsns(Key) -- outdated_tls_vsn(Key).
2331

2332
-spec tls_versions_schema(tls_all_available | dtls_all_available) -> hocon_schema:field_schema().
2333
tls_versions_schema(Collection) ->
2334
    DefaultVersions = default_tls_vsns(Collection),
2335
    sc(
2336
        hoconsc:array(typerefl:atom()),
2337
        #{
2338
            default => DefaultVersions,
2339
            desc => ?DESC(common_ssl_opts_schema_versions),
2340
            importance => ?IMPORTANCE_HIGH,
2341
            validator => fun(Input) -> validate_tls_versions(Collection, Input) end
2342
        }
2343
    ).
2344

2345
-spec ciphers_schema(quic | dtls_all_available | tls_all_available | undefined) ->
2346
    hocon_schema:field_schema().
2347
ciphers_schema(Default) ->
2348
    Desc =
2349
        case Default of
2350
            quic -> ?DESC(ciphers_schema_quic);
2351
            _ -> ?DESC(ciphers_schema_common)
2352
        end,
2353
    sc(
2354
        hoconsc:array(string()),
2355
        #{
2356
            default => default_ciphers(Default),
2357
            converter => fun converter_ciphers/2,
2358
            validator =>
2359
                case Default =:= quic of
2360
                    %% quic has openssl statically linked
2361
                    true -> undefined;
2362
                    false -> fun validate_ciphers/1
2363
                end,
2364
            desc => Desc
2365
        }
2366
    ).
2367

2368
converter_ciphers(undefined, _Opts) ->
2369
    [];
2370
converter_ciphers(<<>>, _Opts) ->
2371
    [];
2372
converter_ciphers(Ciphers, _Opts) when is_list(Ciphers) -> Ciphers;
2373
converter_ciphers(Ciphers, _Opts) when is_binary(Ciphers) ->
2374
    {ok, List} = to_comma_separated_binary(binary_to_list(Ciphers)),
2375
    List.
2376

2377
default_ciphers(Which) ->
2378
    lists:map(
2379
        fun erlang:iolist_to_binary/1,
2380
        do_default_ciphers(Which)
2381
    ).
2382

2383
do_default_ciphers(quic) ->
2384
    [
2385
        "TLS_AES_256_GCM_SHA384",
2386
        "TLS_AES_128_GCM_SHA256",
2387
        "TLS_CHACHA20_POLY1305_SHA256"
2388
    ];
2389
do_default_ciphers(_) ->
2390
    %% otherwise resolve default ciphers list at runtime
2391
    [].
2392

2393
password_converter(X, Opts) ->
2394
    bin_str_converter(X, Opts).
2395

2396
bin_str_converter(undefined, _) ->
2397
    undefined;
2398
bin_str_converter(I, _) when is_integer(I) ->
2399
    integer_to_binary(I);
2400
bin_str_converter(X, _) ->
2401
    try
2402
        iolist_to_binary(X)
2403
    catch
2404
        _:_ ->
2405
            throw("must_quote")
2406
    end.
2407

2408
authz_fields() ->
2409
    [
2410
        {"no_match",
2411
            sc(
2412
                hoconsc:enum([allow, deny]),
2413
                #{
2414
                    default => allow,
2415
                    required => true,
2416
                    desc => ?DESC(fields_authorization_no_match)
2417
                }
2418
            )},
2419
        {"deny_action",
2420
            sc(
2421
                hoconsc:enum([ignore, disconnect]),
2422
                #{
2423
                    default => ignore,
2424
                    required => true,
2425
                    desc => ?DESC(fields_authorization_deny_action)
2426
                }
2427
            )},
2428
        {"cache",
2429
            sc(
2430
                ref(?MODULE, "authz_cache"),
2431
                #{}
2432
            )}
2433
    ].
2434

2435
%% @private return a list of keys in a parent field
2436
-spec keys(string(), hocon:config()) -> [string()].
2437
keys(Parent, Conf) ->
2438
    [binary_to_list(B) || B <- maps:keys(conf_get(Parent, Conf, #{}))].
2439

2440
-spec ceiling(number()) -> integer().
2441
ceiling(X) ->
2442
    T = erlang:trunc(X),
2443
    case (X - T) of
2444
        Neg when Neg < 0 -> T;
2445
        Pos when Pos > 0 -> T + 1;
2446
        _ -> T
2447
    end.
2448

2449
%% types
2450

2451
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
2452

2453
map(Name, Type) -> hoconsc:map(Name, Type).
2454

2455
ref(StructName) -> hoconsc:ref(?MODULE, StructName).
2456

2457
ref(Module, StructName) -> hoconsc:ref(Module, StructName).
2458

2459
mk_duration(Desc, OverrideMeta) ->
2460
    DefaultMeta = #{
2461
        desc => Desc ++
2462
            " Time interval is a string that contains a number followed by time unit:<br/>"
2463
            "- `ms` for milliseconds,\n"
2464
            "- `s` for seconds,\n"
2465
            "- `m` for minutes,\n"
2466
            "- `h` for hours;\n<br/>"
2467
            "or combination of whereof: `1h5m0s`"
2468
    },
2469
    hoconsc:mk(typerefl:alias("string", duration()), maps:merge(DefaultMeta, OverrideMeta)).
2470

2471
to_duration(Str) ->
2472
    case hocon_postprocess:duration(Str) of
2473
        I when is_integer(I) -> {ok, I};
2474
        _ -> to_integer(Str)
2475
    end.
2476

2477
to_duration_s(Str) ->
2478
    case hocon_postprocess:duration(Str) of
2479
        I when is_number(I) -> {ok, ceiling(I / 1000)};
2480
        _ -> to_integer(Str)
2481
    end.
2482

2483
-spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when
2484
    Input :: string() | binary().
2485
to_duration_ms(Str) ->
2486
    case hocon_postprocess:duration(Str) of
2487
        I when is_number(I) -> {ok, ceiling(I)};
2488
        _ -> to_integer(Str)
2489
    end.
2490

2491
-spec to_timeout_duration(Input) -> {ok, timeout_duration()} | {error, Input} when
2492
    Input :: string() | binary().
2493
to_timeout_duration(Str) ->
2494
    do_to_timeout_duration(Str, fun to_duration/1, ?MAX_INT_TIMEOUT_MS, "ms").
2495

2496
-spec to_timeout_duration_ms(Input) -> {ok, timeout_duration_ms()} | {error, Input} when
2497
    Input :: string() | binary().
2498
to_timeout_duration_ms(Str) ->
2499
    do_to_timeout_duration(Str, fun to_duration_ms/1, ?MAX_INT_TIMEOUT_MS, "ms").
2500

2501
-spec to_timeout_duration_s(Input) -> {ok, timeout_duration_s()} | {error, Input} when
2502
    Input :: string() | binary().
2503
to_timeout_duration_s(Str) ->
2504
    do_to_timeout_duration(Str, fun to_duration_s/1, ?MAX_INT_TIMEOUT_S, "s").
2505

2506
do_to_timeout_duration(Str, Fn, Max, Unit) ->
2507
    case Fn(Str) of
2508
        {ok, I} ->
2509
            case I =< Max of
2510
                true ->
2511
                    {ok, I};
2512
                false ->
2513
                    Msg = lists:flatten(
2514
                        io_lib:format("timeout value too large (max: ~b ~s)", [Max, Unit])
2515
                    ),
2516
                    throw(#{
2517
                        schema_module => ?MODULE,
2518
                        message => Msg,
2519
                        kind => validation_error
2520
                    })
2521
            end;
2522
        Err ->
2523
            Err
2524
    end.
2525

2526
to_bytesize(Str) ->
2527
    case hocon_postprocess:bytesize(Str) of
2528
        I when is_integer(I) -> {ok, I};
2529
        _ -> to_integer(Str)
2530
    end.
2531

2532
to_wordsize(Str) ->
2533
    WordSize = erlang:system_info(wordsize),
2534
    case to_bytesize(Str) of
2535
        {ok, Bytes} -> {ok, Bytes div WordSize};
2536
        Error -> Error
2537
    end.
2538

2539
to_integer(Str) ->
2540
    case string:to_integer(Str) of
2541
        {Int, []} -> {ok, Int};
2542
        {Int, <<>>} -> {ok, Int};
2543
        _ -> {error, Str}
2544
    end.
2545

2546
to_percent(Str) ->
2547
    Percent = hocon_postprocess:percent(Str),
2548
    case is_number(Percent) andalso Percent >= 0.0 andalso Percent =< 1.0 of
2549
        true -> {ok, Percent};
2550
        false -> {error, Str}
2551
    end.
2552

2553
to_comma_separated_list(Str) ->
2554
    {ok, string:tokens(Str, ", ")}.
2555

2556
to_comma_separated_binary(Str) ->
2557
    {ok, lists:map(fun erlang:list_to_binary/1, string:tokens(Str, ", "))}.
2558

2559
to_comma_separated_atoms(Str) ->
2560
    {ok, lists:map(fun to_atom/1, string:tokens(Str, ", "))}.
2561

2562
to_url(Str) ->
2563
    case emqx_http_lib:uri_parse(Str) of
2564
        {ok, URIMap} ->
2565
            URIString = emqx_http_lib:normalize(URIMap),
2566
            {ok, iolist_to_binary(URIString)};
2567
        Error ->
2568
            Error
2569
    end.
2570

2571
to_json_binary(Str) ->
2572
    case emqx_utils_json:safe_decode(Str) of
2573
        {ok, _} ->
2574
            {ok, iolist_to_binary(Str)};
2575
        Error ->
2576
            Error
2577
    end.
2578

2579
%% @doc support the following format:
2580
%%  - 127.0.0.1:1883
2581
%%  - ::1:1883
2582
%%  - [::1]:1883
2583
%%  - :1883
2584
%%  - :::1883
2585
to_ip_port(Str) ->
2586
    case split_ip_port(Str) of
2587
        {"", Port} ->
2588
            %% this is a local address
2589
            {ok, parse_port(Port)};
2590
        {MaybeIp, Port} ->
2591
            PortVal = parse_port(Port),
2592
            case inet:parse_address(MaybeIp) of
2593
                {ok, IpTuple} ->
2594
                    {ok, {IpTuple, PortVal}};
2595
                _ ->
2596
                    {error, bad_ip_port}
2597
            end;
2598
        _ ->
2599
            {error, bad_ip_port}
2600
    end.
2601

2602
split_ip_port(Str0) ->
2603
    Str = re:replace(Str0, " ", "", [{return, list}, global]),
2604
    case lists:split(string:rchr(Str, $:), Str) of
2605
        %% no colon
2606
        {[], Str} ->
2607
            {"", Str};
2608
        {IpPlusColon, PortString} ->
2609
            IpStr0 = lists:droplast(IpPlusColon),
2610
            case IpStr0 of
2611
                %% drop head/tail brackets
2612
                [$[ | S] ->
2613
                    case lists:last(S) of
2614
                        $] -> {lists:droplast(S), PortString};
2615
                        _ -> error
2616
                    end;
2617
                _ ->
2618
                    {IpStr0, PortString}
2619
            end
2620
    end.
2621

2622
to_erl_cipher_suite(Str) ->
2623
    case ssl:str_to_suite(Str) of
2624
        {error, Reason} -> error({invalid_cipher, Reason});
2625
        Cipher -> Cipher
2626
    end.
2627

2628
to_atom(Atom) when is_atom(Atom) ->
2629
    Atom;
2630
to_atom(Str) when is_list(Str) ->
2631
    list_to_atom(Str);
2632
to_atom(Bin) when is_binary(Bin) ->
2633
    binary_to_atom(Bin, utf8).
2634

2635
validate_heap_size(Siz) when is_integer(Siz) ->
2636
    MaxSiz =
2637
        case erlang:system_info(wordsize) of
2638
            % arch_64
2639
            8 -> (1 bsl 59) - 1;
2640
            % arch_32
2641
            4 -> (1 bsl 27) - 1
2642
        end,
2643
    case Siz > MaxSiz of
2644
        true ->
2645
            {error, #{reason => max_heap_size_too_large, maximum => MaxSiz}};
2646
        false ->
2647
            ok
2648
    end;
2649
validate_heap_size(_SizStr) ->
2650
    {error, invalid_heap_size}.
2651

2652
validate_packet_size(Siz) when is_integer(Siz) andalso Siz < 1 ->
2653
    {error, #{reason => max_mqtt_packet_size_too_small, minimum => 1}};
2654
validate_packet_size(Siz) when is_integer(Siz) andalso Siz > ?MAX_INT_MQTT_PACKET_SIZE ->
2655
    Max = integer_to_list(round(?MAX_INT_MQTT_PACKET_SIZE / 1024 / 1024)) ++ "M",
2656
    {error, #{reason => max_mqtt_packet_size_too_large, maximum => Max}};
2657
validate_packet_size(Siz) when is_integer(Siz) ->
2658
    ok;
2659
validate_packet_size(_SizStr) ->
2660
    {error, invalid_packet_size}.
2661

2662
validate_keepalive_multiplier(Multiplier) when
2663
    is_number(Multiplier) andalso Multiplier >= 1.0 andalso Multiplier =< 65535.0
2664
->
2665
    ok;
2666
validate_keepalive_multiplier(_Multiplier) ->
2667
    {error, #{reason => keepalive_multiplier_out_of_range, min => 1, max => 65535}}.
2668

2669
validate_alarm_actions(Actions) ->
2670
    UnSupported = lists:filter(
2671
        fun(Action) -> Action =/= log andalso Action =/= publish end, Actions
2672
    ),
2673
    case UnSupported of
2674
        [] -> ok;
2675
        Error -> {error, Error}
2676
    end.
2677

2678
validate_tcp_keepalive(Value) ->
2679
    case iolist_to_binary(Value) of
2680
        <<"none">> ->
2681
            ok;
2682
        _ ->
2683
            _ = parse_tcp_keepalive(Value),
2684
            ok
2685
    end.
2686

2687
%% @doc This function is used as value validator and also run-time parser.
2688
parse_tcp_keepalive(Str) ->
2689
    try
2690
        {ok, [Idle, Interval, Probes]} = to_comma_separated_binary(Str),
2691
        %% use 10 times the Linux defaults as range limit
2692
        IdleInt = parse_ka_int(Idle, "Idle", 1, 7200_0),
2693
        IntervalInt = parse_ka_int(Interval, "Interval", 1, 75_0),
2694
        ProbesInt = parse_ka_int(Probes, "Probes", 1, 9_0),
2695
        {IdleInt, IntervalInt, ProbesInt}
2696
    catch
2697
        error:_ ->
2698
            throw(#{
2699
                reason => "Not comma separated positive integers of 'Idle,Interval,Probes' format",
2700
                value => Str
2701
            })
2702
    end.
2703

2704
parse_ka_int(Bin, Name, Min, Max) ->
2705
    I = binary_to_integer(string:trim(Bin)),
2706
    case I >= Min andalso I =< Max of
2707
        true ->
2708
            I;
2709
        false ->
2710
            Msg = io_lib:format("TCP-Keepalive '~s' value must be in the rage of [~p, ~p].", [
2711
                Name, Min, Max
2712
            ]),
2713
            throw(#{reason => lists:flatten(Msg), value => I})
2714
    end.
2715

2716
user_lookup_fun_tr(undefined, Opts) ->
2717
    user_lookup_fun_tr(<<"emqx_tls_psk:lookup">>, Opts);
2718
user_lookup_fun_tr(Lookup, #{make_serializable := true}) ->
2719
    fmt_user_lookup_fun(Lookup);
2720
user_lookup_fun_tr(Lookup, _) ->
2721
    parse_user_lookup_fun(Lookup).
2722

2723
fmt_user_lookup_fun({Fun, _}) when is_function(Fun, 3) ->
2724
    {module, Mod} = erlang:fun_info(Fun, module),
2725
    {name, Name} = erlang:fun_info(Fun, name),
2726
    atom_to_list(Mod) ++ ":" ++ atom_to_list(Name);
2727
fmt_user_lookup_fun(Other) ->
2728
    %% already serializable
2729
    Other.
2730

2731
parse_user_lookup_fun({Fun, _} = Lookup) when is_function(Fun, 3) -> Lookup;
2732
parse_user_lookup_fun(StrConf) ->
2733
    [ModStr, FunStr] = string:tokens(str(StrConf), ": "),
2734
    Mod = list_to_atom(ModStr),
2735
    Fun = list_to_atom(FunStr),
2736
    {fun Mod:Fun/3, undefined}.
2737

2738
validate_ciphers(Ciphers) ->
2739
    Set = emqx_tls_lib:all_ciphers_set_cached(),
2740
    case lists:filter(fun(Cipher) -> not sets:is_element(Cipher, Set) end, Ciphers) of
2741
        [] -> ok;
2742
        Bad -> {error, {bad_ciphers, Bad}}
2743
    end.
2744

2745
validate_tls_versions(Collection, Versions) ->
2746
    AvailableVersions = available_tls_vsns(Collection),
2747
    case lists:filter(fun(V) -> not lists:member(V, AvailableVersions) end, Versions) of
2748
        [] -> validate_tls_version_gap(Versions);
2749
        Vs -> {error, {unsupported_tls_versions, Vs}}
2750
    end.
2751

2752
%% See also `validate_version_gap/1` in OTP ssl.erl,
2753
%% e.g: https://github.com/emqx/otp/blob/emqx-OTP-25.1.2/lib/ssl/src/ssl.erl#L2566.
2754
%% Do not allow configuration of TLS 1.3 with a gap where TLS 1.2 is not supported
2755
%% as that configuration can trigger the built in version downgrade protection
2756
%% mechanism and the handshake can fail with an Illegal Parameter alert.
2757
validate_tls_version_gap(Versions) ->
2758
    case lists:member('tlsv1.3', Versions) of
2759
        true when length(Versions) >= 2 ->
2760
            case lists:member('tlsv1.2', Versions) of
2761
                true ->
2762
                    ok;
2763
                false ->
2764
                    {error,
2765
                        "Using multiple versions that include tlsv1.3 but "
2766
                        "exclude tlsv1.2 is not allowed"}
2767
            end;
2768
        _ ->
2769
            ok
2770
    end.
2771

2772
validations() ->
2773
    [
2774
        {check_process_watermark, fun check_process_watermark/1},
2775
        {check_cpu_watermark, fun check_cpu_watermark/1}
2776
    ].
2777

2778
%% validations from emqx_conf_schema, we must filter other *_schema by undefined.
2779
check_process_watermark(Conf) ->
2780
    check_watermark("sysmon.vm.process_low_watermark", "sysmon.vm.process_high_watermark", Conf).
2781

2782
check_cpu_watermark(Conf) ->
2783
    check_watermark("sysmon.os.cpu_low_watermark", "sysmon.os.cpu_high_watermark", Conf).
2784

2785
check_watermark(LowKey, HighKey, Conf) ->
2786
    case to_percent(hocon_maps:get(LowKey, Conf)) of
2787
        {error, undefined} ->
2788
            true;
2789
        {ok, Low} ->
2790
            case to_percent(hocon_maps:get(HighKey, Conf)) of
2791
                {ok, High} when High > Low -> true;
2792
                {ok, High} -> {bad_watermark, #{LowKey => Low, HighKey => High}};
2793
                {error, HighVal} -> {bad_watermark, #{HighKey => HighVal}}
2794
            end;
2795
        {error, Low} ->
2796
            {bad_watermark, #{LowKey => Low}}
2797
    end.
2798

2799
str(A) when is_atom(A) ->
2800
    atom_to_list(A);
2801
str(B) when is_binary(B) ->
2802
    binary_to_list(B);
2803
str(S) when is_list(S) ->
2804
    S.
2805

2806
-spec qos() -> typerefl:type().
2807
qos() ->
2808
    typerefl:alias("qos", typerefl:union([0, 1, 2])).
2809

2810
non_empty_string(<<>>) -> {error, empty_string_not_allowed};
2811
non_empty_string("") -> {error, empty_string_not_allowed};
2812
non_empty_string(S) when is_binary(S); is_list(S) -> ok;
2813
non_empty_string(_) -> {error, invalid_string}.
2814

2815
%% @doc Make schema for 'server' or 'servers' field.
2816
%% for each field, there are three passes:
2817
%% 1. converter: Normalize the value.
2818
%%               This normalized value is stored in EMQX's raw config.
2819
%% 2. validator: Validate the normalized value.
2820
%%               Besides checkin if the value can be empty or undefined
2821
%%               it also calls the 3rd pass to see if the provided
2822
%%               hosts can be successfully parsed.
2823
%% 3. parsing: Done at runtime in each module which uses this config
2824
servers_sc(Meta0, ParseOpts) ->
2825
    %% if this field has a default value
2826
    %% then it is not NOT required
2827
    %% NOTE: maps:is_key is not the solution because #{default => undefined} is legit
2828
    HasDefault = (maps:get(default, Meta0, undefined) =/= undefined),
2829
    Required = maps:get(required, Meta0, not HasDefault),
2830
    Meta = #{
2831
        required => Required,
2832
        converter => fun convert_servers/2,
2833
        validator => servers_validator(ParseOpts, Required)
2834
    },
2835
    sc(string(), maps:merge(Meta, Meta0)).
2836

2837
%% @hidden Convert a deep map to host:port pairs.
2838
%% This is due to the fact that a host:port string
2839
%% often can be parsed as a HOCON struct.
2840
%% e.g. when a string from environment variable is `host.domain.name:80'
2841
%% without escaped quotes, it's parsed as
2842
%% `#{<<"host">> => #{<<"domain">> => #{<<"name">> => 80}}}'
2843
%% and when it is a comma-separated list of host:port pairs
2844
%% like `h1.foo:80, h2.bar:81' then it is parsed as
2845
%% `#{<<"h1">> => #{<<"foo">> => 80}, <<"h2">> => #{<<"bar">> => 81}}'
2846
%% This function is to format the map back to host:port (pairs)
2847
%% This function also tries to remove spaces around commas in comma-separated,
2848
%% `host:port' list, and format string array to comma-separated.
2849
convert_servers(HoconValue, _HoconOpts) ->
2850
    convert_servers(HoconValue).
2851

2852
convert_servers(undefined) ->
2853
    %% should not format 'undefined' as string
2854
    %% not to throw exception either
2855
    %% (leave it to the 'required => true | false' check)
2856
    undefined;
2857
convert_servers(Map) when is_map(Map) ->
2858
    try
2859
        List = convert_hocon_map_host_port(Map),
2860
        iolist_to_binary(string:join(List, ","))
2861
    catch
2862
        _:_ ->
2863
            throw("bad_host_port")
2864
    end;
2865
convert_servers([H | _] = Array) when is_binary(H) orelse is_list(H) ->
2866
    %% if the old config was a string array
2867
    %% we want to make sure it's converted to a comma-separated
2868
    iolist_to_binary([[I, ","] || I <- Array]);
2869
convert_servers(Str) ->
2870
    normalize_host_port_str(Str).
2871

2872
%% remove spaces around comma (,)
2873
normalize_host_port_str(Str) ->
2874
    iolist_to_binary(re:replace(Str, "(\s)*,(\s)*", ",")).
2875

2876
%% @doc Shared validation function for both 'server' and 'servers' string.
2877
%% NOTE: Validator is called after converter.
2878
servers_validator(Opts, Required) ->
2879
    fun(Str0) ->
2880
        case str(Str0) of
2881
            "" ->
2882
                %% Empty string is not allowed even if the field is not required
2883
                %% we should remove field from config if it's empty
2884
                throw("cannot_be_empty");
2885
            "undefined" when Required ->
2886
                %% when the field is not set in config file
2887
                %% NOTE: assuming nobody is going to name their server "undefined"
2888
                throw("cannot_be_empty");
2889
            "undefined" ->
2890
                ok;
2891
            Str ->
2892
                %% it's valid as long as it can be parsed
2893
                _ = parse_servers(Str, Opts),
2894
                ok
2895
        end
2896
    end.
2897

2898
%% @doc Parse `host[:port]' endpoint to a `{Host, Port}' tuple or just `Host' string.
2899
%% `Opt' is a `map()' with below options supported:
2900
%%
2901
%% `default_port': a port number, so users are not forced to configure
2902
%%                 port number.
2903
%% `no_port': by default it's `false', when set to `true',
2904
%%            a `throw' exception is raised if the port is found.
2905
-spec parse_server(undefined | string() | binary(), server_parse_option()) ->
2906
    undefined | parsed_server().
2907
parse_server(Str, Opts) ->
2908
    case parse_servers(Str, Opts) of
2909
        undefined ->
2910
            undefined;
2911
        [L] ->
2912
            L;
2913
        [_ | _] = L ->
2914
            throw("expecting_one_host_but_got: " ++ integer_to_list(length(L)))
2915
    end.
2916

2917
%% @doc Parse comma separated `host[:port][,host[:port]]' endpoints
2918
%% into a list of `{Host, Port}' tuples or just `Host' string.
2919
-spec parse_servers(undefined | string() | binary(), server_parse_option()) ->
2920
    undefined | [parsed_server()].
2921
parse_servers(undefined, _Opts) ->
2922
    %% should not parse 'undefined' as string,
2923
    %% not to throw exception either,
2924
    %% leave it to the 'required => true | false' check
2925
    undefined;
2926
parse_servers(Str, Opts) ->
2927
    case do_parse_servers(Str, Opts) of
2928
        [] ->
2929
            %% treat empty as 'undefined'
2930
            undefined;
2931
        [_ | _] = L ->
2932
            L
2933
    end.
2934

2935
do_parse_servers([H | _] = Array, Opts) when is_binary(H) orelse is_list(H) ->
2936
    %% the old schema allowed providing a list of strings
2937
    %% e.g. ["server1:80", "server2:80"]
2938
    lists:map(
2939
        fun(HostPort) ->
2940
            do_parse_server(str(HostPort), Opts)
2941
        end,
2942
        Array
2943
    );
2944
do_parse_servers(Str, Opts) when is_binary(Str) orelse is_list(Str) ->
2945
    lists:map(
2946
        fun(HostPort) ->
2947
            do_parse_server(HostPort, Opts)
2948
        end,
2949
        split_host_port(Str)
2950
    ).
2951

2952
split_host_port(Str) ->
2953
    lists:filtermap(
2954
        fun(S) ->
2955
            case string:strip(S) of
2956
                "" -> false;
2957
                X -> {true, X}
2958
            end
2959
        end,
2960
        string:tokens(str(Str), ",")
2961
    ).
2962

2963
do_parse_server(Str, Opts) ->
2964
    DefaultPort = maps:get(default_port, Opts, undefined),
2965
    NotExpectingPort = maps:get(no_port, Opts, false),
2966
    DefaultScheme = maps:get(default_scheme, Opts, undefined),
2967
    SupportedSchemes = maps:get(supported_schemes, Opts, []),
2968
    NotExpectingScheme = (not is_list(DefaultScheme)) andalso length(SupportedSchemes) =:= 0,
2969
    case is_integer(DefaultPort) andalso NotExpectingPort of
2970
        true ->
2971
            %% either provide a default port from schema,
2972
            %% or do not allow user to set port number
2973
            error("bad_schema");
2974
        false ->
2975
            ok
2976
    end,
2977
    case is_list(DefaultScheme) andalso (not lists:member(DefaultScheme, SupportedSchemes)) of
2978
        true ->
2979
            %% inconsistent schema
2980
            error("bad_schema");
2981
        false ->
2982
            ok
2983
    end,
2984
    %% do not split with space, there should be no space allowed between host and port
2985
    Tokens = string:tokens(Str, ":"),
2986
    Context = #{
2987
        not_expecting_port => NotExpectingPort,
2988
        not_expecting_scheme => NotExpectingScheme,
2989
        default_port => DefaultPort,
2990
        default_scheme => DefaultScheme,
2991
        opts => Opts
2992
    },
2993
    check_server_parts(Tokens, Context).
2994

2995
check_server_parts([Scheme, "//" ++ Hostname, Port], Context) ->
2996
    #{
2997
        not_expecting_scheme := NotExpectingScheme,
2998
        not_expecting_port := NotExpectingPort,
2999
        opts := Opts
3000
    } = Context,
3001
    NotExpectingPort andalso throw("not_expecting_port_number"),
3002
    NotExpectingScheme andalso throw("not_expecting_scheme"),
3003
    #{
3004
        scheme => check_scheme(Scheme, Opts),
3005
        hostname => check_hostname(Hostname),
3006
        port => parse_port(Port)
3007
    };
3008
check_server_parts([Scheme, "//" ++ Hostname], Context) ->
3009
    #{
3010
        not_expecting_scheme := NotExpectingScheme,
3011
        not_expecting_port := NotExpectingPort,
3012
        default_port := DefaultPort,
3013
        opts := Opts
3014
    } = Context,
3015
    NotExpectingScheme andalso throw("not_expecting_scheme"),
3016
    case is_integer(DefaultPort) of
3017
        true ->
3018
            #{
3019
                scheme => check_scheme(Scheme, Opts),
3020
                hostname => check_hostname(Hostname),
3021
                port => DefaultPort
3022
            };
3023
        false when NotExpectingPort ->
3024
            #{
3025
                scheme => check_scheme(Scheme, Opts),
3026
                hostname => check_hostname(Hostname)
3027
            };
3028
        false ->
3029
            throw("missing_port_number")
3030
    end;
3031
check_server_parts([Hostname, Port], Context) ->
3032
    #{
3033
        not_expecting_port := NotExpectingPort,
3034
        default_scheme := DefaultScheme
3035
    } = Context,
3036
    NotExpectingPort andalso throw("not_expecting_port_number"),
3037
    case is_list(DefaultScheme) of
3038
        false ->
3039
            #{
3040
                hostname => check_hostname(Hostname),
3041
                port => parse_port(Port)
3042
            };
3043
        true ->
3044
            #{
3045
                scheme => DefaultScheme,
3046
                hostname => check_hostname(Hostname),
3047
                port => parse_port(Port)
3048
            }
3049
    end;
3050
check_server_parts([Hostname], Context) ->
3051
    #{
3052
        not_expecting_scheme := NotExpectingScheme,
3053
        not_expecting_port := NotExpectingPort,
3054
        default_port := DefaultPort,
3055
        default_scheme := DefaultScheme
3056
    } = Context,
3057
    case is_integer(DefaultPort) orelse NotExpectingPort of
3058
        true ->
3059
            ok;
3060
        false ->
3061
            throw("missing_port_number")
3062
    end,
3063
    case is_list(DefaultScheme) orelse NotExpectingScheme of
3064
        true ->
3065
            ok;
3066
        false ->
3067
            throw("missing_scheme")
3068
    end,
3069
    case {is_integer(DefaultPort), is_list(DefaultScheme)} of
3070
        {true, true} ->
3071
            #{
3072
                scheme => DefaultScheme,
3073
                hostname => check_hostname(Hostname),
3074
                port => DefaultPort
3075
            };
3076
        {true, false} ->
3077
            #{
3078
                hostname => check_hostname(Hostname),
3079
                port => DefaultPort
3080
            };
3081
        {false, true} ->
3082
            #{
3083
                scheme => DefaultScheme,
3084
                hostname => check_hostname(Hostname)
3085
            };
3086
        {false, false} ->
3087
            #{hostname => check_hostname(Hostname)}
3088
    end;
3089
check_server_parts(_Tokens, _Context) ->
3090
    throw("bad_host_port").
3091

3092
check_scheme(Str, Opts) ->
3093
    SupportedSchemes = maps:get(supported_schemes, Opts, []),
3094
    IsSupported = lists:member(Str, SupportedSchemes),
3095
    case IsSupported of
3096
        true ->
3097
            Str;
3098
        false ->
3099
            throw("unsupported_scheme")
3100
    end.
3101

3102
check_hostname(Str) ->
3103
    %% not intended to use inet_parse:domain here
3104
    %% only checking space because it interferes the parsing
3105
    case string:tokens(Str, " ") of
3106
        [H] ->
3107
            case is_port_number(H) of
3108
                true ->
3109
                    throw("expecting_hostname_but_got_a_number");
3110
                false ->
3111
                    H
3112
            end;
3113
        _ ->
3114
            throw("hostname_has_space")
3115
    end.
3116

3117
convert_hocon_map_host_port(Map) ->
3118
    lists:map(
3119
        fun({Host, Port}) ->
3120
            %% Only when Host:Port string is a valid HOCON object
3121
            %% is it possible for the converter to reach here.
3122
            %%
3123
            %% For example EMQX_FOO__SERVER='1.2.3.4:1234' is parsed as
3124
            %% a HOCON string value "1.2.3.4:1234" but not a map because
3125
            %% 1 is not a valid HOCON field.
3126
            %%
3127
            %% EMQX_FOO__SERVER="local.domain.host" (without ':port')
3128
            %% is also not a valid HOCON object (because it has no value),
3129
            %% hence parsed as string.
3130
            true = (Port > 0),
3131
            str(Host) ++ ":" ++ integer_to_list(Port)
3132
        end,
3133
        hocon_maps:flatten(Map, #{})
3134
    ).
3135

3136
is_port_number(Port) ->
3137
    try
3138
        _ = parse_port(Port),
3139
        true
3140
    catch
3141
        _:_ ->
3142
            false
3143
    end.
3144

3145
parse_port(Port) ->
3146
    case string:to_integer(string:strip(Port)) of
3147
        {P, ""} when P < 0 -> throw("port_number_must_be_positive");
3148
        {P, ""} when P > 65535 -> throw("port_number_too_large");
3149
        {P, ""} -> P;
3150
        _ -> throw("bad_port_number")
3151
    end.
3152

3153
quic_feature_toggle(Desc) ->
3154
    sc(
3155
        %% true, false are for user facing
3156
        %% 0, 1 are for internal representation
3157
        typerefl:alias("boolean", typerefl:union([true, false, 0, 1])),
3158
        #{
3159
            desc => Desc,
3160
            importance => ?IMPORTANCE_HIDDEN,
3161
            required => false,
3162
            converter => fun
3163
                (Val, #{make_serializable := true}) -> Val;
3164
                (true, _Opts) -> 1;
3165
                (false, _Opts) -> 0;
3166
                (Other, _Opts) -> Other
3167
            end
3168
        }
3169
    ).
3170

3171
quic_lowlevel_settings_uint(Low, High, Desc) ->
3172
    sc(
3173
        range(Low, High),
3174
        #{
3175
            required => false,
3176
            importance => ?IMPORTANCE_HIDDEN,
3177
            desc => Desc
3178
        }
3179
    ).
3180

3181
-spec is_quic_ssl_opts(string()) -> boolean().
3182
is_quic_ssl_opts(Name) ->
3183
    lists:member(Name, [
3184
        "cacertfile",
3185
        "certfile",
3186
        "keyfile",
3187
        "verify",
3188
        "password"
3189
        %% Followings are planned
3190
        %% , "hibernate_after"
3191
        %% , "fail_if_no_peer_cert"
3192
        %% , "handshake_timeout"
3193
        %% , "gc_after_handshake"
3194
    ]).
3195

3196
assert_required_field(Conf, Key, ErrorMessage) ->
3197
    case maps:get(Key, Conf, undefined) of
3198
        undefined ->
3199
            throw(ErrorMessage);
3200
        _ ->
3201
            ok
3202
    end.
3203

3204
default_listener(tcp) ->
3205
    #{
3206
        <<"bind">> => <<"0.0.0.0:1883">>
3207
    };
3208
default_listener(ws) ->
3209
    #{
3210
        <<"bind">> => <<"0.0.0.0:8083">>,
3211
        <<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>}
3212
    };
3213
default_listener(SSLListener) ->
3214
    %% The env variable is resolved in emqx_tls_lib by calling naive_env_interpolate
3215
    SslOptions = #{
3216
        <<"cacertfile">> => cert_file(<<"cacert.pem">>, server),
3217
        <<"certfile">> => cert_file(<<"cert.pem">>, server),
3218
        <<"keyfile">> => cert_file(<<"key.pem">>, server)
3219
    },
3220
    case SSLListener of
3221
        ssl ->
3222
            #{
3223
                <<"bind">> => <<"0.0.0.0:8883">>,
3224
                <<"ssl_options">> => SslOptions
3225
            };
3226
        wss ->
3227
            #{
3228
                <<"bind">> => <<"0.0.0.0:8084">>,
3229
                <<"ssl_options">> => SslOptions,
3230
                <<"websocket">> => #{<<"mqtt_path">> => <<"/mqtt">>}
3231
            }
3232
    end.
3233

3234
%% @doc This function helps to perform a naive string interpolation which
3235
%% only looks at the first segment of the string and tries to replace it.
3236
%% For example
3237
%%  "$MY_FILE_PATH"
3238
%%  "${MY_FILE_PATH}"
3239
%%  "$ENV_VARIABLE/sub/path"
3240
%%  "${ENV_VARIABLE}/sub/path"
3241
%%  "${ENV_VARIABLE}\sub\path" # windows
3242
%% This function returns undefined if the input is undefined
3243
%% otherwise always return string.
3244
naive_env_interpolation(undefined) ->
3245
    undefined;
3246
naive_env_interpolation(Bin) when is_binary(Bin) ->
3247
    naive_env_interpolation(unicode:characters_to_list(Bin, utf8));
3248
naive_env_interpolation("$" ++ Maybe = Original) ->
3249
    {Env, Tail} = split_path(Maybe),
3250
    case resolve_env(Env) of
3251
        {ok, Path} ->
3252
            filename:join([Path, Tail]);
3253
        error ->
3254
            ?SLOG(warning, #{
3255
                msg => "cannot_resolve_env_variable",
3256
                env => Env,
3257
                original => Original
3258
            }),
3259
            Original
3260
    end;
3261
naive_env_interpolation(Other) ->
3262
    Other.
3263

3264
split_path(Path) ->
3265
    split_path(Path, []).
3266

3267
split_path([], Acc) ->
3268
    {lists:reverse(Acc), []};
3269
split_path([Char | Rest], Acc) when Char =:= $/ orelse Char =:= $\\ ->
3270
    {lists:reverse(Acc), string:trim(Rest, leading, "/\\")};
3271
split_path([Char | Rest], Acc) ->
3272
    split_path(Rest, [Char | Acc]).
3273

3274
resolve_env(Name0) ->
3275
    Name = string:trim(Name0, both, "{}"),
3276
    Value = os:getenv(Name),
3277
    case Value =/= false andalso Value =/= "" of
3278
        true ->
3279
            {ok, Value};
3280
        false ->
3281
            special_env(Name)
3282
    end.
3283

3284
-ifdef(TEST).
3285
%% when running tests, we need to mock the env variables
3286
special_env("EMQX_ETC_DIR") ->
3287
    {ok, filename:join([code:lib_dir(emqx), etc])};
3288
special_env("EMQX_LOG_DIR") ->
3289
    {ok, "log"};
3290
special_env(_Name) ->
3291
    %% only in tests
3292
    error.
3293
-else.
3294
special_env(_Name) -> error.
3295
-endif.
3296

3297
%% The tombstone atom.
3298
tombstone() ->
3299
    ?TOMBSTONE_TYPE.
3300

3301
%% Make a map type, the value of which is allowed to be 'marked_for_deletion'
3302
%% 'marked_for_delition' is a special value which means the key is deleted.
3303
%% This is used to support the 'delete' operation in configs,
3304
%% since deleting the key would result in default value being used.
3305
tombstone_map(Name, Type) ->
3306
    %% marked_for_deletion must be the last member of the union
3307
    %% because we need to first union member to populate the default values
3308
    map(
3309
        Name,
3310
        hoconsc:union(
3311
            fun
3312
                (all_union_members) ->
3313
                    [Type, ?TOMBSTONE_TYPE];
3314
                ({value, V}) when is_map(V) ->
3315
                    [Type];
3316
                ({value, _}) ->
3317
                    [?TOMBSTONE_TYPE]
3318
            end
3319
        )
3320
    ).
3321

3322
%% inverse of mark_del_map
3323
get_tombstone_map_value_type(Schema) ->
3324
    %% TODO: violation of abstraction, expose an API in hoconsc
3325
    %% hoconsc:map_value_type(Schema)
3326
    ?MAP(_Name, Union) = hocon_schema:field_schema(Schema, type),
3327
    %% TODO: violation of abstraction, fix hoconsc:union_members/1
3328
    ?UNION(Members, _) = Union,
3329
    Tombstone = tombstone(),
3330
    [Type, Tombstone] = hoconsc:union_members(Members),
3331
    Type.
3332

3333
%% Keep the 'default' tombstone, but delete others.
3334
keep_default_tombstone(Map, _Opts) when is_map(Map) ->
3335
    maps:filter(
3336
        fun(Key, Value) ->
3337
            Key =:= <<"default">> orelse Value =/= ?TOMBSTONE_VALUE
3338
        end,
3339
        Map
3340
    );
3341
keep_default_tombstone(Value, _Opts) ->
3342
    Value.
3343

3344
ensure_default_listener(undefined, ListenerType) ->
3345
    %% let the schema's default value do its job
3346
    #{<<"default">> => default_listener(ListenerType)};
3347
ensure_default_listener(#{<<"default">> := _} = Map, _ListenerType) ->
3348
    keep_default_tombstone(Map, #{});
3349
ensure_default_listener(Map, ListenerType) ->
3350
    NewMap = Map#{<<"default">> => default_listener(ListenerType)},
3351
    keep_default_tombstone(NewMap, #{}).
3352

3353
cert_file(_File, client) -> undefined;
3354
cert_file(File, server) -> iolist_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])).
3355

3356
mqtt_converter(#{<<"keepalive_multiplier">> := Multi} = Mqtt, _Opts) ->
3357
    case round(Multi * 100) =:= round(?DEFAULT_MULTIPLIER * 100) of
3358
        false ->
3359
            %% Multiplier is provided, and it's not default value
3360
            Mqtt;
3361
        true ->
3362
            %% Multiplier is default value, fallback to use Backoff value
3363
            %% Backoff default value was half of Multiplier default value
3364
            %% so there is no need to compare Backoff with its default.
3365
            Backoff = maps:get(<<"keepalive_backoff">>, Mqtt, ?DEFAULT_BACKOFF),
3366
            Mqtt#{<<"keepalive_multiplier">> => Backoff * 2}
3367
    end;
3368
mqtt_converter(#{<<"keepalive_backoff">> := Backoff} = Mqtt, _Opts) ->
3369
    Mqtt#{<<"keepalive_multiplier">> => Backoff * 2};
3370
mqtt_converter(Mqtt, _Opts) ->
3371
    Mqtt.
3372

3373
%% For backward compatibility with window_time is disable
3374
flapping_detect_converter(Conf = #{<<"window_time">> := <<"disable">>}, _Opts) ->
3375
    Conf#{<<"window_time">> => ?DEFAULT_WINDOW_TIME, <<"enable">> => false};
3376
flapping_detect_converter(Conf, _Opts) ->
3377
    Conf.
3378

3379
mqtt_general() ->
3380
    [
3381
        {"idle_timeout",
3382
            sc(
3383
                hoconsc:union([infinity, duration()]),
3384
                #{
3385
                    default => <<"15s">>,
3386
                    desc => ?DESC(mqtt_idle_timeout)
3387
                }
3388
            )},
3389
        {"max_packet_size",
3390
            sc(
3391
                bytesize(),
3392
                #{
3393
                    default => <<"1MB">>,
3394
                    validator => fun ?MODULE:validate_packet_size/1,
3395
                    desc => ?DESC(mqtt_max_packet_size)
3396
                }
3397
            )},
3398
        {"max_clientid_len",
3399
            sc(
3400
                range(23, 65535),
3401
                #{
3402
                    default => 65535,
3403
                    desc => ?DESC(mqtt_max_clientid_len)
3404
                }
3405
            )},
3406
        {"max_topic_levels",
3407
            sc(
3408
                range(1, 65535),
3409
                #{
3410
                    default => 128,
3411
                    desc => ?DESC(mqtt_max_topic_levels)
3412
                }
3413
            )},
3414
        {"max_topic_alias",
3415
            sc(
3416
                range(0, 65535),
3417
                #{
3418
                    default => 65535,
3419
                    desc => ?DESC(mqtt_max_topic_alias)
3420
                }
3421
            )},
3422
        {"retain_available",
3423
            sc(
3424
                boolean(),
3425
                #{
3426
                    default => true,
3427
                    desc => ?DESC(mqtt_retain_available)
3428
                }
3429
            )},
3430
        {"wildcard_subscription",
3431
            sc(
3432
                boolean(),
3433
                #{
3434
                    default => true,
3435
                    desc => ?DESC(mqtt_wildcard_subscription)
3436
                }
3437
            )},
3438
        {"shared_subscription",
3439
            sc(
3440
                boolean(),
3441
                #{
3442
                    default => true,
3443
                    desc => ?DESC(mqtt_shared_subscription)
3444
                }
3445
            )},
3446
        {"shared_subscription_strategy",
3447
            sc(
3448
                hoconsc:enum([
3449
                    random,
3450
                    round_robin,
3451
                    round_robin_per_group,
3452
                    sticky,
3453
                    local,
3454
                    hash_topic,
3455
                    hash_clientid
3456
                ]),
3457
                #{
3458
                    default => round_robin,
3459
                    desc => ?DESC(mqtt_shared_subscription_strategy)
3460
                }
3461
            )},
3462
        {"exclusive_subscription",
3463
            sc(
3464
                boolean(),
3465
                #{
3466
                    default => false,
3467
                    desc => ?DESC(mqtt_exclusive_subscription)
3468
                }
3469
            )},
3470
        {"ignore_loop_deliver",
3471
            sc(
3472
                boolean(),
3473
                #{
3474
                    default => false,
3475
                    desc => ?DESC(mqtt_ignore_loop_deliver)
3476
                }
3477
            )},
3478
        {"strict_mode",
3479
            sc(
3480
                boolean(),
3481
                #{
3482
                    default => false,
3483
                    desc => ?DESC(mqtt_strict_mode)
3484
                }
3485
            )},
3486
        {"response_information",
3487
            sc(
3488
                string(),
3489
                #{
3490
                    default => <<"">>,
3491
                    desc => ?DESC(mqtt_response_information)
3492
                }
3493
            )},
3494
        {"server_keepalive",
3495
            sc(
3496
                hoconsc:union([pos_integer(), disabled]),
3497
                #{
3498
                    default => disabled,
3499
                    desc => ?DESC(mqtt_server_keepalive)
3500
                }
3501
            )},
3502
        {"keepalive_backoff",
3503
            sc(
3504
                number(),
3505
                #{
3506
                    default => ?DEFAULT_BACKOFF,
3507
                    %% Must add required => false, zone schema has no default.
3508
                    required => false,
3509
                    importance => ?IMPORTANCE_HIDDEN
3510
                }
3511
            )},
3512
        {"keepalive_multiplier",
3513
            sc(
3514
                number(),
3515
                #{
3516
                    default => ?DEFAULT_MULTIPLIER,
3517
                    validator => fun ?MODULE:validate_keepalive_multiplier/1,
3518
                    desc => ?DESC(mqtt_keepalive_multiplier)
3519
                }
3520
            )},
3521
        {"retry_interval",
3522
            sc(
3523
                duration(),
3524
                #{
3525
                    default => <<"30s">>,
3526
                    desc => ?DESC(mqtt_retry_interval)
3527
                }
3528
            )},
3529
        {"use_username_as_clientid",
3530
            sc(
3531
                boolean(),
3532
                #{
3533
                    default => false,
3534
                    desc => ?DESC(mqtt_use_username_as_clientid)
3535
                }
3536
            )},
3537
        {"peer_cert_as_username",
3538
            sc(
3539
                hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
3540
                #{
3541
                    default => disabled,
3542
                    desc => ?DESC(mqtt_peer_cert_as_username)
3543
                }
3544
            )},
3545
        {"peer_cert_as_clientid",
3546
            sc(
3547
                hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
3548
                #{
3549
                    default => disabled,
3550
                    desc => ?DESC(mqtt_peer_cert_as_clientid)
3551
                }
3552
            )},
3553
        {"client_attrs_init",
3554
            sc(
3555
                hoconsc:union([disabled, ref("client_attrs_init")]),
3556
                #{
3557
                    default => disabled,
3558
                    desc => ?DESC("client_attrs_init")
3559
                }
3560
            )}
3561
    ].
3562
%% All session's importance should be lower than general part to organize document.
3563
mqtt_session() ->
3564
    [
3565
        {"session_expiry_interval",
3566
            sc(
3567
                duration(),
3568
                #{
3569
                    default => <<"2h">>,
3570
                    desc => ?DESC(mqtt_session_expiry_interval),
3571
                    importance => ?IMPORTANCE_LOW
3572
                }
3573
            )},
3574
        {"message_expiry_interval",
3575
            sc(
3576
                hoconsc:union([duration(), infinity]),
3577
                #{
3578
                    default => infinity,
3579
                    desc => ?DESC(mqtt_message_expiry_interval),
3580
                    importance => ?IMPORTANCE_LOW
3581
                }
3582
            )},
3583
        {"max_awaiting_rel",
3584
            sc(
3585
                hoconsc:union([non_neg_integer(), infinity]),
3586
                #{
3587
                    default => 100,
3588
                    desc => ?DESC(mqtt_max_awaiting_rel),
3589
                    importance => ?IMPORTANCE_LOW
3590
                }
3591
            )},
3592
        {"max_qos_allowed",
3593
            sc(
3594
                qos(),
3595
                #{
3596
                    default => 2,
3597
                    desc => ?DESC(mqtt_max_qos_allowed),
3598
                    importance => ?IMPORTANCE_LOW
3599
                }
3600
            )},
3601
        {"mqueue_priorities",
3602
            sc(
3603
                hoconsc:union([disabled, map()]),
3604
                #{
3605
                    default => disabled,
3606
                    desc => ?DESC(mqtt_mqueue_priorities),
3607
                    importance => ?IMPORTANCE_LOW
3608
                }
3609
            )},
3610
        {"mqueue_default_priority",
3611
            sc(
3612
                hoconsc:enum([highest, lowest]),
3613
                #{
3614
                    default => lowest,
3615
                    desc => ?DESC(mqtt_mqueue_default_priority),
3616
                    importance => ?IMPORTANCE_LOW
3617
                }
3618
            )},
3619
        {"mqueue_store_qos0",
3620
            sc(
3621
                boolean(),
3622
                #{
3623
                    default => true,
3624
                    desc => ?DESC(mqtt_mqueue_store_qos0),
3625
                    importance => ?IMPORTANCE_LOW
3626
                }
3627
            )},
3628
        {"max_mqueue_len",
3629
            sc(
3630
                hoconsc:union([non_neg_integer(), infinity]),
3631
                #{
3632
                    default => 1000,
3633
                    desc => ?DESC(mqtt_max_mqueue_len),
3634
                    importance => ?IMPORTANCE_LOW
3635
                }
3636
            )},
3637
        {"max_inflight",
3638
            sc(
3639
                range(1, 65535),
3640
                #{
3641
                    default => 32,
3642
                    desc => ?DESC(mqtt_max_inflight),
3643
                    importance => ?IMPORTANCE_LOW
3644
                }
3645
            )},
3646
        {"max_subscriptions",
3647
            sc(
3648
                hoconsc:union([range(1, inf), infinity]),
3649
                #{
3650
                    default => infinity,
3651
                    desc => ?DESC(mqtt_max_subscriptions),
3652
                    importance => ?IMPORTANCE_LOW
3653
                }
3654
            )},
3655
        {"upgrade_qos",
3656
            sc(
3657
                boolean(),
3658
                #{
3659
                    default => false,
3660
                    desc => ?DESC(mqtt_upgrade_qos),
3661
                    importance => ?IMPORTANCE_LOW
3662
                }
3663
            )},
3664
        {"await_rel_timeout",
3665
            sc(
3666
                duration(),
3667
                #{
3668
                    default => <<"300s">>,
3669
                    desc => ?DESC(mqtt_await_rel_timeout),
3670
                    importance => ?IMPORTANCE_LOW
3671
                }
3672
            )}
3673
    ].
3674

3675
default_mem_check_interval() ->
3676
    case emqx_os_mon:is_os_check_supported() of
3677
        true -> <<"60s">>;
3678
        false -> disabled
3679
    end.
3680

3681
description_schema() ->
3682
    sc(
3683
        string(),
3684
        #{
3685
            default => <<"">>,
3686
            desc => ?DESC(description),
3687
            required => false,
3688
            importance => ?IMPORTANCE_LOW
3689
        }
3690
    ).
3691

3692
tags_schema() ->
3693
    sc(
3694
        hoconsc:array(binary()),
3695
        #{
3696
            desc => ?DESC(resource_tags),
3697
            required => false,
3698
            importance => ?IMPORTANCE_LOW
3699
        }
3700
    ).
3701

3702
ensure_unicode_path(undefined, _) ->
3703
    undefined;
3704
ensure_unicode_path(Path, #{make_serializable := true}) ->
3705
    %% format back to serializable string
3706
    unicode:characters_to_binary(Path, utf8);
3707
ensure_unicode_path(Path, Opts) when is_binary(Path) ->
3708
    case unicode:characters_to_list(Path, utf8) of
3709
        {R, _, _} when R =:= error orelse R =:= incomplete ->
3710
            throw({"bad_file_path_string", Path});
3711
        PathStr ->
3712
            ensure_unicode_path(PathStr, Opts)
3713
    end;
3714
ensure_unicode_path(Path, _) when is_list(Path) ->
3715
    Path;
3716
ensure_unicode_path(Path, _) ->
3717
    throw({"not_string", Path}).
3718

3719
listeners() ->
3720
    [
3721
        {"tcp",
3722
            sc(
3723
                tombstone_map(name, ref("mqtt_tcp_listener")),
3724
                #{
3725
                    desc => ?DESC(fields_listeners_tcp),
3726
                    converter => fun(X, _) ->
3727
                        ensure_default_listener(X, tcp)
3728
                    end,
3729
                    required => {false, recursively}
3730
                }
3731
            )},
3732
        {"ssl",
3733
            sc(
3734
                tombstone_map(name, ref("mqtt_ssl_listener")),
3735
                #{
3736
                    desc => ?DESC(fields_listeners_ssl),
3737
                    converter => fun(X, _) -> ensure_default_listener(X, ssl) end,
3738
                    required => {false, recursively}
3739
                }
3740
            )},
3741
        {"ws",
3742
            sc(
3743
                tombstone_map(name, ref("mqtt_ws_listener")),
3744
                #{
3745
                    desc => ?DESC(fields_listeners_ws),
3746
                    converter => fun(X, _) -> ensure_default_listener(X, ws) end,
3747
                    required => {false, recursively}
3748
                }
3749
            )},
3750
        {"wss",
3751
            sc(
3752
                tombstone_map(name, ref("mqtt_wss_listener")),
3753
                #{
3754
                    desc => ?DESC(fields_listeners_wss),
3755
                    converter => fun(X, _) -> ensure_default_listener(X, wss) end,
3756
                    required => {false, recursively}
3757
                }
3758
            )},
3759
        {"quic",
3760
            sc(
3761
                tombstone_map(name, ref("mqtt_quic_listener")),
3762
                #{
3763
                    desc => ?DESC(fields_listeners_quic),
3764
                    converter => fun keep_default_tombstone/2,
3765
                    required => {false, recursively}
3766
                }
3767
            )}
3768
    ].
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc