• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 354

14 Dec 2023 12:54PM UTC coverage: 71.491%. First build
354

Pull #183

github

web-flow
Merge 3297859f4 into 5cb22a8b1
Pull Request #183: feat(listener): support changing options on the fly

170 of 192 new or added lines in 10 files covered. (88.54%)

820 of 1147 relevant lines covered (71.49%)

60.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.29
/src/esockd_udp.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_udp).
18

19
-behaviour(gen_server).
20

21
-import(esockd_listener_sup, [conn_rate_limiter/1, conn_limiter_opts/2, conn_limiter_opt/2]).
22

23
-export([ server/4
24
        , count_peers/1
25
        , stop/1
26
        ]).
27

28
%% get/set
29
-export([ get_options/2
30
        , get_acceptors/1
31
        , get_max_connections/1
32
        , get_max_conn_rate/3
33
        , get_current_connections/1
34
        , get_shutdown_count/1
35
        ]).
36

37
-export([ set_options/3
38
        , set_max_connections/3
39
        , set_max_conn_rate/3
40
        ]).
41

42
-export([ get_access_rules/1
43
        , allow/2
44
        , deny/2
45
        ]).
46

47
%% gen_server callbacks
48
-export([ init/1
49
        , handle_call/3
50
        , handle_cast/2
51
        , handle_info/2
52
        , terminate/2
53
        , code_change/3
54
        ]).
55

56
-type(maybe(T) :: undefined | T).
57

58
-record(state, {
59
          proto        :: atom(),
60
          sock         :: inet:socket(),
61
          port         :: inet:port_number(),
62
          rate_limit   :: maybe(esockd_rate_limit:bucket()),
63
          conn_limiter :: esockd_generic_limiter:limiter(),
64
          limit_timer  :: maybe(reference()),
65
          max_peers    :: infinity | pos_integer(),
66
          peers        :: map(),
67
          options      :: [esockd:option()],
68
          access_rules :: list(),
69
          mfa          :: esockd:mfargs()
70
         }).
71

72
-define(ACTIVE_N, 100).
73
-define(ENABLED(X), (X =/= undefined)).
74
-define(DEFAULT_OPTS, [binary, {reuseaddr, true}]).
75
-define(ERROR_MSG(Format, Args),
76
        error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])).
77

78
%%--------------------------------------------------------------------
79
%% API
80
%%--------------------------------------------------------------------
81

82
-spec(server(atom(), esockd:listen_on(), [gen_udp:option()], mfa())
83
      -> {ok, pid()} | {error, term()}).
84
server(Proto, ListenOn, Opts, MFA) ->
85
    gen_server:start_link(?MODULE, [Proto, ListenOn, Opts, MFA], []).
10✔
86

87
resolve_addr(Port, SockOpts) when is_integer(Port) ->
88
    SockOpts;
7✔
89
resolve_addr({Addr, _Port}, SockOpts) ->
90
    IfAddr = case proplists:get_value(ip, SockOpts) of
3✔
91
                 undefined -> proplists:get_value(ifaddr, SockOpts);
3✔
92
                 Addr      -> Addr
×
93
             end,
94
    case (IfAddr =:= undefined) orelse (IfAddr =:= Addr) of
3✔
95
        true -> ok;
3✔
96
        _ -> error({badarg, inconsistent_addr})
×
97
    end,
98
    lists:keystore(ip, 1, SockOpts, {ip, Addr}).
3✔
99

100
-spec(count_peers(pid()) -> integer()).
101
count_peers(Pid) ->
102
    gen_server:call(Pid, count_peers).
3✔
103

104
-spec(stop(pid()) -> ok).
105
stop(Pid) -> gen_server:stop(Pid).
3✔
106

107
%%--------------------------------------------------------------------
108
%% GET/SET APIs
109
%%--------------------------------------------------------------------
110

111
get_options(_ListenerRef, Pid) ->
112
    gen_server:call(Pid, options).
1✔
113

114
get_acceptors(_Pid) ->
115
    1.
1✔
116

117
get_max_connections(Pid) ->
118
    gen_server:call(Pid, max_peers).
2✔
119

120
get_max_conn_rate(_Pid, Proto, ListenOn) ->
121
    case esockd_limiter:lookup({listener, Proto, ListenOn}) of
2✔
122
        undefined ->
123
            {error, not_found};
×
124
        #{capacity := Capacity, interval := Interval} ->
125
            {Capacity, Interval}
2✔
126
    end.
127

128
get_current_connections(Pid) ->
129
    gen_server:call(Pid, count_peers).
1✔
130

131
get_shutdown_count(_Pid) ->
132
    [].
1✔
133

134
set_options(_ListenerRef, _Pid, _Opts) ->
NEW
135
    {error, not_supported}.
×
136

137
set_max_connections(_ListenerRef, Pid, MaxLimit) when is_integer(MaxLimit) ->
138
    gen_server:call(Pid, {max_peers, MaxLimit}).
1✔
139

140
set_max_conn_rate(_ListenerRef = {Proto, ListenOn}, Pid, Opts) ->
141
    gen_server:call(Pid, {max_conn_rate, Proto, ListenOn, Opts}).
1✔
142

143
get_access_rules(Pid) ->
144
    gen_server:call(Pid, access_rules).
×
145

146
allow(Pid, CIDR) ->
147
    gen_server:call(Pid, {add_rule, {allow, CIDR}}).
×
148

149
deny(Pid, CIDR) ->
150
    gen_server:call(Pid, {add_rule, {deny, CIDR}}).
×
151

152
%%--------------------------------------------------------------------
153
%% gen_server callbacks
154
%%--------------------------------------------------------------------
155

156
init([Proto, ListenOn, Opts, MFA]) ->
157
    process_flag(trap_exit, true),
10✔
158
    put(incoming_peers, 0),
10✔
159

160
    RawRules = proplists:get_value(access_rules, Opts, [{allow, all}]),
10✔
161
    AccessRules = [esockd_access:compile(Rule) || Rule <- RawRules],
10✔
162

163
    Port = port(ListenOn),
10✔
164
    UdpOpts = resolve_addr(ListenOn, sockopts(Opts)),
10✔
165
    case gen_udp:open(Port, esockd:merge_opts(?DEFAULT_OPTS, UdpOpts)) of
10✔
166
        {ok, Sock} ->
167
            %% Trigger the udp_passive event
168
            ok = inet:setopts(Sock, [{active, 1}]),
10✔
169
            Limiter = conn_rate_limiter(conn_limiter_opts(Opts, {listener, Proto, ListenOn})),
10✔
170
            MaxPeers = proplists:get_value(max_connections, Opts, infinity),
10✔
171
            {ok, #state{proto = Proto,
10✔
172
                        sock = Sock,
173
                        port = Port,
174
                        max_peers = MaxPeers,
175
                        peers = #{},
176
                        access_rules = AccessRules,
177
                        conn_limiter = Limiter,
178
                        options = Opts,
179
                        mfa = MFA}};
180
        {error, Reason} ->
181
            {stop, Reason}
×
182
    end.
183

184
port(Port) when is_integer(Port) -> Port;
7✔
185
port({_Addr, Port}) -> Port.
3✔
186

187
sockopts(Opts) ->
188
    esockd:merge_opts(
10✔
189
      ?DEFAULT_OPTS,
190
      proplists:get_value(udp_options, Opts, [])
191
     ).
192

193
handle_call(count_peers, _From, State = #state{peers = Peers}) ->
194
    {reply, maps:size(Peers) div 2, State};
4✔
195

196
handle_call(max_peers, _From, State = #state{max_peers = MaxLimit}) ->
197
    {reply, MaxLimit, State};
2✔
198

199
handle_call({max_peers, MaxLimit}, _From, State) ->
200
    {reply, ok, State#state{max_peers = MaxLimit}};
1✔
201

202
handle_call({max_conn_rate, Proto, ListenOn, Opts}, _From, State) ->
203
    Limiter = conn_rate_limiter(conn_limiter_opt(Opts, {listener, Proto, ListenOn})),
1✔
204
    {reply, ok, State#state{conn_limiter = Limiter}};
1✔
205

206
handle_call(options, _From, State = #state{options = Opts}) ->
207
    {reply, Opts, State};
1✔
208

209
handle_call(access_rules, _From, State = #state{access_rules = Rules}) ->
210
    {reply, [raw(Rule) || Rule <- Rules], State};
×
211

212
handle_call({add_rule, RawRule}, _From, State = #state{access_rules = Rules}) ->
213
    try esockd_access:compile(RawRule) of
×
214
        Rule ->
215
            case lists:member(Rule, Rules) of
×
216
                true ->
217
                    {reply, {error, already_exists}, State};
×
218
                false ->
219
                    {reply, ok, State#state{access_rules = [Rule | Rules]}}
×
220
            end
221
    catch
222
        error:Reason ->
223
            ?ERROR_MSG("Bad access rule: ~p, compile errro: ~p", [RawRule, Reason]),
×
224
            {reply, {error, bad_access_rule}, State}
×
225
    end;
226

227
%% mimic the supervisor's which_children reply
228
handle_call(which_children, _From, State = #state{peers = Peers, mfa = {Mod, _Func, _Args}}) ->
229
     {reply, [{undefined, Pid, worker, [Mod]}
×
230
              || Pid <- maps:keys(Peers), is_pid(Pid), erlang:is_process_alive(Pid)], State};
×
231

232
handle_call(Req, _From, State) ->
233
    ?ERROR_MSG("Unexpected call: ~p", [Req]),
1✔
234
    {reply, ignore, State}.
1✔
235

236
handle_cast(Msg, State) ->
237
    ?ERROR_MSG("Unexpected cast: ~p", [Msg]),
1✔
238
    {noreply, State}.
1✔
239

240
handle_info({udp, Sock, IP, InPortNo, Packet},
241
            State = #state{sock = Sock, peers = Peers, access_rules = Rules}) ->
242
    case maps:find(Peer = {IP, InPortNo}, Peers) of
11✔
243
        {ok, Pid} ->
244
            Pid ! {datagram, self(), Packet},
2✔
245
            {noreply, State};
2✔
246
        error ->
247
            case allowed(IP, Rules) of
9✔
248
                true ->
249
                    put(incoming_peers, get(incoming_peers) + 1),
9✔
250
                    try should_throttle(State) orelse
9✔
251
                        start_channel({udp, self(), Sock}, Peer, State) of
9✔
252
                        true ->
253
                            ?ERROR_MSG("Cannot create udp channel for peer ~s due to throttling.",
×
254
                                       [esockd:format(Peer)]),
255
                            {noreply, State};
×
256
                        {ok, Pid} ->
257
                            _Ref = erlang:monitor(process, Pid),
9✔
258
                            Pid ! {datagram, self(), Packet},
9✔
259
                            {noreply, store_peer(Peer, Pid, State)};
9✔
260
                        {error, Reason} ->
261
                            ?ERROR_MSG("Failed to start udp channel for peer ~s, reason: ~p",
×
262
                                       [esockd:format(Peer), Reason]),
263
                            {noreply, State}
×
264
                    catch
265
                        _Error:Reason ->
266
                            ?ERROR_MSG("Exception occurred when starting udp channel for peer ~s, reason: ~p",
×
267
                                       [esockd:format(Peer), Reason]),
268
                            {noreply, State}
×
269
                    end;
270
                false ->
271
                    {noreply, State}
×
272
            end
273
    end;
274

275
handle_info({udp_passive, Sock}, State = #state{sock = Sock, rate_limit = Rl}) ->
276
    NState = case ?ENABLED(Rl) andalso
6✔
277
                  esockd_rate_limit:check(put(incoming_peers, 0), Rl) of
×
278
                 false ->
279
                     activate_sock(State);
6✔
280
                 {0, Rl1} ->
281
                     activate_sock(State#state{rate_limit = Rl1});
×
282
                 {Pause, Rl1} ->
283
                     ?ERROR_MSG("Pause ~w(ms) due to rate limit.", [Pause]),
×
284
                     TRef = erlang:start_timer(Pause, self(), activate_sock),
×
285
                     State#state{rate_limit = Rl1, limit_timer = TRef}
×
286
             end,
287
    {noreply, NState, hibernate};
6✔
288

289
handle_info({timeout, TRef, activate_sock}, State = #state{limit_timer = TRef}) ->
290
    NState = State#state{limit_timer = undefined},
×
291
    {noreply, activate_sock(NState)};
×
292

293
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) ->
294
    handle_peer_down(DownPid, Peers, State);
1✔
295

296
handle_info({'EXIT', DownPid, _Reason}, State = #state{peers = Peers}) ->
297
    handle_peer_down(DownPid, Peers, State);
×
298

299
handle_info({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock}) ->
300
    case gen_udp:send(Sock, IP, Port, Packet) of
10✔
301
        ok -> ok;
10✔
302
        {error, Reason} ->
303
            ?ERROR_MSG("Dropped packet to: ~s, reason: ~s", [esockd:format(Peer), Reason])
×
304
    end,
305
    {noreply, State};
10✔
306
handle_info(Info, State) ->
307
    ?ERROR_MSG("Unexpected info: ~p", [Info]),
1✔
308
    {noreply, State}.
1✔
309

310
terminate(_Reason, #state{sock = Sock}) ->
311
    gen_udp:close(Sock).
10✔
312

313
code_change(_OldVsn, State, _Extra) ->
314
    {ok, State}.
1✔
315

316
%%--------------------------------------------------------------------
317
%% Internel functions
318
%%--------------------------------------------------------------------
319

320
handle_peer_down(DownPid, Peers, State) ->
321
    case maps:find(DownPid, Peers) of
1✔
322
        {ok, Peer} ->
323
            {noreply, erase_peer(Peer, DownPid, State)};
1✔
324
        error ->
325
            {noreply, State}
×
326
    end.
327

328
-compile({inline,
329
          [ allowed/2
330
          , should_throttle/1
331
          , start_channel/3
332
          , activate_sock/1
333
          , store_peer/3
334
          , erase_peer/3
335
          , raw/1
336
          ]}).
337

338
allowed(Addr, Rules) ->
339
    case esockd_access:match(Addr, Rules) of
9✔
340
        nomatch          -> true;
×
341
        {matched, allow} -> true;
9✔
342
        {matched, deny}  -> false
×
343
    end.
344

345
should_throttle(#state{max_peers = infinity}) -> false;
9✔
346
should_throttle(#state{max_peers = MaxLimit, peers = Peers}) ->
347
    (maps:size(Peers) div 2) > MaxLimit.
×
348

349
start_channel(Transport, Peer, #state{mfa = {M, F, Args}}) ->
350
    erlang:apply(M, F, [Transport, Peer | Args]).
9✔
351

352
activate_sock(State = #state{sock = Sock}) ->
353
    ok = inet:setopts(Sock, [{active, ?ACTIVE_N}]), State.
6✔
354

355
store_peer(Peer, Pid, State = #state{peers = Peers}) ->
356
    State#state{peers = maps:put(Pid, Peer, maps:put(Peer, Pid, Peers))}.
9✔
357

358
erase_peer(Peer, Pid, State = #state{peers = Peers}) ->
359
    State#state{peers = maps:remove(Peer, maps:remove(Pid, Peers))}.
1✔
360

361
raw({allow, CIDR = {_Start, _End, _Len}}) ->
362
     {allow, esockd_cidr:to_string(CIDR)};
×
363
raw({deny, CIDR = {_Start, _End, _Len}}) ->
364
     {deny, esockd_cidr:to_string(CIDR)};
×
365
raw(Rule) ->
366
     Rule.
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc