• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8628139215

10 Apr 2024 08:18AM UTC coverage: 62.44% (-0.05%) from 62.489%
8628139215

push

github

web-flow
Merge pull request #12851 from zmstone/0327-feat-add-emqx_variform

emqx_variform for string substitution and transform

206 of 238 new or added lines in 3 files covered. (86.55%)

28 existing lines in 16 files now uncovered.

34895 of 55886 relevant lines covered (62.44%)

6585.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.17
/apps/emqx_gateway/src/emqx_gateway_schema.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_gateway_schema).
18

19
-behaviour(hocon_schema).
20

21
-dialyzer(no_return).
22
-dialyzer(no_match).
23
-dialyzer(no_contracts).
24
-dialyzer(no_unused).
25
-dialyzer(no_fail_call).
26

27
-include_lib("hocon/include/hoconsc.hrl").
28
-include_lib("typerefl/include/types.hrl").
29
-include_lib("emqx_auth/include/emqx_authn_chains.hrl").
30

31
-type ip_port() :: tuple() | integer().
32
-type duration() :: non_neg_integer().
33
-type duration_s() :: non_neg_integer().
34
-type bytesize() :: pos_integer().
35

36
-typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}).
37
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
38
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
39
-typerefl_from_string({bytesize/0, emqx_schema, to_bytesize}).
40

41
-reflect_type([
42
    duration/0,
43
    duration_s/0,
44
    bytesize/0,
45
    ip_port/0
46
]).
47
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
48
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
49

50
-export([namespace/0, roots/0, fields/1, desc/1, tags/0]).
51

52
-export([proxy_protocol_opts/0]).
53

54
-export([mountpoint/0, mountpoint/1, gateway_common_options/0, gateway_schema/1, gateway_names/0]).
55

56
-export([ws_listener/0, wss_listener/0, ws_opts/2]).
57

58
namespace() -> gateway.
1,875✔
59

60
tags() ->
61
    [<<"Gateway">>].
952✔
62

63
roots() ->
64
    [{gateway, sc(ref(?MODULE, gateway), #{importance => ?IMPORTANCE_LOW})}].
20,203✔
65

66
fields(gateway) ->
67
    lists:map(
654✔
68
        fun(#{name := Name, config_schema_module := Mod}) ->
69
            {Name,
5,232✔
70
                sc(
71
                    ref(Mod, Name),
72
                    #{
73
                        required => {false, recursively},
74
                        desc => ?DESC(Name)
75
                    }
76
                )}
77
        end,
78
        emqx_gateway_utils:find_gateway_definitions()
79
    );
80
fields(clientinfo_override) ->
81
    [
507✔
82
        {username, sc(binary(), #{desc => ?DESC(gateway_common_clientinfo_override_username)})},
83
        {password,
84
            sc(binary(), #{
85
                desc => ?DESC(gateway_common_clientinfo_override_password),
86
                sensitive => true,
87
                format => <<"password">>,
88
                converter => fun emqx_schema:password_converter/2
89
            })},
90
        {clientid, sc(binary(), #{desc => ?DESC(gateway_common_clientinfo_override_clientid)})}
91
    ];
92
fields(udp_listeners) ->
93
    [
×
94
        {udp, sc(map(name, ref(udp_listener)), #{desc => ?DESC(listener_name_to_settings_map)})},
95
        {dtls, sc(map(name, ref(dtls_listener)), #{desc => ?DESC(listener_name_to_settings_map)})}
96
    ];
97
fields(tcp_listeners) ->
98
    [
96✔
99
        {tcp, sc(map(name, ref(tcp_listener)), #{desc => ?DESC(listener_name_to_settings_map)})},
100
        {ssl, sc(map(name, ref(ssl_listener)), #{desc => ?DESC(listener_name_to_settings_map)})}
101
    ];
102
fields(tcp_udp_listeners) ->
103
    [
×
104
        {tcp, sc(map(name, ref(tcp_listener)), #{desc => ?DESC(listener_name_to_settings_map)})},
105
        {ssl, sc(map(name, ref(ssl_listener)), #{desc => ?DESC(listener_name_to_settings_map)})},
106
        {udp, sc(map(name, ref(udp_listener)), #{desc => ?DESC(listener_name_to_settings_map)})},
107
        {dtls, sc(map(name, ref(dtls_listener)), #{desc => ?DESC(listener_name_to_settings_map)})}
108
    ];
109
fields(tcp_listener) ->
110
    %% some special configs for tcp listener
111
    [
112
        {acceptors, sc(integer(), #{default => 16, desc => ?DESC(tcp_listener_acceptors)})}
113
    ] ++
393✔
114
        tcp_opts() ++
115
        proxy_protocol_opts() ++
116
        common_listener_opts();
117
fields(ssl_listener) ->
118
    fields(tcp_listener) ++
150✔
119
        [
120
            {ssl_options,
121
                sc(
122
                    hoconsc:ref(emqx_schema, "listener_ssl_opts"),
123
                    #{
124
                        desc => ?DESC(ssl_listener_options),
125
                        validator => fun emqx_schema:validate_server_ssl_opts/1
126
                    }
127
                )}
128
        ];
129
fields(ws_listener) ->
130
    emqx_gateway_schema:ws_listener() ++
50✔
131
        [{websocket, sc(ref(websocket), #{})}];
132
fields(wss_listener) ->
133
    emqx_gateway_schema:wss_listener() ++
49✔
134
        [{websocket, sc(ref(websocket), #{})}];
135
fields(websocket) ->
136
    DefaultPath = <<>>,
97✔
137
    SubProtocols = <<>>,
97✔
138
    emqx_gateway_schema:ws_opts(DefaultPath, SubProtocols);
97✔
139
fields(udp_listener) ->
140
    [
141
        %% some special configs for udp listener
142
    ] ++
299✔
143
        udp_opts() ++
144
        common_listener_opts();
145
fields(dtls_listener) ->
146
    [{acceptors, sc(integer(), #{default => 16, desc => ?DESC(dtls_listener_acceptors)})}] ++
150✔
147
        fields(udp_listener) ++
148
        [
149
            {dtls_options,
150
                sc(ref(dtls_opts), #{
151
                    desc => ?DESC(dtls_listener_dtls_opts),
152
                    validator => fun emqx_schema:validate_server_ssl_opts/1
153
                })}
154
        ];
155
fields(udp_opts) ->
156
    [
290✔
157
        {active_n,
158
            sc(
159
                integer(),
160
                #{
161
                    default => 100,
162
                    desc => ?DESC(udp_listener_active_n)
163
                }
164
            )},
165
        {recbuf, sc(bytesize(), #{desc => ?DESC(udp_listener_recbuf)})},
166
        {sndbuf, sc(bytesize(), #{desc => ?DESC(udp_listener_sndbuf)})},
167
        {buffer, sc(bytesize(), #{desc => ?DESC(udp_listener_buffer)})},
168
        {reuseaddr, sc(boolean(), #{default => true, desc => ?DESC(udp_listener_reuseaddr)})}
169
    ];
170
fields(dtls_opts) ->
171
    emqx_schema:server_ssl_opts_schema(
145✔
172
        #{
173
            depth => 10,
174
            reuse_sessions => true,
175
            versions => dtls_all_available
176
        },
177
        _IsRanchListener = false
178
    ).
179

180
desc(gateway) ->
181
    "EMQX Gateway configuration root.";
654✔
182
desc(clientinfo_override) ->
183
    "ClientInfo override.";
107✔
184
desc(udp_listeners) ->
185
    "Settings for the UDP listeners.";
×
186
desc(tcp_listeners) ->
187
    "Settings for the TCP listeners.";
96✔
188
desc(tcp_udp_listeners) ->
189
    "Settings for TCP and UDP listeners.";
×
190
desc(tcp_listener) ->
191
    "Settings for TCP listener.";
94✔
192
desc(ssl_listener) ->
193
    "Settings for SSL listener.";
×
194
desc(udp_listener) ->
195
    "Settings for UDP listener.";
×
196
desc(dtls_listener) ->
197
    "Settings for DTLS listener.";
×
198
desc(udp_opts) ->
199
    "Settings for UDP sockets.";
×
200
desc(dtls_opts) ->
201
    "Settings for DTLS protocol.";
×
202
desc(websocket) ->
203
    "Websocket options";
1✔
204
desc(_) ->
205
    undefined.
×
206

207
authentication_schema() ->
208
    sc(
6,434✔
209
        emqx_authn_schema:authenticator_type(),
210
        #{
211
            required => {false, recursively},
212
            desc => ?DESC(gateway_common_authentication),
213
            %% we do not expose this to the user for now
214
            importance => ?IMPORTANCE_HIDDEN,
215
            examples => emqx_authn_api:authenticator_examples()
216
        }
217
    ).
218

219
gateway_common_options() ->
220
    [
5,632✔
221
        {enable,
222
            sc(
223
                boolean(),
224
                #{
225
                    default => true,
226
                    desc => ?DESC(gateway_common_enable)
227
                }
228
            )},
229
        {enable_stats,
230
            sc(
231
                boolean(),
232
                #{
233
                    default => true,
234
                    desc => ?DESC(gateway_common_enable_stats)
235
                }
236
            )},
237
        {idle_timeout,
238
            sc(
239
                duration(),
240
                #{
241
                    default => <<"30s">>,
242
                    desc => ?DESC(gateway_common_idle_timeout)
243
                }
244
            )},
245
        {clientinfo_override,
246
            sc(
247
                ref(clientinfo_override),
248
                #{desc => ?DESC(gateway_common_clientinfo_override)}
249
            )},
250
        {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM, authentication_schema()}
251
    ].
252

253
mountpoint() ->
254
    mountpoint(<<"">>).
2,816✔
255
mountpoint(Default) ->
256
    sc(
5,632✔
257
        binary(),
258
        #{
259
            default => iolist_to_binary(Default),
260
            desc => ?DESC(gateway_mountpoint)
261
        }
262
    ).
263

264
ws_listener() ->
265
    [
266
        {acceptors, sc(integer(), #{default => 16, desc => ?DESC(tcp_listener_acceptors)})}
267
    ] ++
110✔
268
        tcp_opts() ++
269
        proxy_protocol_opts() ++
270
        common_listener_opts().
271

272
wss_listener() ->
273
    ws_listener() ++
49✔
274
        [
275
            {ssl_options,
276
                sc(
277
                    hoconsc:ref(emqx_schema, "listener_wss_opts"),
278
                    #{
279
                        desc => ?DESC(ssl_listener_options),
280
                        validator => fun emqx_schema:validate_server_ssl_opts/1
281
                    }
282
                )}
283
        ].
284

285
ws_opts(DefaultPath, DefaultSubProtocols) when
286
    is_binary(DefaultPath), is_binary(DefaultSubProtocols)
287
->
288
    [
108✔
289
        {"path",
290
            sc(
291
                string(),
292
                #{
293
                    default => DefaultPath,
294
                    desc => ?DESC(fields_ws_opts_path)
295
                }
296
            )},
297
        {"piggyback",
298
            sc(
299
                hoconsc:enum([single, multiple]),
300
                #{
301
                    default => single,
302
                    desc => ?DESC(fields_ws_opts_piggyback)
303
                }
304
            )},
305
        {"compress",
306
            sc(
307
                boolean(),
308
                #{
309
                    default => false,
310
                    desc => ?DESC(fields_ws_opts_compress)
311
                }
312
            )},
313
        {"idle_timeout",
314
            sc(
315
                duration(),
316
                #{
317
                    default => <<"7200s">>,
318
                    desc => ?DESC(fields_ws_opts_idle_timeout)
319
                }
320
            )},
321
        {"max_frame_size",
322
            sc(
323
                hoconsc:union([infinity, integer()]),
324
                #{
325
                    default => infinity,
326
                    desc => ?DESC(fields_ws_opts_max_frame_size)
327
                }
328
            )},
329
        {"fail_if_no_subprotocol",
UNCOV
330
            sc(
×
331
                boolean(),
332
                #{
333
                    default => true,
334
                    desc => ?DESC(fields_ws_opts_fail_if_no_subprotocol)
335
                }
336
            )},
337
        {"supported_subprotocols",
338
            sc(
339
                emqx_schema:comma_separated_list(),
340
                #{
341
                    default => DefaultSubProtocols,
342
                    desc => ?DESC(fields_ws_opts_supported_subprotocols)
343
                }
344
            )},
345
        {"check_origin_enable",
346
            sc(
347
                boolean(),
348
                #{
349
                    default => false,
350
                    desc => ?DESC(fields_ws_opts_check_origin_enable)
351
                }
352
            )},
353
        {"allow_origin_absence",
354
            sc(
355
                boolean(),
356
                #{
357
                    default => true,
358
                    desc => ?DESC(fields_ws_opts_allow_origin_absence)
359
                }
360
            )},
361
        {"check_origins",
362
            sc(
363
                emqx_schema:comma_separated_binary(),
364
                #{
365
                    default => <<"http://localhost:18083, http://127.0.0.1:18083">>,
366
                    desc => ?DESC(fields_ws_opts_check_origins)
367
                }
368
            )},
369
        {"proxy_address_header",
370
            sc(
371
                string(),
372
                #{
373
                    default => <<"x-forwarded-for">>,
374
                    desc => ?DESC(fields_ws_opts_proxy_address_header)
375
                }
376
            )},
377
        {"proxy_port_header",
378
            sc(
379
                string(),
380
                #{
381
                    default => <<"x-forwarded-port">>,
382
                    desc => ?DESC(fields_ws_opts_proxy_port_header)
383
                }
384
            )},
385
        {"deflate_opts",
386
            sc(
387
                ref(emqx_schema, "deflate_opts"),
388
                #{}
389
            )}
390
    ].
391

392
common_listener_opts() ->
393
    [
394
        {enable,
395
            sc(
396
                boolean(),
397
                #{
398
                    default => true,
399
                    desc => ?DESC(gateway_common_listener_enable)
400
                }
401
            )},
402
        {bind,
403
            sc(
404
                ip_port(),
405
                #{desc => ?DESC(gateway_common_listener_bind)}
406
            )},
407
        {max_connections,
408
            sc(
409
                hoconsc:union([pos_integer(), infinity]),
410
                #{
411
                    default => 1024,
412
                    desc => ?DESC(gateway_common_listener_max_connections)
413
                }
414
            )},
415
        {max_conn_rate,
416
            sc(
417
                integer(),
418
                #{
419
                    default => 1000,
420
                    desc => ?DESC(gateway_common_listener_max_conn_rate)
421
                }
422
            )},
423
        {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM, authentication_schema()},
424
        {"enable_authn",
425
            sc(
426
                boolean(),
427
                #{
428
                    desc => ?DESC(gateway_common_listener_enable_authn),
429
                    default => true
430
                }
431
            )},
432
        {mountpoint,
433
            sc(
434
                binary(),
435
                #{
436
                    default => undefined,
437
                    desc => ?DESC(gateway_mountpoint)
438
                }
439
            )},
440
        {access_rules,
441
            sc(
442
                hoconsc:array(string()),
443
                #{
444
                    default => [],
445
                    desc => ?DESC(gateway_common_listener_access_rules)
446
                }
447
            )}
448
    ].
449

450
tcp_opts() ->
451
    [{tcp_options, sc(ref(emqx_schema, "tcp_opts"), #{desc => ?DESC(tcp_listener_tcp_opts)})}].
452

453
udp_opts() ->
454
    [{udp_options, sc(ref(udp_opts), #{})}].
455

456
proxy_protocol_opts() ->
457
    [
458
        {proxy_protocol,
459
            sc(
460
                boolean(),
461
                #{
462
                    default => false,
463
                    desc => ?DESC(tcp_listener_proxy_protocol)
464
                }
465
            )},
466
        {proxy_protocol_timeout,
467
            sc(
468
                duration(),
469
                #{
470
                    default => <<"3s">>,
471
                    desc => ?DESC(tcp_listener_proxy_protocol_timeout)
472
                }
473
            )}
474
    ].
475

476
%%--------------------------------------------------------------------
477
%% dynamic schemas
478

479
gateway_schema(Name) ->
480
    case emqx_gateway_utils:find_gateway_definition(Name) of
481
        {ok, #{config_schema_module := SchemaMod}} ->
482
            SchemaMod:fields(Name);
483
        {error, _} = Error ->
484
            throw(Error)
485
    end.
486

487
gateway_names() ->
488
    Definations = emqx_gateway_utils:find_gateway_definitions(),
489
    [
490
        Name
491
     || #{name := Name} = Defination <- Definations,
492
        emqx_gateway_utils:check_gateway_edition(Defination)
493
    ].
494
%%--------------------------------------------------------------------
495
%% helpers
496

497
sc(Type, Meta) ->
498
    hoconsc:mk(Type, Meta).
499

500
map(Name, Type) ->
501
    hoconsc:map(Name, Type).
502

503
ref(StructName) ->
504
    ref(?MODULE, StructName).
505

506
ref(Mod, Field) ->
507
    hoconsc:ref(Mod, Field).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc