• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 416

11 Jul 2024 09:50AM UTC coverage: 67.67%. First build
416

Pull #189

github

web-flow
Merge 044df43c5 into 32517719e
Pull Request #189: feat(udp): Support UDP port health check

7 of 9 new or added lines in 1 file covered. (77.78%)

877 of 1296 relevant lines covered (67.67%)

57.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.04
/src/esockd_udp.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_udp).
18

19
-behaviour(gen_server).
20

21
-import(esockd_listener_sup, [conn_rate_limiter/1, conn_limiter_opts/2, conn_limiter_opt/2]).
22

23
-export([ server/3
24
        , count_peers/1
25
        , stop/1
26
        ]).
27

28
%% get/set
29
-export([ get_options/2
30
        , get_acceptors/1
31
        , get_max_connections/1
32
        , get_max_conn_rate/3
33
        , get_current_connections/1
34
        , get_shutdown_count/1
35
        ]).
36

37
-export([ set_options/3
38
        , set_max_connections/3
39
        , set_max_conn_rate/3
40
        ]).
41

42
-export([ get_access_rules/1
43
        , allow/2
44
        , deny/2
45
        ]).
46

47
%% gen_server callbacks
48
-export([ init/1
49
        , handle_call/3
50
        , handle_cast/2
51
        , handle_info/2
52
        , terminate/2
53
        , code_change/3
54
        ]).
55

56
-type(maybe(T) :: undefined | T).
57

58
-record(state, {
59
          proto        :: atom(),
60
          sock         :: inet:socket(),
61
          port         :: inet:port_number(),
62
          rate_limit   :: maybe(esockd_rate_limit:bucket()),
63
          conn_limiter :: esockd_generic_limiter:limiter(),
64
          limit_timer  :: maybe(reference()),
65
          max_peers    :: infinity | pos_integer(),
66
          peers        :: map(),
67
          options      :: [esockd:option()],
68
          access_rules :: list(),
69
          mfa          :: esockd:mfargs(),
70
          health_check_request :: maybe(binary()),
71
          health_check_reply :: maybe(binary())
72
         }).
73

74
-define(ACTIVE_N, 100).
75
-define(ENABLED(X), (X =/= undefined)).
76
-define(DEFAULT_OPTS, [binary, {reuseaddr, true}]).
77
-define(ERROR_MSG(Format, Args),
78
        error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])).
79

80
%%--------------------------------------------------------------------
81
%% API
82
%%--------------------------------------------------------------------
83

84
-spec(server(atom(), esockd:listen_on(), [esockd:option()])
85
      -> {ok, pid()} | {error, term()}).
86
server(Proto, ListenOn, Opts) ->
87
    gen_server:start_link(?MODULE, [Proto, ListenOn, Opts], []).
12✔
88

89
resolve_addr(Port, SockOpts) when is_integer(Port) ->
90
    SockOpts;
7✔
91
resolve_addr({Addr, _Port}, SockOpts) ->
92
    IfAddr = case proplists:get_value(ip, SockOpts) of
5✔
93
                 undefined -> proplists:get_value(ifaddr, SockOpts);
5✔
94
                 Addr      -> Addr
×
95
             end,
96
    case (IfAddr =:= undefined) orelse (IfAddr =:= Addr) of
5✔
97
        true -> ok;
5✔
98
        _ -> error({badarg, inconsistent_addr})
×
99
    end,
100
    lists:keystore(ip, 1, SockOpts, {ip, Addr}).
5✔
101

102
-spec(count_peers(pid()) -> integer()).
103
count_peers(Pid) ->
104
    gen_server:call(Pid, count_peers).
4✔
105

106
-spec(stop(pid()) -> ok).
107
stop(Pid) -> gen_server:stop(Pid).
4✔
108

109
%%--------------------------------------------------------------------
110
%% GET/SET APIs
111
%%--------------------------------------------------------------------
112

113
get_options(_ListenerRef, Pid) ->
114
    gen_server:call(Pid, options).
1✔
115

116
get_acceptors(_Pid) ->
117
    1.
1✔
118

119
get_max_connections(Pid) ->
120
    gen_server:call(Pid, max_peers).
2✔
121

122
get_max_conn_rate(_Pid, Proto, ListenOn) ->
123
    case esockd_limiter:lookup({listener, Proto, ListenOn}) of
2✔
124
        undefined ->
125
            {error, not_found};
×
126
        #{capacity := Capacity, interval := Interval} ->
127
            {Capacity, Interval}
2✔
128
    end.
129

130
get_current_connections(Pid) ->
131
    gen_server:call(Pid, count_peers).
1✔
132

133
get_shutdown_count(_Pid) ->
134
    [].
1✔
135

136
set_options(_ListenerRef, _Pid, _Opts) ->
137
    {error, not_supported}.
×
138

139
set_max_connections(_ListenerRef, Pid, MaxLimit) when is_integer(MaxLimit) ->
140
    gen_server:call(Pid, {max_peers, MaxLimit}).
1✔
141

142
set_max_conn_rate(_ListenerRef = {Proto, ListenOn}, Pid, Opts) ->
143
    gen_server:call(Pid, {max_conn_rate, Proto, ListenOn, Opts}).
1✔
144

145
get_access_rules(Pid) ->
146
    gen_server:call(Pid, access_rules).
×
147

148
allow(Pid, CIDR) ->
149
    gen_server:call(Pid, {add_rule, {allow, CIDR}}).
×
150

151
deny(Pid, CIDR) ->
152
    gen_server:call(Pid, {add_rule, {deny, CIDR}}).
×
153

154
%%--------------------------------------------------------------------
155
%% gen_server callbacks
156
%%--------------------------------------------------------------------
157

158
init([Proto, ListenOn, Opts]) ->
159
    process_flag(trap_exit, true),
12✔
160
    put(incoming_peers, 0),
12✔
161

162
    MFA = proplists:get_value(connection_mfargs, Opts),
12✔
163
    RawRules = proplists:get_value(access_rules, Opts, [{allow, all}]),
12✔
164
    AccessRules = [esockd_access:compile(Rule) || Rule <- RawRules],
12✔
165

166
    Port = port(ListenOn),
12✔
167
    UdpOpts = resolve_addr(ListenOn, sockopts(Opts)),
12✔
168
    case gen_udp:open(Port, esockd:merge_opts(?DEFAULT_OPTS, UdpOpts)) of
12✔
169
        {ok, Sock} ->
170
            %% Trigger the udp_passive event
171
            ok = inet:setopts(Sock, [{active, 1}]),
12✔
172
            Limiter = conn_rate_limiter(conn_limiter_opts(Opts, {listener, Proto, ListenOn})),
12✔
173
            MaxPeers = proplists:get_value(max_connections, Opts, infinity),
12✔
174
            init_health_check(#state{proto = Proto,
12✔
175
                                     sock = Sock,
176
                                     port = Port,
177
                                     max_peers = MaxPeers,
178
                                     peers = #{},
179
                                     access_rules = AccessRules,
180
                                     conn_limiter = Limiter,
181
                                     options = Opts,
182
                                     mfa = MFA},
183
                              Opts);
184
        {error, Reason} ->
185
            {stop, Reason}
×
186
    end.
187

188
port(Port) when is_integer(Port) -> Port;
7✔
189
port({_Addr, Port}) -> Port.
5✔
190

191
sockopts(Opts) ->
192
    esockd:merge_opts(
12✔
193
      ?DEFAULT_OPTS,
194
      proplists:get_value(udp_options, Opts, [])
195
     ).
196

197
handle_call(count_peers, _From, State = #state{peers = Peers}) ->
198
    {reply, maps:size(Peers) div 2, State};
5✔
199

200
handle_call(max_peers, _From, State = #state{max_peers = MaxLimit}) ->
201
    {reply, MaxLimit, State};
2✔
202

203
handle_call({max_peers, MaxLimit}, _From, State) ->
204
    {reply, ok, State#state{max_peers = MaxLimit}};
1✔
205

206
handle_call({max_conn_rate, Proto, ListenOn, Opts}, _From, State) ->
207
    Limiter = conn_rate_limiter(conn_limiter_opt(Opts, {listener, Proto, ListenOn})),
1✔
208
    {reply, ok, State#state{conn_limiter = Limiter}};
1✔
209

210
handle_call(options, _From, State = #state{options = Opts}) ->
211
    {reply, Opts, State};
1✔
212

213
handle_call(access_rules, _From, State = #state{access_rules = Rules}) ->
214
    {reply, [raw(Rule) || Rule <- Rules], State};
×
215

216
handle_call({add_rule, RawRule}, _From, State = #state{access_rules = Rules}) ->
217
    try esockd_access:compile(RawRule) of
×
218
        Rule ->
219
            case lists:member(Rule, Rules) of
×
220
                true ->
221
                    {reply, {error, already_exists}, State};
×
222
                false ->
223
                    {reply, ok, State#state{access_rules = [Rule | Rules]}}
×
224
            end
225
    catch
226
        error:Reason ->
227
            ?ERROR_MSG("Bad access rule: ~p, compile errro: ~p", [RawRule, Reason]),
×
228
            {reply, {error, bad_access_rule}, State}
×
229
    end;
230

231
%% mimic the supervisor's which_children reply
232
handle_call(which_children, _From, State = #state{peers = Peers, mfa = {Mod, _Func, _Args}}) ->
233
     {reply, [{undefined, Pid, worker, [Mod]}
×
234
              || Pid <- maps:keys(Peers), is_pid(Pid), erlang:is_process_alive(Pid)], State};
×
235

236
handle_call(Req, _From, State) ->
237
    ?ERROR_MSG("Unexpected call: ~p", [Req]),
1✔
238
    {reply, ignore, State}.
1✔
239

240
handle_cast(Msg, State) ->
241
    ?ERROR_MSG("Unexpected cast: ~p", [Msg]),
1✔
242
    {noreply, State}.
1✔
243

244
handle_info({udp, Sock, IP, Port, Request},
245
            State = #state{sock = Sock,
246
                           health_check_request = Request,
247
                           health_check_reply = Reply}) ->
248
    case gen_udp:send(Sock, IP, Port, Reply) of
1✔
249
        ok -> ok;
1✔
250
        {error, Reason} ->
NEW
251
            ?ERROR_MSG("Health check response to: ~s failed, reason: ~s",
×
252
                       [esockd:format({IP, Port}), Reason])
253
    end,
254
    {noreply, State};
1✔
255
handle_info({udp, Sock, IP, InPortNo, Packet},
256
            State = #state{sock = Sock, peers = Peers, access_rules = Rules}) ->
257
    case maps:find(Peer = {IP, InPortNo}, Peers) of
13✔
258
        {ok, Pid} ->
259
            Pid ! {datagram, self(), Packet},
2✔
260
            {noreply, State};
2✔
261
        error ->
262
            case allowed(IP, Rules) of
11✔
263
                true ->
264
                    put(incoming_peers, get(incoming_peers) + 1),
11✔
265
                    try should_throttle(State) orelse
11✔
266
                        start_channel({udp, self(), Sock}, Peer, State) of
11✔
267
                        true ->
268
                            ?ERROR_MSG("Cannot create udp channel for peer ~s due to throttling.",
×
269
                                       [esockd:format(Peer)]),
270
                            {noreply, State};
×
271
                        {ok, Pid} ->
272
                            true = erlang:link(Pid),
11✔
273
                            Pid ! {datagram, self(), Packet},
11✔
274
                            {noreply, store_peer(Peer, Pid, State)};
11✔
275
                        {error, Reason} ->
276
                            ?ERROR_MSG("Failed to start udp channel for peer ~s, reason: ~p",
×
277
                                       [esockd:format(Peer), Reason]),
278
                            {noreply, State}
×
279
                    catch
280
                        _Error:Reason ->
281
                            ?ERROR_MSG("Exception occurred when starting udp channel for peer ~s, reason: ~p",
×
282
                                       [esockd:format(Peer), Reason]),
283
                            {noreply, State}
×
284
                    end;
285
                false ->
286
                    {noreply, State}
×
287
            end
288
    end;
289

290
handle_info({udp_passive, Sock}, State = #state{sock = Sock, rate_limit = Rl}) ->
291
    NState = case ?ENABLED(Rl) andalso
8✔
292
                  esockd_rate_limit:check(put(incoming_peers, 0), Rl) of
×
293
                 false ->
294
                     activate_sock(State);
8✔
295
                 {0, Rl1} ->
296
                     activate_sock(State#state{rate_limit = Rl1});
×
297
                 {Pause, Rl1} ->
298
                     ?ERROR_MSG("Pause ~w(ms) due to rate limit.", [Pause]),
×
299
                     TRef = erlang:start_timer(Pause, self(), activate_sock),
×
300
                     State#state{rate_limit = Rl1, limit_timer = TRef}
×
301
             end,
302
    {noreply, NState, hibernate};
8✔
303

304
handle_info({udp_error, Sock, Reason}, State = #state{sock = Sock}) ->
305
  {stop, {udp_error, Reason}, State};
1✔
306
handle_info({udp_closed, Sock}, State = #state{sock = Sock}) ->
307
  {stop, udp_closed, State};
×
308

309
handle_info({timeout, TRef, activate_sock}, State = #state{limit_timer = TRef}) ->
310
    NState = State#state{limit_timer = undefined},
×
311
    {noreply, activate_sock(NState)};
×
312

313
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) ->
314
    handle_peer_down(DownPid, Peers, State);
×
315

316
handle_info({'EXIT', DownPid, _Reason}, State = #state{peers = Peers}) ->
317
    handle_peer_down(DownPid, Peers, State);
1✔
318

319
handle_info({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock}) ->
320
    case gen_udp:send(Sock, IP, Port, Packet) of
12✔
321
        ok -> ok;
12✔
322
        {error, Reason} ->
323
            ?ERROR_MSG("Dropped packet to: ~s, reason: ~s", [esockd:format(Peer), Reason])
×
324
    end,
325
    {noreply, State};
12✔
326
handle_info(Info, State) ->
327
    ?ERROR_MSG("Unexpected info: ~p", [Info]),
1✔
328
    {noreply, State}.
1✔
329

330
terminate(_Reason, #state{sock = Sock}) ->
331
    gen_udp:close(Sock).
12✔
332

333
code_change(_OldVsn, State, _Extra) ->
334
    {ok, State}.
1✔
335

336
%%--------------------------------------------------------------------
337
%% Internel functions
338
%%--------------------------------------------------------------------
339

340
handle_peer_down(DownPid, Peers, State) ->
341
    case maps:find(DownPid, Peers) of
1✔
342
        {ok, Peer} ->
343
            {noreply, erase_peer(Peer, DownPid, State)};
1✔
344
        error ->
345
            {noreply, State}
×
346
    end.
347

348
-compile({inline,
349
          [ allowed/2
350
          , should_throttle/1
351
          , start_channel/3
352
          , activate_sock/1
353
          , store_peer/3
354
          , erase_peer/3
355
          , raw/1
356
          ]}).
357

358
allowed(Addr, Rules) ->
359
    case esockd_access:match(Addr, Rules) of
11✔
360
        nomatch          -> true;
×
361
        {matched, allow} -> true;
11✔
362
        {matched, deny}  -> false
×
363
    end.
364

365
should_throttle(#state{max_peers = infinity}) -> false;
11✔
366
should_throttle(#state{max_peers = MaxLimit, peers = Peers}) ->
367
    (maps:size(Peers) div 2) > MaxLimit.
×
368

369
start_channel(Transport, Peer, #state{mfa = MFA}) ->
370
    esockd:start_mfargs(MFA, Transport, Peer).
11✔
371

372
activate_sock(State = #state{sock = Sock}) ->
373
    ok = inet:setopts(Sock, [{active, ?ACTIVE_N}]), State.
8✔
374

375
store_peer(Peer, Pid, State = #state{peers = Peers}) ->
376
    State#state{peers = maps:put(Pid, Peer, maps:put(Peer, Pid, Peers))}.
11✔
377

378
erase_peer(Peer, Pid, State = #state{peers = Peers}) ->
379
    State#state{peers = maps:remove(Peer, maps:remove(Pid, Peers))}.
1✔
380

381
raw({allow, CIDR = {_Start, _End, _Len}}) ->
382
     {allow, esockd_cidr:to_string(CIDR)};
×
383
raw({deny, CIDR = {_Start, _End, _Len}}) ->
384
     {deny, esockd_cidr:to_string(CIDR)};
×
385
raw(Rule) ->
386
     Rule.
×
387

388
init_health_check(State, Opts) ->
389
    case proplists:get_value(health_check, Opts) of
12✔
390
        #{request := Request, reply := Reply} when is_binary(Request), is_binary(Reply) ->
391
            {ok, State#state{health_check_request = Request, health_check_reply = Reply}};
1✔
392
        undefined ->
393
            {ok, State};
11✔
394
        Any ->
NEW
395
            {error, {invalid_health_check_data, Any}}
×
396
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc