• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 527

16 Sep 2025 07:26AM UTC coverage: 67.052% (+1.0%) from 66.039%
527

push

github

web-flow
Merge pull request #211 from JimMoen/fix-rate-limit-pause

fix: the next check start time should be `Now + Pasue`

2 of 2 new or added lines in 1 file covered. (100.0%)

228 existing lines in 13 files now uncovered.

696 of 1038 relevant lines covered (67.05%)

106.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.76
/src/esockd.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd).
18

19
-include("esockd.hrl").
20

21
-export([start/0]).
22

23
%% Core API
24
-export([ open/4
25
        , open_udp/4
26
        , open_dtls/4
27
        , close/2
28
        , close/1
29
        ]).
30

31
-export([ reopen/1
32
        , reopen/2
33
        ]).
34

35
-export([ child_spec/4
36
        , udp_child_spec/4
37
        , dtls_child_spec/4
38
        ]).
39

40
%% Management API
41
-export([ listeners/0
42
        , listener/1
43
        ]).
44

45
-export([ get_stats/1
46
        , get_options/1
47
        , get_acceptors/1
48
        ]).
49

50
-export([ get_max_connections/1
51
        , set_max_connections/2
52
        , get_current_connections/1
53
        , get_max_conn_rate/1
54
        , set_max_conn_rate/2
55
        ]).
56

57
-export([get_shutdown_count/1]).
58

59
%% Allow, Deny API
60
-export([ get_access_rules/1
61
        , allow/2
62
        , deny/2
63
        ]).
64

65
%% Utility functions
66
-export([ merge_opts/2
67
        , parse_opt/1
68
        , ulimit/0
69
        , fixaddr/1
70
        , to_string/1
71
        , format/1
72
        ]).
73

74
-export_type([ proto/0
75
             , transport/0
76
             , udp_transport/0
77
             , socket/0
78
             , sock_fun/0
79
             , mfargs/0
80
             , option/0
81
             , listen_on/0
82
             ]).
83

84
-type(proto() :: atom()).
85
-type(transport() :: module()).
86
-type(udp_transport() :: {udp | dtls, pid(), inet:socket()}).
87
-type(socket() :: esockd_transport:socket()).
88
-type(mfargs() :: atom() | {atom(), atom()} | {module(), atom(), [term()]}).
89
-type(sock_fun() :: {function(), list()}).
90
-type(conn_limit() :: pos_integer() | {pos_integer(), pos_integer()}).
91
-type(options() :: [option()]).
92
-type(option() :: {acceptors, pos_integer()}
93
                | {max_connections, pos_integer()}
94
                | {max_conn_rate, conn_limit()}
95
                | {access_rules, [esockd_access:rule()]}
96
                | {shutdown, brutal_kill | infinity | pos_integer()}
97
                | tune_buffer | {tune_buffer, boolean()}
98
                | proxy_protocol | {proxy_protocol, boolean()}
99
                | {proxy_protocol_timeout, timeout()}
100
                | {ssl_options, ssl_options()}
101
                | {tcp_options, [gen_tcp:listen_option()]}
102
                | {udp_options, [gen_udp:option()]}
103
                | {dtls_options, dtls_options()}).
104

105
-type(host() :: inet:ip_address() | string()).
106
-type(listen_on() :: inet:port_number() | {host(), inet:port_number()}).
107
-type ssl_options() :: [{handshake_timeout, pos_integer()} | ssl:ssl_option()].
108
-type dtls_options() :: [{handshake_timeout, pos_integer()} | ssl:ssl_option()].
109

110
%%--------------------------------------------------------------------
111
%% APIs
112
%%--------------------------------------------------------------------
113

114
%% @doc Start esockd application.
115
-spec(start() -> ok).
116
start() ->
UNCOV
117
    {ok, _} = application:ensure_all_started(esockd), ok.
×
118

119
%%--------------------------------------------------------------------
120
%% Open & Close
121

122
%% @doc Open a TCP or SSL listener
123
-spec(open(atom(), listen_on(), [option()], mfargs()) -> {ok, pid()} | {error, term()}).
124
open(Proto, Port, Opts, MFA) when is_atom(Proto), is_integer(Port) ->
125
        esockd_sup:start_listener(Proto, Port, Opts, MFA);
78✔
126
open(Proto, {Host, Port}, Opts, MFA) when is_atom(Proto), is_integer(Port) ->
127
    {IPAddr, _Port} = fixaddr({Host, Port}),
6✔
128
    case proplists:get_value(ip, tcp_options(Opts)) of
6✔
129
        undefined -> ok;
6✔
UNCOV
130
        IPAddr    -> ok;
×
UNCOV
131
        Other     -> error({badmatch, Other})
×
132
    end,
133
        esockd_sup:start_listener(Proto, {IPAddr, Port}, Opts, MFA).
6✔
134

135
%% @private
136
tcp_options(Opts) ->
137
    proplists:get_value(tcp_options, Opts, []).
6✔
138

139
%% @doc Open a UDP listener
140
-spec(open_udp(atom(), listen_on(), [option()], mfargs())
141
     -> {ok, pid()}
142
      | {error, term()}).
143
open_udp(Proto, Port, Opts, MFA) ->
144
    esockd_sup:start_child(udp_child_spec(Proto, Port, Opts, MFA)).
21✔
145

146
%% @doc Open a DTLS listener
147
-spec(open_dtls(atom(), listen_on(), options(), mfargs())
148
     -> {ok, pid()}
149
      | {error, term()}).
150
open_dtls(Proto, ListenOn, Opts, MFA) ->
151
    esockd_sup:start_child(dtls_child_spec(Proto, ListenOn, Opts, MFA)).
30✔
152

153
%% @doc Close the listener
154
-spec(close({atom(), listen_on()}) -> ok | {error, term()}).
155
close({Proto, ListenOn}) when is_atom(Proto) ->
UNCOV
156
    close(Proto, ListenOn).
×
157

158
-spec(close(atom(), listen_on()) -> ok | {error, term()}).
159
close(Proto, ListenOn) when is_atom(Proto) ->
160
        esockd_sup:stop_listener(Proto, fixaddr(ListenOn)).
135✔
161

162
%% @doc Reopen the listener
163
-spec(reopen({atom(), listen_on()}) -> {ok, pid()} | {error, term()}).
164
reopen({Proto, ListenOn}) when is_atom(Proto) ->
165
    reopen(Proto, ListenOn).
9✔
166

167
-spec(reopen(atom(), listen_on()) -> {ok, pid()} | {error, term()}).
168
reopen(Proto, ListenOn) when is_atom(Proto) ->
169
    esockd_sup:restart_listener(Proto, fixaddr(ListenOn)).
9✔
170

171
%%--------------------------------------------------------------------
172
%% Spec funcs
173

174
%% @doc Create a Child spec for a TCP/SSL Listener. It is a convenient method
175
%% for creating a Child spec to hang on another Application supervisor.
176
-spec(child_spec(atom(), listen_on(), [option()], mfargs())
177
      -> supervisor:child_spec()).
178
child_spec(Proto, ListenOn, Opts, MFA) when is_atom(Proto) ->
179
    esockd_sup:child_spec(Proto, fixaddr(ListenOn), Opts, MFA).
3✔
180

181
%% @doc Create a Child spec for a UDP Listener.
182
-spec(udp_child_spec(atom(), listen_on(), options(), mfargs())
183
     -> supervisor:child_spec()).
184
udp_child_spec(Proto, Port, Opts, MFA) ->
185
    esockd_sup:udp_child_spec(Proto, fixaddr(Port), Opts, MFA).
24✔
186

187
%% @doc Create a Child spec for a DTLS Listener.
188
-spec(dtls_child_spec(atom(), listen_on(), options(), mfargs())
189
     -> supervisor:child_spec()).
190
dtls_child_spec(Proto, ListenOn, Opts, MFA) ->
191
    esockd_sup:dtls_child_spec(Proto, fixaddr(ListenOn), Opts, MFA).
33✔
192

193
%%--------------------------------------------------------------------
194
%% Get/Set APIs
195

196
%% @doc Get listeners.
197
-spec(listeners() -> [{{atom(), listen_on()}, pid()}]).
198
listeners() -> esockd_sup:listeners().
6✔
199

200
%% @doc Get one listener.
201
-spec(listener({atom(), listen_on()}) -> pid()).
202
listener({Proto, ListenOn}) when is_atom(Proto) ->
203
    esockd_sup:listener({Proto, fixaddr(ListenOn)}).
6✔
204

205
%% @doc Get stats
206
-spec(get_stats({atom(), listen_on()}) -> [{atom(), non_neg_integer()}]).
207
get_stats({Proto, ListenOn}) when is_atom(Proto) ->
208
    esockd_server:get_stats({Proto, fixaddr(ListenOn)}).
3✔
209

210
%% @doc Get options
211
-spec(get_options({atom(), listen_on()}) -> options()).
212
get_options({Proto, ListenOn}) when is_atom(Proto) ->
213
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
18✔
214

215
%% @doc Get acceptors number
216
-spec(get_acceptors({atom(), listen_on()}) -> pos_integer()).
217
get_acceptors({Proto, ListenOn}) ->
218
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
15✔
219

220
%% @doc Get max connections
221
-spec(get_max_connections({atom(), listen_on()} | pid()) -> pos_integer()).
222
get_max_connections({Proto, ListenOn}) when is_atom(Proto) ->
223
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
21✔
224

225
%% @doc Set max connections
226
-spec(set_max_connections({atom(), listen_on()}, pos_integer()) -> ok).
227
set_max_connections({Proto, ListenOn}, MaxConns) when is_atom(Proto) ->
228
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME, [MaxConns]).
9✔
229

230
%% @doc Set max connection rate
231
-spec(get_max_conn_rate({atom(), listen_on()}) -> conn_limit()).
232
get_max_conn_rate({Proto, ListenOn}) when is_atom(Proto) ->
233
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME, [Proto, ListenOn]).
27✔
234

235
%% @doc Set max connection rate
236
-spec(set_max_conn_rate({atom(), listen_on()}, conn_limit()) -> ok).
237
set_max_conn_rate({Proto, ListenOn}, ConnRate)
238
  when is_atom(Proto), is_integer(ConnRate);
239
       is_atom(Proto), tuple_size(ConnRate) =:= 2 ->
240
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME, [Proto, ListenOn, ConnRate]).
9✔
241

242
%% @doc Get current connections
243
-spec(get_current_connections({atom(), listen_on()}) -> non_neg_integer()).
244
get_current_connections({Proto, ListenOn}) when is_atom(Proto) ->
245
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
12✔
246

247
%% @doc Get shutdown count
248
-spec(get_shutdown_count({atom(), listen_on()}) -> pos_integer()).
249
get_shutdown_count({Proto, ListenOn}) when is_atom(Proto) ->
250
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
9✔
251

252
%% @doc Get access rules
253
-spec(get_access_rules({atom(), listen_on()}) -> [esockd_access:rule()]).
254
get_access_rules({Proto, ListenOn}) when is_atom(Proto) ->
255
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
27✔
256

257
%% @doc Allow access address
258
-spec(allow({atom(), listen_on()}, all | esockd_cidr:cidr_string()) -> ok).
259
allow({Proto, ListenOn}, CIDR) when is_atom(Proto) ->
260
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME, [CIDR]).
9✔
261

262
%% @doc Deny access address
263
-spec(deny({atom(), listen_on()}, all | esockd_cidr:cidr_string()) -> ok).
264
deny({Proto, ListenOn}, CIDR) when is_atom(Proto) ->
265
    with_listener({Proto, ListenOn}, ?FUNCTION_NAME, [CIDR]).
9✔
266

267
%%--------------------------------------------------------------------
268
%% Utils
269

270
%% @doc Merge two options
271
-spec(merge_opts(proplists:proplist(), proplists:proplist())
272
      -> proplists:proplist()).
273
merge_opts(Defaults, Options) ->
274
    lists:foldl(
183✔
275
      fun({Opt, Val}, Acc) ->
276
          lists:keystore(Opt, 1, Acc, {Opt, Val});
132✔
277
         (Opt, Acc) ->
278
          lists:usort([Opt | Acc])
48✔
279
      end, Defaults, Options).
280

281
%% @doc Parse option.
282
parse_opt(Options) ->
283
    parse_opt(Options, []).
3✔
284
parse_opt([], Acc) ->
285
    lists:reverse(Acc);
3✔
286
parse_opt([{acceptors, I}|Opts], Acc) when is_integer(I) ->
287
    parse_opt(Opts, [{acceptors, I}|Acc]);
3✔
288
parse_opt([{max_connections, I}|Opts], Acc) when is_integer(I) ->
UNCOV
289
    parse_opt(Opts, [{max_connections, I}|Acc]);
×
290
parse_opt([{max_conn_rate, Limit}|Opts], Acc) when Limit > 0 ->
UNCOV
291
    parse_opt(Opts, [{max_conn_rate, {Limit, 1}}|Acc]);
×
292
parse_opt([{max_conn_rate, {Limit, Period}}|Opts], Acc) when Limit > 0, Period >0 ->
UNCOV
293
    parse_opt(Opts, [{max_conn_rate, {Limit, Period}}|Acc]);
×
294
parse_opt([{access_rules, Rules}|Opts], Acc) ->
UNCOV
295
    parse_opt(Opts, [{access_rules, Rules}|Acc]);
×
296
parse_opt([{shutdown, I}|Opts], Acc) when I == brutal_kill; I == infinity; is_integer(I) ->
UNCOV
297
    parse_opt(Opts, [{shutdown, I}|Acc]);
×
298
parse_opt([tune_buffer|Opts], Acc) ->
UNCOV
299
    parse_opt(Opts, [{tune_buffer, true}|Acc]);
×
300
parse_opt([{tune_buffer, I}|Opts], Acc) when is_boolean(I) ->
301
    parse_opt(Opts, [{tune_buffer, I}|Acc]);
3✔
302
parse_opt([proxy_protocol|Opts], Acc) ->
UNCOV
303
    parse_opt(Opts, [{proxy_protocol, true}|Acc]);
×
304
parse_opt([{proxy_protocol, I}|Opts], Acc) when is_boolean(I) ->
305
    parse_opt(Opts, [{proxy_protocol, I}|Acc]);
3✔
306
parse_opt([{proxy_protocol_timeout, Timeout}|Opts], Acc) when is_integer(Timeout) ->
UNCOV
307
    parse_opt(Opts, [{proxy_protocol_timeout, Timeout}|Acc]);
×
308
parse_opt([{ssl_options, L}|Opts], Acc) when is_list(L) ->
309
    parse_opt(Opts, [{ssl_options, L}|Acc]);
3✔
310
parse_opt([{tcp_options, L}|Opts], Acc) when is_list(L) ->
UNCOV
311
    parse_opt(Opts, [{tcp_options, L}|Acc]);
×
312
parse_opt([{udp_options, L}|Opts], Acc) when is_list(L) ->
UNCOV
313
    parse_opt(Opts, [{udp_options, L}|Acc]);
×
314
parse_opt([{dtls_options, L}|Opts], Acc) when is_list(L) ->
UNCOV
315
    parse_opt(Opts, [{dtls_options, L}|Acc]);
×
316
parse_opt([_|Opts], Acc) ->
317
    parse_opt(Opts, Acc).
6✔
318

319
%% @doc System 'ulimit -n'
320
-spec(ulimit() -> pos_integer()).
321
ulimit() ->
322
    proplists:get_value(max_fds, hd(erlang:system_info(check_io))).
3✔
323

324
-spec(to_string(listen_on()) -> string()).
325
to_string(Port) when is_integer(Port) ->
326
    integer_to_list(Port);
3✔
327
to_string({Addr, Port}) ->
328
    format(fixaddr({Addr, Port})).
9✔
329

330
%% @doc Parse Address
331
fixaddr(Port) when is_integer(Port) ->
332
    Port;
204✔
333
fixaddr({Addr, Port}) when is_list(Addr), is_integer(Port) ->
334
    {ok, IPAddr} = inet:parse_address(Addr), {IPAddr, Port};
24✔
335
fixaddr({Addr, Port}) when is_tuple(Addr), is_integer(Port) ->
336
    case esockd_cidr:is_ipv6(Addr) or esockd_cidr:is_ipv4(Addr) of
9✔
337
        true  -> {Addr, Port};
9✔
UNCOV
338
        false -> error({invalid_ipaddr, Addr})
×
339
    end.
340

341
-spec(format({inet:ip_address(), inet:port_number()}) -> string()).
342
format({Addr, Port}) ->
343
    inet:ntoa(Addr) ++ ":" ++ integer_to_list(Port).
144✔
344

345
%%--------------------------------------------------------------------
346
%% Internal funcs
347
%%--------------------------------------------------------------------
348

349
with_listener({Proto, ListenOn}, Fun) ->
350
    with_listener({Proto, ListenOn}, Fun, []).
102✔
351

352
with_listener({Proto, ListenOn}, Fun, Args) ->
353
    case esockd_sup:listener_and_module({Proto, ListenOn}) of
165✔
354
        undefined ->
355
            error(not_found);
18✔
356
        {LSup, Mod} ->
357
            erlang:apply(Mod, Fun, [LSup | Args])
147✔
358
    end.
359

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc