• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 356

15 Dec 2023 02:41PM UTC coverage: 72.414%. First build
356

Pull #183

github

web-flow
Merge 8bfe0c31c into 5cb22a8b1
Pull Request #183: feat(listener): support changing options on the fly

191 of 213 new or added lines in 10 files covered. (89.67%)

840 of 1160 relevant lines covered (72.41%)

62.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.25
/src/esockd.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd).
18

19
-include("esockd.hrl").
20

21
-export([start/0]).
22

23
%% Core API
24
-export([ open/4
25
        , open_udp/4
26
        , open_dtls/4
27
        , close/2
28
        , close/1
29
        ]).
30

31
-export([ reopen/1
32
        , reopen/2
33
        ]).
34

35
-export([ child_spec/4
36
        , udp_child_spec/4
37
        , dtls_child_spec/4
38
        ]).
39

40
%% Management API
41
-export([ listeners/0
42
        , listener/1
43
        ]).
44

45
-export([ get_stats/1
46
        , get_options/1
47
        , set_options/2
48
        , get_acceptors/1
49
        ]).
50

51
-export([ get_max_connections/1
52
        , set_max_connections/2
53
        , get_current_connections/1
54
        , get_max_conn_rate/1
55
        , set_max_conn_rate/2
56
        ]).
57

58
-export([get_shutdown_count/1]).
59

60
%% Allow, Deny API
61
-export([ get_access_rules/1
62
        , allow/2
63
        , deny/2
64
        ]).
65

66
%% Utility functions
67
-export([ merge_opts/2
68
        , parse_opt/1
69
        , ulimit/0
70
        , fixaddr/1
71
        , to_string/1
72
        , format/1
73
        ]).
74

75
-export_type([ proto/0
76
             , transport/0
77
             , udp_transport/0
78
             , socket/0
79
             , sock_fun/0
80
             , mfargs/0
81
             , option/0
82
             , listen_on/0
83
             , listener_ref/0
84
             ]).
85

86
-type(proto() :: atom()).
87
-type(transport() :: module()).
88
-type(udp_transport() :: {udp | dtls, pid(), inet:socket()}).
89
-type(socket() :: esockd_transport:socket()).
90
-type(mfargs() :: atom() | {atom(), atom()} | {module(), atom(), [term()]}).
91
-type(sock_fun() :: {function(), list()}).
92
-type(conn_limit() :: map() | {pos_integer(), pos_integer()}).
93
-type(options() :: [option()]).
94
-type(option() :: {acceptors, pos_integer()}
95
                | {max_connections, pos_integer()}
96
                | {max_conn_rate, conn_limit()}
97
                | {access_rules, [esockd_access:rule()]}
98
                | {shutdown, brutal_kill | infinity | pos_integer()}
99
                | tune_buffer | {tune_buffer, boolean()}
100
                | proxy_protocol | {proxy_protocol, boolean()}
101
                | {proxy_protocol_timeout, timeout()}
102
                | {ssl_options, ssl_options()}
103
                | {tcp_options, [gen_tcp:listen_option()]}
104
                | {udp_options, [gen_udp:option()]}
105
                | {dtls_options, dtls_options()}).
106

107
-type(host() :: inet:ip_address() | string()).
108
-type(listen_on() :: inet:port_number() | {host(), inet:port_number()}).
109
-type ssl_options() :: [{handshake_timeout, pos_integer()} | ssl_option()].
110
-type dtls_options() :: [{handshake_timeout, pos_integer()} | ssl_option()].
111
-type listener_ref() :: {proto(), listen_on()}.
112

113
%%--------------------------------------------------------------------
114
%% APIs
115
%%--------------------------------------------------------------------
116

117
%% @doc Start esockd application.
118
-spec(start() -> ok).
119
start() ->
120
    {ok, _} = application:ensure_all_started(esockd), ok.
×
121

122
%%--------------------------------------------------------------------
123
%% Open & Close
124

125
%% @doc Open a TCP or SSL listener
126
-spec(open(atom(), listen_on(), [option()], mfargs()) -> {ok, pid()} | {error, term()}).
127
open(Proto, Port, Opts, MFA) when is_atom(Proto), is_integer(Port) ->
128
        esockd_sup:start_listener(Proto, Port, Opts, MFA);
42✔
129
open(Proto, {Host, Port}, Opts, MFA) when is_atom(Proto), is_integer(Port) ->
130
    {IPAddr, _Port} = fixaddr({Host, Port}),
2✔
131
    case proplists:get_value(ip, tcp_options(Opts)) of
2✔
132
        undefined -> ok;
2✔
133
        IPAddr    -> ok;
×
134
        Other     -> error({badmatch, Other})
×
135
    end,
136
        esockd_sup:start_listener(Proto, {IPAddr, Port}, Opts, MFA).
2✔
137

138
%% @private
139
tcp_options(Opts) ->
140
    proplists:get_value(tcp_options, Opts, []).
2✔
141

142
%% @doc Open a UDP listener
143
-spec(open_udp(atom(), listen_on(), [option()], mfargs())
144
     -> {ok, pid()}
145
      | {error, term()}).
146
open_udp(Proto, Port, Opts, MFA) ->
147
    esockd_sup:start_child(udp_child_spec(Proto, Port, Opts, MFA)).
7✔
148

149
%% @doc Open a DTLS listener
150
-spec(open_dtls(atom(), listen_on(), options(), mfargs())
151
     -> {ok, pid()}
152
      | {error, term()}).
153
open_dtls(Proto, ListenOn, Opts, MFA) ->
154
    esockd_sup:start_child(dtls_child_spec(Proto, ListenOn, Opts, MFA)).
10✔
155

156
%% @doc Close the listener
157
-spec(close({atom(), listen_on()}) -> ok | {error, term()}).
158
close({Proto, ListenOn}) when is_atom(Proto) ->
159
    close(Proto, ListenOn).
×
160

161
-spec(close(atom(), listen_on()) -> ok | {error, term()}).
162
close(Proto, ListenOn) when is_atom(Proto) ->
163
        esockd_sup:stop_listener(Proto, fixaddr(ListenOn)).
61✔
164

165
%% @doc Reopen the listener
166
-spec(reopen({atom(), listen_on()}) -> {ok, pid()} | {error, term()}).
167
reopen({Proto, ListenOn}) when is_atom(Proto) ->
168
    reopen(Proto, ListenOn).
3✔
169

170
-spec(reopen(atom(), listen_on()) -> {ok, pid()} | {error, term()}).
171
reopen(Proto, ListenOn) when is_atom(Proto) ->
172
    esockd_sup:restart_listener(Proto, fixaddr(ListenOn)).
3✔
173

174
%%--------------------------------------------------------------------
175
%% Spec funcs
176

177
%% @doc Create a Child spec for a TCP/SSL Listener. It is a convenient method
178
%% for creating a Child spec to hang on another Application supervisor.
179
-spec(child_spec(atom(), listen_on(), [option()], mfargs())
180
      -> supervisor:child_spec()).
181
child_spec(Proto, ListenOn, Opts, MFA) when is_atom(Proto) ->
182
    esockd_sup:child_spec(Proto, fixaddr(ListenOn), Opts, MFA).
1✔
183

184
%% @doc Create a Child spec for a UDP Listener.
185
-spec(udp_child_spec(atom(), listen_on(), options(), mfargs())
186
     -> supervisor:child_spec()).
187
udp_child_spec(Proto, Port, Opts, MFA) ->
188
    esockd_sup:udp_child_spec(Proto, fixaddr(Port), Opts, MFA).
8✔
189

190
%% @doc Create a Child spec for a DTLS Listener.
191
-spec(dtls_child_spec(atom(), listen_on(), options(), mfargs())
192
     -> supervisor:child_spec()).
193
dtls_child_spec(Proto, ListenOn, Opts, MFA) ->
194
    esockd_sup:dtls_child_spec(Proto, fixaddr(ListenOn), Opts, MFA).
11✔
195

196
%%--------------------------------------------------------------------
197
%% Get/Set APIs
198

199
%% @doc Get listeners.
200
-spec(listeners() -> [{{atom(), listen_on()}, pid()}]).
201
listeners() -> esockd_sup:listeners().
2✔
202

203
%% @doc Get one listener.
204
-spec(listener({atom(), listen_on()}) -> pid()).
205
listener({Proto, ListenOn}) when is_atom(Proto) ->
206
    esockd_sup:listener({Proto, fixaddr(ListenOn)}).
2✔
207

208
%% @doc Get stats
209
-spec(get_stats({atom(), listen_on()}) -> [{atom(), non_neg_integer()}]).
210
get_stats({Proto, ListenOn}) when is_atom(Proto) ->
211
    esockd_server:get_stats({Proto, fixaddr(ListenOn)}).
1✔
212

213
%% @doc Get options
214
-spec(get_options({atom(), listen_on()}) -> options()).
215
get_options({Proto, ListenOn}) when is_atom(Proto) ->
216
    with_listener_ref({Proto, ListenOn}, ?FUNCTION_NAME, []).
6✔
217

218
%% @doc Set applicable options
219
%% If some options could not be set, either because they are not applicable or
220
%% because they require a listener restart, function returns an error.
221
-spec set_options({atom(), listen_on()}, options()) ->
222
    {ok, options()} | {error, _TODO}.
223
set_options({Proto, ListenOn}, Options) when is_atom(Proto) ->
224
    with_listener_ref({Proto, ListenOn}, ?FUNCTION_NAME, [Options]).
5✔
225

226
%% @doc Get acceptors number
227
-spec(get_acceptors({atom(), listen_on()}) -> pos_integer()).
228
get_acceptors({Proto, ListenOn}) ->
229
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
9✔
230

231
%% @doc Get max connections
232
-spec(get_max_connections({atom(), listen_on()} | pid()) -> pos_integer()).
233
get_max_connections({Proto, ListenOn}) when is_atom(Proto) ->
234
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
7✔
235

236
%% @doc Set max connections
237
-spec(set_max_connections({atom(), listen_on()}, pos_integer()) -> ok).
238
set_max_connections({Proto, ListenOn}, MaxConns) when is_atom(Proto) ->
239
    with_listener_ref({Proto, ListenOn}, ?FUNCTION_NAME, [MaxConns]).
3✔
240

241
%% @doc Set max connection rate
242
-spec(get_max_conn_rate({atom(), listen_on()}) -> conn_limit()).
243
get_max_conn_rate({Proto, ListenOn}) when is_atom(Proto) ->
244
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME, [Proto, ListenOn]).
9✔
245

246
%% @doc Set max connection rate
247
-spec(set_max_conn_rate({atom(), listen_on()}, conn_limit()) -> ok).
248
set_max_conn_rate({Proto, ListenOn}, Opt) when is_atom(Proto) ->
249
    with_listener_ref({Proto, ListenOn}, ?FUNCTION_NAME, [Opt]).
3✔
250

251
%% @doc Get current connections
252
-spec(get_current_connections({atom(), listen_on()}) -> non_neg_integer()).
253
get_current_connections({Proto, ListenOn}) when is_atom(Proto) ->
254
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
4✔
255

256
%% @doc Get shutdown count
257
-spec(get_shutdown_count({atom(), listen_on()}) -> pos_integer()).
258
get_shutdown_count({Proto, ListenOn}) when is_atom(Proto) ->
259
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
3✔
260

261
%% @doc Get access rules
262
-spec(get_access_rules({atom(), listen_on()}) -> [esockd_access:rule()]).
263
get_access_rules({Proto, ListenOn}) when is_atom(Proto) ->
264
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
9✔
265

266
%% @doc Allow access address
267
-spec(allow({atom(), listen_on()}, all | esockd_cidr:cidr_string()) -> ok).
268
allow({Proto, ListenOn}, CIDR) when is_atom(Proto) ->
269
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME, [CIDR]).
3✔
270

271
%% @doc Deny access address
272
-spec(deny({atom(), listen_on()}, all | esockd_cidr:cidr_string()) -> ok).
273
deny({Proto, ListenOn}, CIDR) when is_atom(Proto) ->
274
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME, [CIDR]).
3✔
275

276
%%--------------------------------------------------------------------
277
%% Utils
278

279
%% @doc Merge two options
%% The two lists are concatenated and then squashed so that each option name
%% appears once, at the position of its first occurrence:
%%   * for `{Name, Value}' entries the later value overrides the earlier one,
%%     except for `ssl_options', `tcp_options', `udp_options' and
%%     `dtls_options', whose values are themselves merged recursively;
%%   * an option whose final value is `undefined' is removed from the result;
%%   * a bare atom is kept once and later duplicates of it are dropped (a
%%     later `{Name, Value}' tuple does not replace an earlier bare atom);
%%   * any other entry (e.g. a `{raw, Protocol, OptionNum, Value}' socket
%%     option) is passed through untouched.
-spec(merge_opts(proplists:proplist(), proplists:proplist())
      -> proplists:proplist()).
merge_opts(Opts1, Opts2) ->
    squash_opts(Opts1 ++ Opts2).

%% Collapse repeated occurrences of each option, walking left to right.
squash_opts([{Name, Value} | Rest]) ->
    %% Fold every later value of Name over the first one, in list order,
    %% so that the right-most occurrence has the final say.
    Overrides = proplists:get_all_values(Name, Rest),
    Merged = lists:foldl(fun(O, V) -> merge_opt(Name, V, O) end, Value, Overrides),
    make_opt(Name, Merged) ++ squash_opts(proplists:delete(Name, Rest));
squash_opts([Name | Rest]) when is_atom(Name) ->
    [Name | squash_opts([Opt || Opt <- Rest, Opt =/= Name])];
squash_opts([Opt | Rest]) ->
    %% Entries that are neither 2-tuples nor atoms cannot be merged by name;
    %% keep them as-is instead of failing with function_clause.
    [Opt | squash_opts(Rest)];
squash_opts([]) ->
    [].

%% An `undefined' value means "unset": emit nothing for that option.
make_opt(_Name, undefined) -> [];
make_opt(Name, Value) -> [{Name, Value}].

%% Nested option lists are merged recursively; everything else is replaced
%% by the newer value.
merge_opt(ssl_options, Opts1, Opts2) -> merge_opts(Opts1, Opts2);
merge_opt(tcp_options, Opts1, Opts2) -> merge_opts(Opts1, Opts2);
merge_opt(udp_options, Opts1, Opts2) -> merge_opts(Opts1, Opts2);
merge_opt(dtls_options, Opts1, Opts2) -> merge_opts(Opts1, Opts2);
merge_opt(_, _Opt1, Opt2) -> Opt2.
38✔
302

303
%% @doc Parse option.
%% Normalises a listener option list. Only the options matched below are
%% kept, and only when their value passes the guard of the matching clause;
%% every other entry is silently dropped. The bare flags `tune_buffer' and
%% `proxy_protocol' are expanded to `{Flag, true}', and a scalar
%% `max_conn_rate' is expanded to `{Limit, 1}'. Kept options retain their
%% relative order.
parse_opt(Options) ->
    parse_opt(Options, []).

parse_opt([], Acc) ->
    lists:reverse(Acc);
parse_opt([{acceptors, I}|Opts], Acc) when is_integer(I) ->
    parse_opt(Opts, [{acceptors, I}|Acc]);
parse_opt([{max_connections, I}|Opts], Acc) when is_integer(I) ->
    parse_opt(Opts, [{max_connections, I}|Acc]);
%% The is_number/1 checks are required: in Erlang term order every tuple and
%% map compares greater than any number, so a bare `Limit > 0' would also
%% match `{Limit, Period}' and wrap it as `{{Limit, Period}, 1}'.
%% NOTE(review): a map-valued max_conn_rate (allowed by conn_limit()) now
%% falls through to the catch-all clause and is dropped; previously it was
%% wrapped as `{Map, 1}'. Confirm whether maps should be passed through.
parse_opt([{max_conn_rate, Limit}|Opts], Acc)
  when is_number(Limit), Limit > 0 ->
    parse_opt(Opts, [{max_conn_rate, {Limit, 1}}|Acc]);
parse_opt([{max_conn_rate, {Limit, Period}}|Opts], Acc)
  when is_number(Limit), Limit > 0, is_number(Period), Period > 0 ->
    parse_opt(Opts, [{max_conn_rate, {Limit, Period}}|Acc]);
parse_opt([{access_rules, Rules}|Opts], Acc) ->
    parse_opt(Opts, [{access_rules, Rules}|Acc]);
parse_opt([{shutdown, I}|Opts], Acc) when I == brutal_kill; I == infinity; is_integer(I) ->
    parse_opt(Opts, [{shutdown, I}|Acc]);
parse_opt([tune_buffer|Opts], Acc) ->
    parse_opt(Opts, [{tune_buffer, true}|Acc]);
parse_opt([{tune_buffer, I}|Opts], Acc) when is_boolean(I) ->
    parse_opt(Opts, [{tune_buffer, I}|Acc]);
parse_opt([proxy_protocol|Opts], Acc) ->
    parse_opt(Opts, [{proxy_protocol, true}|Acc]);
parse_opt([{proxy_protocol, I}|Opts], Acc) when is_boolean(I) ->
    parse_opt(Opts, [{proxy_protocol, I}|Acc]);
parse_opt([{proxy_protocol_timeout, Timeout}|Opts], Acc) when is_integer(Timeout) ->
    parse_opt(Opts, [{proxy_protocol_timeout, Timeout}|Acc]);
parse_opt([{ssl_options, L}|Opts], Acc) when is_list(L) ->
    parse_opt(Opts, [{ssl_options, L}|Acc]);
parse_opt([{tcp_options, L}|Opts], Acc) when is_list(L) ->
    parse_opt(Opts, [{tcp_options, L}|Acc]);
parse_opt([{udp_options, L}|Opts], Acc) when is_list(L) ->
    parse_opt(Opts, [{udp_options, L}|Acc]);
parse_opt([{dtls_options, L}|Opts], Acc) when is_list(L) ->
    parse_opt(Opts, [{dtls_options, L}|Acc]);
%% Anything unrecognised or ill-typed is skipped.
parse_opt([_|Opts], Acc) ->
    parse_opt(Opts, Acc).
2✔
340

341
%% @doc System 'ulimit -n'
342
-spec(ulimit() -> pos_integer()).
343
ulimit() ->
344
    proplists:get_value(max_fds, hd(erlang:system_info(check_io))).
1✔
345

346
-spec(to_string(listen_on()) -> string()).
347
to_string(Port) when is_integer(Port) ->
348
    integer_to_list(Port);
1✔
349
to_string({Addr, Port}) ->
350
    format(fixaddr({Addr, Port})).
3✔
351

352
%% @doc Parse Address
353
fixaddr(Port) when is_integer(Port) ->
354
    Port;
84✔
355
fixaddr({Addr, Port}) when is_list(Addr), is_integer(Port) ->
356
    {ok, IPAddr} = inet:parse_address(Addr), {IPAddr, Port};
8✔
357
fixaddr({Addr, Port}) when is_tuple(Addr), is_integer(Port) ->
358
    case esockd_cidr:is_ipv6(Addr) or esockd_cidr:is_ipv4(Addr) of
3✔
359
        true  -> {Addr, Port};
3✔
360
        false -> error({invalid_ipaddr, Addr})
×
361
    end.
362

363
-spec(format({inet:ip_address(), inet:port_number()}) -> string()).
364
format({Addr, Port}) ->
365
    inet:ntoa(Addr) ++ ":" ++ integer_to_list(Port).
64✔
366

367
%%--------------------------------------------------------------------
368
%% Internal funcs
369
%%--------------------------------------------------------------------
370

371
with_listener({Proto, ListenOn}, Fun) ->
372
    with_listener({Proto, ListenOn}, Fun, []).
32✔
373

374
with_listener({Proto, ListenOn}, Fun, Args) ->
375
    case esockd_sup:listener_and_module({Proto, ListenOn}) of
47✔
376
        undefined ->
377
            error(not_found);
3✔
378
        {LSup, Mod} ->
379
            erlang:apply(Mod, Fun, [LSup | Args])
44✔
380
    end.
381

382
with_listener_ref(ListenerRef = {Proto, ListenOn}, Fun, Args) ->
383
    case esockd_sup:listener_and_module({Proto, ListenOn}) of
17✔
384
        undefined ->
385
            error(not_found);
3✔
386
        {LSup, Mod} ->
387
            erlang:apply(Mod, Fun, [ListenerRef, LSup | Args])
14✔
388
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc