• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 527

16 Sep 2025 07:26AM UTC coverage: 67.052% (+1.0%) from 66.039%
527

push

github

web-flow
Merge pull request #211 from JimMoen/fix-rate-limit-pause

fix: the next check start time should be `Now + Pasue`

2 of 2 new or added lines in 1 file covered. (100.0%)

228 existing lines in 13 files now uncovered.

696 of 1038 relevant lines covered (67.05%)

106.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.55
/src/esockd_udp.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_udp).
18

19
-behaviour(gen_server).
20

21
-import(esockd_listener_sup, [conn_rate_limiter/2]).
22

23
-export([ server/4
24
        , count_peers/1
25
        , stop/1
26
        ]).
27

28
%% get/set
29
-export([ get_options/1
30
        , get_acceptors/1
31
        , get_max_connections/1
32
        , get_max_conn_rate/3
33
        , get_current_connections/1
34
        , get_shutdown_count/1
35
        ]).
36

37
-export([ set_max_connections/2
38
        , set_max_conn_rate/4
39
        ]).
40

41
-export([ get_access_rules/1
42
        , allow/2
43
        , deny/2
44
        ]).
45

46
%% gen_server callbacks
47
-export([ init/1
48
        , handle_call/3
49
        , handle_cast/2
50
        , handle_info/2
51
        , terminate/2
52
        , code_change/3
53
        ]).
54

55
-type(maybe(T) :: undefined | T).
56

57
-record(state, {
58
          proto        :: atom(),
59
          sock         :: inet:socket(),
60
          port         :: inet:port_number(),
61
          rate_limit   :: maybe(esockd_rate_limit:bucket()),
62
          conn_limiter :: esockd_limiter:bucket_name(),
63
          limit_timer  :: maybe(reference()),
64
          max_peers    :: infinity | pos_integer(),
65
          peers        :: map(),
66
          options      :: [esockd:option()],
67
          access_rules :: list(),
68
          mfa          :: esockd:mfargs()
69
         }).
70

71
-define(ACTIVE_N, 100).
72
-define(ENABLED(X), (X =/= undefined)).
73
-define(DEFAULT_OPTS, [binary, {reuseaddr, true}]).
74
-define(ERROR_MSG(Format, Args),
75
        error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])).
76

77
%%--------------------------------------------------------------------
78
%% API
79
%%--------------------------------------------------------------------
80

81
%% Start a UDP listener process linked to the caller.
%%
%% Proto    - protocol name of the listener.
%% ListenOn - a port number or an {Addr, Port} tuple.
%% Opts     - listener options; init/1 reads `access_rules', `udp_options',
%%            `max_conn_rate' and `max_connections' from this list.
%% MFA      - {Module, Function, Args}; applied with the transport and the
%%            peer prepended to Args whenever a new peer shows up.
-spec(server(atom(), esockd:listen_on(), [esockd:option()], esockd:mfargs())
      -> {ok, pid()} | {error, term()}).
server(Proto, ListenOn, Opts, MFA) ->
    gen_server:start_link(?MODULE, [Proto, ListenOn, Opts, MFA], []).
30✔
85

86
%% Reconcile the address given in ListenOn with the socket options.
%%
%% With a bare port the options are returned untouched. With {Addr, Port}
%% the `ip' option (or, when `ip' is absent, `ifaddr') must either be unset
%% or equal to Addr; the result always carries {ip, Addr}.
%%
%% Raises error({badarg, inconsistent_addr}) when the configured address
%% differs from Addr.
resolve_addr(Port, SockOpts) when is_integer(Port) ->
    SockOpts;
resolve_addr({Addr, _Port}, SockOpts) ->
    %% Bind whatever `ip' holds to a fresh variable: matching it directly
    %% against the already-bound Addr would crash with case_clause on a
    %% conflicting address instead of reaching the consistency check below.
    Configured = case proplists:get_value(ip, SockOpts) of
                     undefined -> proplists:get_value(ifaddr, SockOpts);
                     Ip        -> Ip
                 end,
    case Configured of
        undefined -> ok;
        Addr      -> ok;
        _Other    -> error({badarg, inconsistent_addr})
    end,
    lists:keystore(ip, 1, SockOpts, {ip, Addr}).
9✔
98

99
%% Ask the listener process how many peers it currently tracks.
-spec(count_peers(pid()) -> integer()).
count_peers(Listener) ->
    Request = count_peers,
    gen_server:call(Listener, Request).
9✔
102

103
%% Stop the listener process.
-spec(stop(pid()) -> ok).
stop(Listener) ->
    gen_server:stop(Listener).
9✔
105

106
%%--------------------------------------------------------------------
107
%% GET/SET APIs
108
%%--------------------------------------------------------------------
109

110
%% Fetch the option list the listener was started with.
get_options(Listener) ->
    Request = options,
    gen_server:call(Listener, Request).
3✔
112

113
%% Number of acceptors; this function always answers 1 and never
%% contacts the listener process.
get_acceptors(_Listener) -> 1.
3✔
115

116
%% Read the listener's current peer limit (its `max_peers' state field).
get_max_connections(Listener) ->
    Request = max_peers,
    gen_server:call(Listener, Request).
6✔
118

119
%% Look up the connection-rate limiter registered for this listener.
%% Returns {Capacity, Interval}, or {error, not_found} when the lookup
%% yields `undefined'. The listener pid is not used.
get_max_conn_rate(_Pid, Proto, ListenOn) ->
    LimiterName = {listener, Proto, ListenOn},
    case esockd_limiter:lookup(LimiterName) of
        #{capacity := Cap, interval := Period} ->
            {Cap, Period};
        undefined ->
            {error, not_found}
    end.
126

127
%% Current number of peers; issues the same `count_peers' call that
%% count_peers/1 does.
get_current_connections(Listener) ->
    Request = count_peers,
    gen_server:call(Listener, Request).
3✔
129

130
%% Shutdown statistics; this function always returns an empty list and
%% never contacts the listener process.
get_shutdown_count(_Listener) -> [].
3✔
132

133
%% Replace the listener's peer limit. Only integer limits are accepted;
%% anything else fails with function_clause.
set_max_connections(Listener, Limit) when is_integer(Limit) ->
    Request = {max_peers, Limit},
    gen_server:call(Listener, Request).
3✔
135

136
%% Ask the listener to rebuild its connection-rate limiter with a new rate.
set_max_conn_rate(Listener, Proto, ListenOn, Rate) ->
    Request = {max_conn_rate, Proto, ListenOn, Rate},
    gen_server:call(Listener, Request).
3✔
138

139
%% Fetch the listener's access rules, converted back to their raw form
%% by the listener before replying.
get_access_rules(Listener) ->
    Request = access_rules,
    gen_server:call(Listener, Request).
×
141

142
%% Add an `allow' access rule for the given CIDR to the listener.
allow(Listener, CIDR) ->
    Rule = {allow, CIDR},
    gen_server:call(Listener, {add_rule, Rule}).
×
144

145
%% Add a `deny' access rule for the given CIDR to the listener.
deny(Listener, CIDR) ->
    Rule = {deny, CIDR},
    gen_server:call(Listener, {add_rule, Rule}).
×
147

148
%%--------------------------------------------------------------------
149
%% gen_server callbacks
150
%%--------------------------------------------------------------------
151

152
%% gen_server init callback.
%%
%% Compiles the access rules (default: allow all), opens the UDP socket on
%% the port taken from ListenOn, arms the socket for a single active
%% message, creates the connection-rate limiter and builds the initial
%% state. Returns {stop, Reason} when the socket cannot be opened.
init([Proto, ListenOn, Opts, MFA]) ->
    process_flag(trap_exit, true),
    %% Process-dictionary counter of newly seen, allowed peers; it is
    %% bumped in handle_info/2 for each such datagram and read-and-reset
    %% on `udp_passive' when a rate limit is enabled.
    put(incoming_peers, 0),

    RawRules = proplists:get_value(access_rules, Opts, [{allow, all}]),
    AccessRules = [esockd_access:compile(Rule) || Rule <- RawRules],

    Port = port(ListenOn),
    UdpOpts = resolve_addr(ListenOn, sockopts(Opts)),
    case gen_udp:open(Port, esockd:merge_opts(?DEFAULT_OPTS, UdpOpts)) of
        {ok, Sock} ->
            %% Trigger the udp_passive event
            ok = inet:setopts(Sock, [{active, 1}]),
            %% Register the limiter under ListenOn (not just the port) so
            %% the name matches the one used by get_max_conn_rate/3 and by
            %% the `max_conn_rate' call when ListenOn is {Addr, Port}.
            Limiter = conn_rate_limiter({listener, Proto, ListenOn},
                                        proplists:get_value(max_conn_rate, Opts)),
            MaxPeers = proplists:get_value(max_connections, Opts, infinity),
            {ok, #state{proto = Proto,
                        sock = Sock,
                        port = Port,
                        max_peers = MaxPeers,
                        peers = #{},
                        access_rules = AccessRules,
                        conn_limiter = Limiter,
                        options = Opts,
                        mfa = MFA}};
        {error, Reason} ->
            {stop, Reason}
    end.
179

180
%% Extract the port number from a listen_on value, which is either
%% {Addr, Port} or a bare integer port.
port({_IfAddr, PortNo}) ->
    PortNo;
port(PortNo) when is_integer(PortNo) ->
    PortNo.
9✔
182

183
%% Build the socket options: the `udp_options' entry of the listener
%% options (default []) merged with ?DEFAULT_OPTS via esockd:merge_opts/2.
sockopts(Opts) ->
    UserOpts = proplists:get_value(udp_options, Opts, []),
    esockd:merge_opts(?DEFAULT_OPTS, UserOpts).
188

189
%% gen_server call callback.
%%
%% Requests handled:
%%   count_peers                          -> number of tracked peers
%%   max_peers                            -> current peer limit
%%   {max_peers, Limit}                   -> set the peer limit, replies ok
%%   {max_conn_rate, Proto, ListenOn, R}  -> rebuild the rate limiter, replies ok
%%   options                              -> listener options
%%   access_rules                         -> rules in raw form
%%   {add_rule, RawRule}                  -> ok | {error, already_exists}
%%                                           | {error, bad_access_rule}
%%   which_children                       -> supervisor-style child list
%% Any other request is logged and answered with `ignore'.

%% The peers map holds two entries per peer (Peer => Pid and Pid => Peer,
%% see store_peer/3), hence the division by two.
handle_call(count_peers, _From, State = #state{peers = Peers}) ->
    {reply, maps:size(Peers) div 2, State};

handle_call(max_peers, _From, State = #state{max_peers = MaxLimit}) ->
    {reply, MaxLimit, State};

handle_call({max_peers, MaxLimit}, _From, State) ->
    {reply, ok, State#state{max_peers = MaxLimit}};

handle_call({max_conn_rate, Proto, ListenOn, ConnRate}, _From, State) ->
    {reply, ok, State#state{conn_limiter = conn_rate_limiter({listener, Proto, ListenOn}, ConnRate)}};

handle_call(options, _From, State = #state{options = Opts}) ->
    {reply, Opts, State};

handle_call(access_rules, _From, State = #state{access_rules = Rules}) ->
    {reply, [raw(Rule) || Rule <- Rules], State};

%% Only the compile step is protected by the try; duplicate detection runs
%% in the unprotected `of' section.
handle_call({add_rule, RawRule}, _From, State = #state{access_rules = Rules}) ->
    try esockd_access:compile(RawRule) of
        Rule ->
            case lists:member(Rule, Rules) of
                true ->
                    {reply, {error, already_exists}, State};
                false ->
                    {reply, ok, State#state{access_rules = [Rule | Rules]}}
            end
    catch
        error:Reason ->
            ?ERROR_MSG("Bad access rule: ~p, compile error: ~p", [RawRule, Reason]),
            {reply, {error, bad_access_rule}, State}
    end;

%% mimic the supervisor's which_children reply
%% (the is_pid/1 filter skips the Peer => Pid half of the map)
handle_call(which_children, _From, State = #state{peers = Peers, mfa = {Mod, _Func, _Args}}) ->
     {reply, [{undefined, Pid, worker, [Mod]}
              || Pid <- maps:keys(Peers), is_pid(Pid), erlang:is_process_alive(Pid)], State};

handle_call(Req, _From, State) ->
    ?ERROR_MSG("Unexpected call: ~p", [Req]),
    {reply, ignore, State}.
3✔
230

231
%% gen_server cast callback. No casts are expected: whatever arrives is
%% logged and the state is left untouched.
handle_cast(Unexpected, State) ->
    ?ERROR_MSG("Unexpected cast: ~p", [Unexpected]),
    {noreply, State}.
3✔
234

235
%% gen_server info callback.
%%
%% Messages handled:
%%   {udp, Sock, IP, Port, Packet}  - inbound datagram: forwarded to the
%%                                    peer's channel, or a channel is
%%                                    started if the peer is new and allowed
%%   {udp_passive, Sock}            - socket left active mode: re-activate,
%%                                    or pause when the rate limit says so
%%   {timeout, TRef, activate_sock} - rate-limit pause is over
%%   {'DOWN', ...} / {'EXIT', ...}  - a channel process went away
%%   {datagram, Peer, Packet}       - outbound datagram from a channel
%% Anything else is logged and ignored.
handle_info({udp, Sock, IP, InPortNo, Packet},
            State = #state{sock = Sock, peers = Peers, access_rules = Rules}) ->
    Peer = {IP, InPortNo},
    case maps:find(Peer, Peers) of
        {ok, Pid} ->
            Pid ! {datagram, self(), Packet},
            {noreply, State};
        error ->
            case allowed(IP, Rules) of
                true  -> {noreply, accept_peer(Peer, Packet, State)};
                false -> {noreply, State}
            end
    end;

handle_info({udp_passive, Sock}, State = #state{sock = Sock, rate_limit = Rl}) ->
    %% put/2 returns the previous counter value, so this hands the number
    %% of new peers seen so far to the rate limiter and resets the counter.
    NState = case ?ENABLED(Rl) andalso
                  esockd_rate_limit:check(put(incoming_peers, 0), Rl) of
                 false ->
                     activate_sock(State);
                 {0, Rl1} ->
                     activate_sock(State#state{rate_limit = Rl1});
                 {Pause, Rl1} ->
                     ?ERROR_MSG("Pause ~w(ms) due to rate limit.", [Pause]),
                     TRef = erlang:start_timer(Pause, self(), activate_sock),
                     State#state{rate_limit = Rl1, limit_timer = TRef}
             end,
    {noreply, NState, hibernate};

handle_info({timeout, TRef, activate_sock}, State = #state{limit_timer = TRef}) ->
    NState = State#state{limit_timer = undefined},
    {noreply, activate_sock(NState)};

handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) ->
    handle_peer_down(DownPid, Peers, State);

handle_info({'EXIT', DownPid, _Reason}, State = #state{peers = Peers}) ->
    handle_peer_down(DownPid, Peers, State);

handle_info({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock}) ->
    case gen_udp:send(Sock, IP, Port, Packet) of
        ok -> ok;
        {error, Reason} ->
            %% ~p (as in the other log calls) formats any error term; ~s
            %% only accepts atoms and chardata.
            ?ERROR_MSG("Dropped packet to: ~s, reason: ~p", [esockd:format(Peer), Reason])
    end,
    {noreply, State};
handle_info(Info, State) ->
    ?ERROR_MSG("Unexpected info: ~p", [Info]),
    {noreply, State}.

%% Handle the first datagram of an allowed, not yet known peer: count it,
%% then either refuse it because of throttling or start a channel, monitor
%% it, hand it the packet and record the peer. Returns the new state.
%% A start result other than {ok, Pid} | {error, Reason} is not caught
%% here and crashes the listener with try_clause.
accept_peer(Peer, Packet, State = #state{sock = Sock}) ->
    put(incoming_peers, get(incoming_peers) + 1),
    try should_throttle(State) orelse
        start_channel({udp, self(), Sock}, Peer, State) of
        true ->
            ?ERROR_MSG("Cannot create udp channel for peer ~s due to throttling.",
                       [esockd:format(Peer)]),
            State;
        {ok, Pid} ->
            _Ref = erlang:monitor(process, Pid),
            Pid ! {datagram, self(), Packet},
            store_peer(Peer, Pid, State);
        {error, Reason} ->
            ?ERROR_MSG("Failed to start udp channel for peer ~s, reason: ~p",
                       [esockd:format(Peer), Reason]),
            State
    catch
        _Error:Reason ->
            ?ERROR_MSG("Exception occurred when starting udp channel for peer ~s, reason: ~p",
                       [esockd:format(Peer), Reason]),
            State
    end.
3✔
304

305
%% gen_server terminate callback: close the listening UDP socket.
terminate(_Why, #state{sock = Socket}) ->
    gen_udp:close(Socket).
30✔
307

308
%% gen_server code_change callback: the state is carried over unchanged.
code_change(_FromVsn, Current, _Extra) -> {ok, Current}.
3✔
310

311
%%--------------------------------------------------------------------
312
%% Internal functions
313
%%--------------------------------------------------------------------
314

315
%% React to a terminated process: if its pid is a key in the peers map,
%% drop both of that peer's entries; otherwise leave the state as is.
%% Always returns a {noreply, State} tuple.
handle_peer_down(DownPid, Peers, State) ->
    Lookup = maps:find(DownPid, Peers),
    case Lookup of
        error ->
            {noreply, State};
        {ok, Peer} ->
            {noreply, erase_peer(Peer, DownPid, State)}
    end.
322

323
-compile({inline,
324
          [ allowed/2
325
          , should_throttle/1
326
          , start_channel/3
327
          , activate_sock/1
328
          , store_peer/3
329
          , erase_peer/3
330
          , raw/1
331
          ]}).
332

333
%% Decide whether an address may talk to the listener: only an explicit
%% `deny' match refuses it; an `allow' match or no match at all lets it in.
allowed(Addr, Rules) ->
    Verdict = esockd_access:match(Addr, Rules),
    case Verdict of
        {matched, deny}  -> false;
        {matched, allow} -> true;
        nomatch          -> true
    end.
339

340
%% Tell whether a new peer must be refused because the peer limit has been
%% reached. Never throttles when the limit is `infinity'.
%%
%% The peers map holds two entries per peer, so its size is halved. The
%% comparison is `>=': with `>' a new peer was still admitted when the
%% count already equalled the limit, ending up one above max_peers.
should_throttle(#state{max_peers = infinity}) -> false;
should_throttle(#state{max_peers = MaxLimit, peers = Peers}) ->
    (maps:size(Peers) div 2) >= MaxLimit.
×
343

344
%% Invoke the configured {M, F, Args} callback with the transport and the
%% peer placed in front of the configured arguments.
start_channel(Transport, Peer, #state{mfa = {Mod, Fun, ExtraArgs}}) ->
    CallArgs = [Transport, Peer] ++ ExtraArgs,
    erlang:apply(Mod, Fun, CallArgs).
27✔
346

347
%% Put the socket back into active mode for the next ?ACTIVE_N messages
%% and return the state unchanged. Crashes if setopts does not return ok.
activate_sock(State = #state{sock = Socket}) ->
    ActiveOpt = {active, ?ACTIVE_N},
    ok = inet:setopts(Socket, [ActiveOpt]),
    State.
18✔
349

350
%% Record a peer in both directions (Peer => Pid and Pid => Peer) so it can
%% be found by address as well as by channel pid.
store_peer(Peer, Pid, State = #state{peers = Known}) ->
    Updated = Known#{Peer => Pid, Pid => Peer},
    State#state{peers = Updated}.
27✔
352

353
%% Forget a peer: remove both its Pid => Peer and Peer => Pid entries.
erase_peer(Peer, Pid, State = #state{peers = Known}) ->
    Remaining = maps:without([Pid, Peer], Known),
    State#state{peers = Remaining}.
3✔
355

356
%% Convert a compiled access rule back to its raw form: for allow/deny
%% rules whose second element is a 3-tuple, that tuple is rendered with
%% esockd_cidr:to_string/1; every other rule is returned as is.
raw({Action, Range = {_From, _To, _Bits}})
  when Action =:= allow; Action =:= deny ->
    {Action, esockd_cidr:to_string(Range)};
raw(Other) ->
    Other.
×
362

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc