• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 361

19 Dec 2023 09:18AM UTC coverage: 73.537%. First build
361

Pull #184

github

web-flow
Merge 55a256686 into 36a0b25ec
Pull Request #184: fix(setopts): pass only changed socket options to `setops/2`

53 of 54 new or added lines in 6 files covered. (98.15%)

867 of 1179 relevant lines covered (73.54%)

62.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.0
/src/esockd.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd).
18

19
-include("esockd.hrl").
20

21
-export([start/0]).
22

23
%% Core API
24
-export([ open/3
25
        , open_udp/3
26
        , open_dtls/3
27
        , close/2
28
        , close/1
29
        %% Legacy API
30
        , open/4
31
        , open_udp/4
32
        , open_dtls/4
33
        ]).
34

35
-export([ reopen/1
36
        , reopen/2
37
        ]).
38

39
-export([ child_spec/3
40
        , udp_child_spec/3
41
        , dtls_child_spec/3
42
        %% Legacy API
43
        , child_spec/4
44
        , udp_child_spec/4
45
        , dtls_child_spec/4
46
        ]).
47

48
%% Management API
49
-export([ listeners/0
50
        , listener/1
51
        ]).
52

53
-export([ get_stats/1
54
        , get_options/1
55
        , set_options/2
56
        , get_acceptors/1
57
        ]).
58

59
-export([ get_max_connections/1
60
        , set_max_connections/2
61
        , get_current_connections/1
62
        , get_max_conn_rate/1
63
        , set_max_conn_rate/2
64
        ]).
65

66
-export([get_shutdown_count/1]).
67

68
%% Allow, Deny API
69
-export([ get_access_rules/1
70
        , allow/2
71
        , deny/2
72
        ]).
73

74
%% Utility functions
75
-export([ merge_opts/2
76
        , changed_opts/2
77
        , parse_opt/1
78
        , start_mfargs/3
79
        , ulimit/0
80
        , fixaddr/1
81
        , to_string/1
82
        , format/1
83
        ]).
84

85
-export_type([ proto/0
86
             , transport/0
87
             , udp_transport/0
88
             , socket/0
89
             , sock_fun/0
90
             , mfargs/0
91
             , option/0
92
             , listen_on/0
93
             , listener_ref/0
94
             ]).
95

96
-type(proto() :: atom()).
97
-type(transport() :: module()).
98
-type(udp_transport() :: {udp | dtls, pid(), inet:socket()}).
99
-type(socket() :: esockd_transport:socket()).
100
-type(mfargs() :: atom() | {atom(), atom()} | {module(), atom(), [term()]}).
101
-type(sock_fun() :: {function(), list()}).
102
-type(conn_limit() :: map() | {pos_integer(), pos_integer()}).
103
-type(options() :: [option()]).
104
-type(option() :: {acceptors, pos_integer()}
105
                | {max_connections, pos_integer()}
106
                | {max_conn_rate, conn_limit()}
107
                | {connection_mfargs, mfargs()}
108
                | {access_rules, [esockd_access:rule()]}
109
                | {shutdown, brutal_kill | infinity | pos_integer()}
110
                | tune_buffer | {tune_buffer, boolean()}
111
                | proxy_protocol | {proxy_protocol, boolean()}
112
                | {proxy_protocol_timeout, timeout()}
113
                | {ssl_options, ssl_options()}
114
                | {tcp_options, [gen_tcp:listen_option()]}
115
                | {udp_options, [gen_udp:option()]}
116
                | {dtls_options, dtls_options()}).
117

118
-type(host() :: inet:ip_address() | string()).
119
-type(listen_on() :: inet:port_number() | {host(), inet:port_number()}).
120
-type ssl_options() :: [ssl_custom_option() | ssl_option()].
121
-type dtls_options() :: [ssl_custom_option() | ssl_option()].
122
-type ssl_custom_option() :: {handshake_timeout, pos_integer()}
123
                           | {gc_after_handshake, boolean()}.
124
-type listener_ref() :: {proto(), listen_on()}.
125

126
%%--------------------------------------------------------------------
127
%% APIs
128
%%--------------------------------------------------------------------
129

130
%% @doc Start the esockd application together with the applications it
%% depends on. Crashes with a badmatch if they cannot all be started.
-spec(start() -> ok).
start() ->
    {ok, _Started} = application:ensure_all_started(esockd),
    ok.
×
134

135
%%--------------------------------------------------------------------
136
%% Open & Close
137

138
%% @doc Open a TCP or SSL listener.
%% `ListenOn' is either a bare port or a `{Host, Port}' pair. For the pair
%% form the host is normalised with fixaddr/1, and an `ip' entry found in the
%% `tcp_options' must be exactly that address; any other value fails with
%% `error({badmatch, Ip})'.
-spec open(atom(), listen_on(), options()) -> {ok, pid()} | {error, term()}.
open(Proto, Port, Opts) when is_atom(Proto), is_integer(Port) ->
    Spec = child_spec(Proto, Port, Opts),
    esockd_sup:start_child(Spec);
open(Proto, {Host, Port}, Opts) when is_atom(Proto), is_integer(Port) ->
    {IPAddr, _Port} = fixaddr({Host, Port}),
    ConfiguredIp = proplists:get_value(ip, tcp_options(Opts)),
    %% The listen address and an explicit `ip' socket option must not disagree.
    if
        ConfiguredIp =:= undefined -> ok;
        ConfiguredIp =:= IPAddr    -> ok;
        true                       -> error({badmatch, ConfiguredIp})
    end,
    Spec = child_spec(Proto, {IPAddr, Port}, Opts),
    esockd_sup:start_child(Spec).
2✔
150

151
%% @private
%% Fetch the `tcp_options' entry of a listener option list, `[]' when absent.
tcp_options(Opts) ->
    Default = [],
    proplists:get_value(tcp_options, Opts, Default).
2✔
154

155
%% @doc Open a TCP or SSL listener (legacy 4-arity form).
%% `MFA' is stored as the `connection_mfargs' option, replacing any such
%% entry already in `Opts', and the call is forwarded to open/3.
-spec open(atom(), listen_on(), [option()], mfargs()) -> {ok, pid()} | {error, term()}.
open(Proto, Port, Opts, MFA) ->
    OptsWithMFA = merge_mfargs(Opts, MFA),
    open(Proto, Port, OptsWithMFA).
24✔
159

160
%% @doc Open a UDP listener.
%% Builds the child spec with udp_child_spec/3 and starts it under esockd_sup.
-spec open_udp(atom(), listen_on(), [option()])
      -> {ok, pid()} | {error, term()}.
open_udp(Proto, Port, Opts) ->
    Spec = udp_child_spec(Proto, Port, Opts),
    esockd_sup:start_child(Spec).
7✔
165

166
%% @doc Open a UDP listener (legacy 4-arity form).
%% `MFA' is stored as the `connection_mfargs' option, replacing any such
%% entry already in `Opts', and the call is forwarded to open_udp/3.
-spec open_udp(atom(), listen_on(), [option()], mfargs())
      -> {ok, pid()} | {error, term()}.
open_udp(Proto, Port, Opts, MFA) ->
    OptsWithMFA = merge_mfargs(Opts, MFA),
    open_udp(Proto, Port, OptsWithMFA).
×
171

172
%% @doc Open a DTLS listener.
%% Builds the child spec with dtls_child_spec/3 and starts it under esockd_sup.
-spec open_dtls(atom(), listen_on(), options())
      -> {ok, pid()} | {error, term()}.
open_dtls(Proto, ListenOn, Opts) ->
    Spec = dtls_child_spec(Proto, ListenOn, Opts),
    esockd_sup:start_child(Spec).
10✔
177

178
%% @doc Open a DTLS listener (legacy 4-arity form).
%% `MFA' is stored as the `connection_mfargs' option, replacing any such
%% entry already in `Opts', and the call is forwarded to open_dtls/3.
-spec(open_dtls(atom(), listen_on(), options(), mfargs())
     -> {ok, pid()}
      | {error, term()}).
open_dtls(Proto, ListenOn, Opts, MFA) ->
    OptsWithMFA = merge_mfargs(Opts, MFA),
    open_dtls(Proto, ListenOn, OptsWithMFA).
1✔
184

185
%% @doc Close the listener identified by a `{Proto, ListenOn}' pair.
-spec(close({atom(), listen_on()}) -> ok | {error, term()}).
close({Proto, ListenOn}) when is_atom(Proto) ->
    close(Proto, ListenOn).

%% @doc Close the listener. The address is normalised with fixaddr/1 before
%% esockd_sup is asked to stop it.
-spec(close(atom(), listen_on()) -> ok | {error, term()}).
close(Proto, ListenOn) when is_atom(Proto) ->
    Addr = fixaddr(ListenOn),
    esockd_sup:stop_listener(Proto, Addr).
61✔
193

194
%% @doc Reopen the listener identified by a `{Proto, ListenOn}' pair.
-spec(reopen({atom(), listen_on()}) -> {ok, pid()} | {error, term()}).
reopen({Proto, ListenOn}) when is_atom(Proto) ->
    reopen(Proto, ListenOn).

%% @doc Reopen the listener. The address is normalised with fixaddr/1 before
%% esockd_sup is asked to restart it.
-spec(reopen(atom(), listen_on()) -> {ok, pid()} | {error, term()}).
reopen(Proto, ListenOn) when is_atom(Proto) ->
    Addr = fixaddr(ListenOn),
    esockd_sup:restart_listener(Proto, Addr).
3✔
202

203
%%--------------------------------------------------------------------
204
%% Spec funcs
205

206
%% @doc Build a supervisor child spec for a TCP/SSL listener, so that the
%% listener can be attached to another application's supervisor.
%% The listen address is normalised with fixaddr/1 first.
-spec child_spec(atom(), listen_on(), options())
      -> supervisor:child_spec().
child_spec(Proto, ListenOn, Opts) when is_atom(Proto) ->
    Addr = fixaddr(ListenOn),
    esockd_sup:child_spec(Proto, Addr, Opts).
46✔
212

213
%% @doc Legacy 4-arity form of child_spec/3: `MFA' becomes the
%% `connection_mfargs' option, replacing any such entry in `Opts'.
-spec child_spec(atom(), listen_on(), options(), mfargs())
      -> supervisor:child_spec().
child_spec(Proto, ListenOn, Opts, MFA) when is_atom(Proto) ->
    OptsWithMFA = merge_mfargs(Opts, MFA),
    child_spec(Proto, ListenOn, OptsWithMFA).
1✔
217

218
%% @doc Build a supervisor child spec for a UDP listener.
%% The listen address is normalised with fixaddr/1 first.
-spec udp_child_spec(atom(), listen_on(), options())
     -> supervisor:child_spec().
udp_child_spec(Proto, Port, Opts) ->
    Addr = fixaddr(Port),
    esockd_sup:udp_child_spec(Proto, Addr, Opts).
9✔
223

224
%% @doc Legacy 4-arity form of udp_child_spec/3: `MFA' becomes the
%% `connection_mfargs' option, replacing any such entry in `Opts'.
-spec udp_child_spec(atom(), listen_on(), options(), mfargs())
     -> supervisor:child_spec().
udp_child_spec(Proto, Port, Opts, MFA) ->
    OptsWithMFA = merge_mfargs(Opts, MFA),
    udp_child_spec(Proto, Port, OptsWithMFA).
1✔
229

230
%% @doc Build a supervisor child spec for a DTLS listener.
%% The listen address is normalised with fixaddr/1 first.
-spec dtls_child_spec(atom(), listen_on(), options())
     -> supervisor:child_spec().
dtls_child_spec(Proto, ListenOn, Opts) ->
    Addr = fixaddr(ListenOn),
    esockd_sup:dtls_child_spec(Proto, Addr, Opts).
12✔
235

236
%% @doc Legacy 4-arity form of dtls_child_spec/3: `MFA' becomes the
%% `connection_mfargs' option, replacing any such entry in `Opts'.
-spec dtls_child_spec(atom(), listen_on(), options(), mfargs())
     -> supervisor:child_spec().
dtls_child_spec(Proto, ListenOn, Opts, MFA) ->
    OptsWithMFA = merge_mfargs(Opts, MFA),
    dtls_child_spec(Proto, ListenOn, OptsWithMFA).
1✔
241

242
%% Put `MFA' at the head of `Opts' as the `connection_mfargs' entry, after
%% removing every `connection_mfargs' entry that was already present.
merge_mfargs(Opts, MFA) ->
    Remaining = proplists:delete(connection_mfargs, Opts),
    [{connection_mfargs, MFA} | Remaining].
28✔
244

245
%%--------------------------------------------------------------------
246
%% Get/Set APIs
247

248
%% @doc Return the listeners reported by esockd_sup.
-spec(listeners() -> [{{atom(), listen_on()}, pid()}]).
listeners() ->
    esockd_sup:listeners().
2✔
251

252
%% @doc Look up a single listener in esockd_sup.
%% The listen address is normalised with fixaddr/1 before the lookup.
-spec(listener({atom(), listen_on()}) -> pid()).
listener({Proto, ListenOn}) when is_atom(Proto) ->
    ListenerRef = {Proto, fixaddr(ListenOn)},
    esockd_sup:listener(ListenerRef).
2✔
256

257
%% @doc Fetch the listener's stats from esockd_server.
%% The listen address is normalised with fixaddr/1 before the lookup.
-spec(get_stats({atom(), listen_on()}) -> [{atom(), non_neg_integer()}]).
get_stats({Proto, ListenOn}) when is_atom(Proto) ->
    ListenerRef = {Proto, fixaddr(ListenOn)},
    esockd_server:get_stats(ListenerRef).
1✔
261

262
%% @doc Get the listener's options.
%% Dispatches to `get_options' of the listener's module via
%% with_listener_ref/3; raises `error(not_found)' for an unknown listener.
-spec(get_options({atom(), listen_on()}) -> options()).
get_options({Proto, ListenOn}) when is_atom(Proto) ->
    with_listener_ref({Proto, ListenOn}, get_options, []).
6✔
266

267
%% @doc Set the options that can be applied to a running listener.
%% An error is returned when some options cannot be set, either because they
%% are not applicable or because changing them requires a listener restart.
%% Dispatches to `set_options' of the listener's module via
%% with_listener_ref/3; raises `error(not_found)' for an unknown listener.
-spec set_options({atom(), listen_on()}, options()) ->
    {ok, options()} | {error, _TODO}.
set_options({Proto, ListenOn}, Options) when is_atom(Proto) ->
    with_listener_ref({Proto, ListenOn}, set_options, [Options]).
6✔
274

275
%% @doc Get the number of acceptors of a listener.
%% Dispatches to `get_acceptors' of the listener's module via
%% with_listener/2; raises `error(not_found)' for an unknown listener.
-spec(get_acceptors({atom(), listen_on()}) -> pos_integer()).
get_acceptors({Proto, ListenOn}) ->
    with_listener({Proto, ListenOn}, get_acceptors).
10✔
279

280
%% @doc Get the connection limit of a listener.
%% Dispatches to `get_max_connections' of the listener's module via
%% with_listener/2; raises `error(not_found)' for an unknown listener.
-spec(get_max_connections({atom(), listen_on()} | pid()) -> pos_integer()).
get_max_connections({Proto, ListenOn}) when is_atom(Proto) ->
    with_listener({Proto, ListenOn}, get_max_connections).
7✔
284

285
%% @doc Set the connection limit of a listener.
%% Dispatches to `set_max_connections' of the listener's module via
%% with_listener_ref/3; raises `error(not_found)' for an unknown listener.
-spec(set_max_connections({atom(), listen_on()}, pos_integer()) -> ok).
set_max_connections({Proto, ListenOn}, MaxConns) when is_atom(Proto) ->
    with_listener_ref({Proto, ListenOn}, set_max_connections, [MaxConns]).
3✔
289

290
%% @doc Get the connection rate limit of a listener.
%% Dispatches to `get_max_conn_rate' of the listener's module via
%% with_listener/3, passing `Proto' and `ListenOn' as extra arguments;
%% raises `error(not_found)' for an unknown listener.
-spec(get_max_conn_rate({atom(), listen_on()}) -> conn_limit()).
get_max_conn_rate({Proto, ListenOn}) when is_atom(Proto) ->
    ExtraArgs = [Proto, ListenOn],
    with_listener({Proto, ListenOn}, get_max_conn_rate, ExtraArgs).
9✔
294

295
%% @doc Set the connection rate limit of a listener.
%% Dispatches to `set_max_conn_rate' of the listener's module via
%% with_listener_ref/3; raises `error(not_found)' for an unknown listener.
-spec(set_max_conn_rate({atom(), listen_on()}, conn_limit()) -> ok).
set_max_conn_rate({Proto, ListenOn}, Opt) when is_atom(Proto) ->
    with_listener_ref({Proto, ListenOn}, set_max_conn_rate, [Opt]).
3✔
299

300
%% @doc Get the current number of connections of a listener.
%% Dispatches to `get_current_connections' of the listener's module via
%% with_listener/2; raises `error(not_found)' for an unknown listener.
-spec(get_current_connections({atom(), listen_on()}) -> non_neg_integer()).
get_current_connections({Proto, ListenOn}) when is_atom(Proto) ->
    with_listener({Proto, ListenOn}, get_current_connections).
4✔
304

305
%% @doc Get the shutdown count of a listener.
%% Dispatches to `get_shutdown_count' of the listener's module via
%% with_listener/2; raises `error(not_found)' for an unknown listener.
-spec(get_shutdown_count({atom(), listen_on()}) -> pos_integer()).
get_shutdown_count({Proto, ListenOn}) when is_atom(Proto) ->
    with_listener({Proto, ListenOn}, get_shutdown_count).
3✔
309

310
%% @doc Get the access rules of a listener.
%% Dispatches to `get_access_rules' of the listener's module via
%% with_listener/2; raises `error(not_found)' for an unknown listener.
-spec(get_access_rules({atom(), listen_on()}) -> [esockd_access:rule()]).
get_access_rules({Proto, ListenOn}) when is_atom(Proto) ->
    with_listener({Proto, ListenOn}, get_access_rules).
9✔
314

315
%% @doc Allow access from the given CIDR (or `all') on a listener.
%% Dispatches to `allow' of the listener's module via with_listener/3;
%% raises `error(not_found)' for an unknown listener.
-spec(allow({atom(), listen_on()}, all | esockd_cidr:cidr_string()) -> ok).
allow({Proto, ListenOn}, CIDR) when is_atom(Proto) ->
    with_listener({Proto, ListenOn}, allow, [CIDR]).
3✔
319

320
%% @doc Deny access from the given CIDR (or `all') on a listener.
%% Dispatches to `deny' of the listener's module via with_listener/3;
%% raises `error(not_found)' for an unknown listener.
-spec(deny({atom(), listen_on()}, all | esockd_cidr:cidr_string()) -> ok).
deny({Proto, ListenOn}, CIDR) when is_atom(Proto) ->
    with_listener({Proto, ListenOn}, deny, [CIDR]).
3✔
324

325
%%--------------------------------------------------------------------
326
%% Utils
327

328
%% @doc Invoke a connection start callback with two leading arguments.
%% Three callback shapes are accepted:
%%   `Mod'             calls `Mod:start_link(A1, A2)'
%%   `{Mod, Fun}'      calls `Mod:Fun(A1, A2)'
%%   `{Mod, Fun, Xs}'  calls `Mod:Fun(A1, A2, X1, ..., Xn)'
%% The callback's result is returned unchanged.
-spec start_mfargs(mfargs(), _Arg1, _Arg2) -> _Ret.
start_mfargs(Mod, Arg1, Arg2) when is_atom(Mod) ->
    erlang:apply(Mod, start_link, [Arg1, Arg2]);
start_mfargs({Mod, Fun}, Arg1, Arg2) when is_atom(Mod), is_atom(Fun) ->
    erlang:apply(Mod, Fun, [Arg1, Arg2]);
start_mfargs({Mod, Fun, Extra}, Arg1, Arg2) when is_atom(Mod), is_atom(Fun), is_list(Extra) ->
    CallArgs = [Arg1, Arg2 | Extra],
    erlang:apply(Mod, Fun, CallArgs).
28✔
335

336
%% @doc Merge two option lists into one without duplicate names.
%% The lists are concatenated and then collapsed by squash_opts/1, so for a
%% name present in both lists the value from `Opts2' takes part in the merge
%% after the one from `Opts1'.
-spec(merge_opts(proplists:proplist(), proplists:proplist())
      -> proplists:proplist()).
merge_opts(Opts1, Opts2) ->
    Combined = Opts1 ++ Opts2,
    squash_opts(Combined).
101✔
341

342
%% Collapse an option list so that every name occurs at most once.
%% For a `{Key, Value}' entry, all later values stored under `Key' are folded
%% into it with merge_opt/3 (left to right), the later entries are removed,
%% and the result is emitted through make_opt/2. A bare atom is kept at its
%% first position and its later identical occurrences are removed.
squash_opts([]) ->
    [];
squash_opts([{Key, First} | Tail]) ->
    Later = proplists:get_all_values(Key, Tail),
    Fold = fun(Override, Current) -> merge_opt(Key, Current, Override) end,
    Final = lists:foldl(Fold, First, Later),
    Remaining = proplists:delete(Key, Tail),
    make_opt(Key, Final) ++ squash_opts(Remaining);
squash_opts([Flag | Tail]) when is_atom(Flag) ->
    Remaining = lists:filter(fun(Opt) -> Opt =/= Flag end, Tail),
    [Flag | squash_opts(Remaining)].
101✔
350

351
%% Turn a name/value pair into a zero- or one-element option list:
%% an `undefined' value yields no option at all.
make_opt(Name, Value) ->
    case Value of
        undefined -> [];
        _Defined  -> [{Name, Value}]
    end.
382✔
353

354
%% Merge two values stored under the same option name.
%% The nested socket option lists (ssl, tcp, udp, dtls) are merged entry by
%% entry with merge_opts/2; for every other option the later value wins.
merge_opt(Name, Opts1, Opts2)
  when Name =:= ssl_options; Name =:= tcp_options;
       Name =:= udp_options; Name =:= dtls_options ->
    merge_opts(Opts1, Opts2);
merge_opt(_Name, _Earlier, Later) ->
    Later.
40✔
359

360
%% @doc Select the entries of `Opts' that differ from `OptsRef'.
%% An entry is kept when `OptsRef' holds no value (`undefined') for its name,
%% or holds a value different from the entry's own. The order of `Opts' is
%% preserved. Each entry must be a valid single property (atom or keyed
%% tuple), otherwise the key extraction fails with a badmatch.
-spec changed_opts(proplists:proplist(), proplists:proplist())
      -> proplists:proplist().
changed_opts(Opts, OptsRef) ->
    IsChanged =
        fun(Opt) ->
            [Key] = proplists:get_keys([Opt]),
            New = proplists:get_value(Key, [Opt]),
            case proplists:get_value(Key, OptsRef) of
                undefined -> true;
                Old       -> Old =/= New
            end
        end,
    lists:filter(IsChanged, Opts).
372

373
%% @doc Normalise a listener option list.
%% Recognised options whose value has the expected shape are kept in their
%% original order. Bare `tune_buffer' / `proxy_protocol' flags are expanded to
%% `{Flag, true}', and a numeric `max_conn_rate' limit is expanded to
%% `{Limit, 1}'. Everything else (unknown options, or known options with an
%% unexpected value) is silently dropped.
-spec parse_opt(list()) -> proplists:proplist().
parse_opt(Options) ->
    parse_opt(Options, []).

parse_opt([], Acc) ->
    lists:reverse(Acc);
parse_opt([{acceptors, I}|Opts], Acc) when is_integer(I) ->
    parse_opt(Opts, [{acceptors, I}|Acc]);
parse_opt([{max_connections, I}|Opts], Acc) when is_integer(I) ->
    parse_opt(Opts, [{max_connections, I}|Acc]);
%% The is_number/1 guards are essential: in Erlang's term order every tuple
%% and map compares greater than any number, so a bare `Limit > 0' would also
%% accept `{Limit, Period}' and wrap it a second time as `{{L, P}, 1}'.
parse_opt([{max_conn_rate, Limit}|Opts], Acc) when is_number(Limit), Limit > 0 ->
    parse_opt(Opts, [{max_conn_rate, {Limit, 1}}|Acc]);
parse_opt([{max_conn_rate, {Limit, Period}}|Opts], Acc)
  when is_number(Limit), is_number(Period), Limit > 0, Period > 0 ->
    parse_opt(Opts, [{max_conn_rate, {Limit, Period}}|Acc]);
%% The conn_limit() type also admits a map; it is kept exactly as given.
parse_opt([{max_conn_rate, Limit}|Opts], Acc) when is_map(Limit) ->
    parse_opt(Opts, [{max_conn_rate, Limit}|Acc]);
parse_opt([{access_rules, Rules}|Opts], Acc) ->
    parse_opt(Opts, [{access_rules, Rules}|Acc]);
parse_opt([{shutdown, I}|Opts], Acc) when I == brutal_kill; I == infinity; is_integer(I) ->
    parse_opt(Opts, [{shutdown, I}|Acc]);
parse_opt([tune_buffer|Opts], Acc) ->
    parse_opt(Opts, [{tune_buffer, true}|Acc]);
parse_opt([{tune_buffer, I}|Opts], Acc) when is_boolean(I) ->
    parse_opt(Opts, [{tune_buffer, I}|Acc]);
parse_opt([proxy_protocol|Opts], Acc) ->
    parse_opt(Opts, [{proxy_protocol, true}|Acc]);
parse_opt([{proxy_protocol, I}|Opts], Acc) when is_boolean(I) ->
    parse_opt(Opts, [{proxy_protocol, I}|Acc]);
parse_opt([{proxy_protocol_timeout, Timeout}|Opts], Acc) when is_integer(Timeout) ->
    parse_opt(Opts, [{proxy_protocol_timeout, Timeout}|Acc]);
parse_opt([{ssl_options, L}|Opts], Acc) when is_list(L) ->
    parse_opt(Opts, [{ssl_options, L}|Acc]);
parse_opt([{tcp_options, L}|Opts], Acc) when is_list(L) ->
    parse_opt(Opts, [{tcp_options, L}|Acc]);
parse_opt([{udp_options, L}|Opts], Acc) when is_list(L) ->
    parse_opt(Opts, [{udp_options, L}|Acc]);
parse_opt([{dtls_options, L}|Opts], Acc) when is_list(L) ->
    parse_opt(Opts, [{dtls_options, L}|Acc]);
%% Deliberate catch-all: unrecognised or malformed options are dropped.
parse_opt([_|Opts], Acc) ->
    parse_opt(Opts, Acc).
2✔
410

411
%% @doc System 'ulimit -n': the `max_fds' value found in the first entry of
%% `erlang:system_info(check_io)'.
-spec(ulimit() -> pos_integer()).
ulimit() ->
    CheckIo = erlang:system_info(check_io),
    FirstEntry = hd(CheckIo),
    proplists:get_value(max_fds, FirstEntry).
1✔
415

416
%% @doc Render a listen address as a string.
%% A bare port becomes its decimal representation; a `{Addr, Port}' pair is
%% normalised with fixaddr/1 and rendered by format/1.
-spec(to_string(listen_on()) -> string()).
to_string(Port) when is_integer(Port) ->
    integer_to_list(Port);
to_string({Addr, Port}) ->
    Fixed = fixaddr({Addr, Port}),
    format(Fixed).
3✔
421

422
%% @doc Normalise a listen address.
%% A bare port is returned unchanged. In a `{Addr, Port}' pair, a string
%% address is parsed with inet:parse_address/1 (crashing with a badmatch when
%% it is not a valid IP address), while a tuple address is accepted only if
%% esockd_cidr recognises it as IPv6 or IPv4; otherwise the call fails with
%% `error({invalid_ipaddr, Addr})'.
-spec fixaddr(inet:port_number() | {inet:ip_address() | string(), inet:port_number()})
      -> inet:port_number() | {inet:ip_address(), inet:port_number()}.
fixaddr(Port) when is_integer(Port) ->
    Port;
fixaddr({Addr, Port}) when is_list(Addr), is_integer(Port) ->
    {ok, IPAddr} = inet:parse_address(Addr),
    {IPAddr, Port};
fixaddr({Addr, Port}) when is_tuple(Addr), is_integer(Port) ->
    %% orelse short-circuits, so the IPv4 check only runs for non-IPv6 tuples.
    case esockd_cidr:is_ipv6(Addr) orelse esockd_cidr:is_ipv4(Addr) of
        true  -> {Addr, Port};
        false -> error({invalid_ipaddr, Addr})
    end.
432

433
%% @doc Render an `{Addr, Port}' pair as "Address:Port", using inet:ntoa/1
%% for the address part.
-spec(format({inet:ip_address(), inet:port_number()}) -> string()).
format({Addr, Port}) ->
    AddrStr = inet:ntoa(Addr),
    PortStr = integer_to_list(Port),
    AddrStr ++ ":" ++ PortStr.
64✔
436

437
%%--------------------------------------------------------------------
438
%% Internal funcs
439
%%--------------------------------------------------------------------
440

441
%% Call `Fun' of the listener's module with the listener supervisor as the
%% only argument. See with_listener/3.
with_listener({Proto, ListenOn}, Fun) ->
    with_listener({Proto, ListenOn}, Fun, []).

%% Resolve the listener through esockd_sup:listener_and_module/1 and call
%% `Mod:Fun(LSup, Args...)'. Raises `error(not_found)' when the lookup
%% returns `undefined'.
with_listener(ListenerRef = {_Proto, _ListenOn}, Fun, Args) ->
    case esockd_sup:listener_and_module(ListenerRef) of
        {LSup, Mod} ->
            erlang:apply(Mod, Fun, [LSup | Args]);
        undefined ->
            error(not_found)
    end.
451

452
%% Like with_listener/3, but the callback also receives the listener
%% reference: resolves the listener through esockd_sup:listener_and_module/1
%% and calls `Mod:Fun(ListenerRef, LSup, Args...)'. Raises `error(not_found)'
%% when the lookup returns `undefined'.
with_listener_ref(ListenerRef = {_Proto, _ListenOn}, Fun, Args) ->
    case esockd_sup:listener_and_module(ListenerRef) of
        {LSup, Mod} ->
            CallArgs = [ListenerRef, LSup | Args],
            erlang:apply(Mod, Fun, CallArgs);
        undefined ->
            error(not_found)
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc