• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 386

15 Apr 2024 11:41AM UTC coverage: 73.328%. First build
386

Pull #187

github

zhongwencool
chore: ca's validity extended to 10 years
Pull Request #187: fix: stop process when receive udp_error/udp_closed

0 of 2 new or added lines in 1 file covered. (0.0%)

866 of 1181 relevant lines covered (73.33%)

62.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.48
/src/esockd_udp.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_udp).
18

19
-behaviour(gen_server).
20

21
-import(esockd_listener_sup, [conn_rate_limiter/1, conn_limiter_opts/2, conn_limiter_opt/2]).
22

23
-export([ server/3
24
        , count_peers/1
25
        , stop/1
26
        ]).
27

28
%% get/set
29
-export([ get_options/2
30
        , get_acceptors/1
31
        , get_max_connections/1
32
        , get_max_conn_rate/3
33
        , get_current_connections/1
34
        , get_shutdown_count/1
35
        ]).
36

37
-export([ set_options/3
38
        , set_max_connections/3
39
        , set_max_conn_rate/3
40
        ]).
41

42
-export([ get_access_rules/1
43
        , allow/2
44
        , deny/2
45
        ]).
46

47
%% gen_server callbacks
48
-export([ init/1
49
        , handle_call/3
50
        , handle_cast/2
51
        , handle_info/2
52
        , terminate/2
53
        , code_change/3
54
        ]).
55

56
%% Optional value: either `undefined' or a value of type T.
%% Named `option/1' instead of `maybe/1' because `maybe' becomes a reserved
%% word once the `maybe_expr' feature is enabled (the default from OTP 27),
%% at which point a type called `maybe' no longer compiles.
-type(option(T) :: undefined | T).

%% gen_server state of one UDP listener.
-record(state, {
          %% Protocol name passed to server/3.
          proto        :: atom(),
          %% The listening UDP socket opened in init/1.
          sock         :: inet:socket(),
          %% Port number the socket was opened on.
          port         :: inet:port_number(),
          %% Optional rate-limit bucket consulted on every udp_passive event.
          rate_limit   :: option(esockd_rate_limit:bucket()),
          %% Connection limiter built from the listener options.
          conn_limiter :: esockd_generic_limiter:limiter(),
          %% Timer reference while the socket is paused by the rate limit.
          limit_timer  :: option(reference()),
          %% Upper bound on the number of peers.
          max_peers    :: infinity | pos_integer(),
          %% Bidirectional index: holds both Peer => Pid and Pid => Peer,
          %% so every peer occupies two entries.
          peers        :: map(),
          %% The options the listener was started with.
          options      :: [esockd:option()],
          %% Compiled access rules.
          access_rules :: list(),
          %% Callback used to start the channel process of a new peer.
          mfa          :: esockd:mfargs()
         }).
71

72
-define(ACTIVE_N, 100).
73
-define(ENABLED(X), (X =/= undefined)).
74
-define(DEFAULT_OPTS, [binary, {reuseaddr, true}]).
75
-define(ERROR_MSG(Format, Args),
76
        error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])).
77

78
%%--------------------------------------------------------------------
79
%% API
80
%%--------------------------------------------------------------------
81

82
%% Start a UDP listener process linked to the caller.
%% The three arguments are handed to init/1 unchanged.
-spec(server(atom(), esockd:listen_on(), [esockd:option()])
      -> {ok, pid()} | {error, term()}).
server(Proto, ListenOn, Opts) ->
    InitArgs = [Proto, ListenOn, Opts],
    gen_server:start_link(?MODULE, InitArgs, []).
10✔
86

87
%% Reconcile the address given in ListenOn with the `ip' / `ifaddr' socket
%% options.
%%
%% * A bare port number carries no address: SockOpts is returned untouched.
%% * `{Addr, Port}': the address configured through `ip' (or, when `ip' is
%%   absent, `ifaddr') must either be missing or equal to Addr; otherwise
%%   `error({badarg, inconsistent_addr})' is raised. On success `{ip, Addr}'
%%   is stored into SockOpts.
resolve_addr(Port, SockOpts) when is_integer(Port) ->
    SockOpts;
resolve_addr({Addr, _Port}, SockOpts) ->
    %% A fresh variable is used for the configured `ip' value. Re-using the
    %% already bound Addr here would turn a mismatch into a `case_clause'
    %% crash instead of the intended `inconsistent_addr' error.
    Configured = case proplists:get_value(ip, SockOpts) of
                     undefined -> proplists:get_value(ifaddr, SockOpts);
                     Ip        -> Ip
                 end,
    case Configured of
        undefined -> ok;
        Addr      -> ok;
        _Mismatch -> error({badarg, inconsistent_addr})
    end,
    lists:keystore(ip, 1, SockOpts, {ip, Addr}).
3✔
99

100
%% Ask the listener process how many peers it currently tracks.
-spec(count_peers(pid()) -> integer()).
count_peers(Pid) ->
    Request = count_peers,
    gen_server:call(Pid, Request).
3✔
103

104
%% Stop the listener process synchronously.
-spec(stop(pid()) -> ok).
stop(Pid) ->
    gen_server:stop(Pid).
3✔
106

107
%%--------------------------------------------------------------------
108
%% GET/SET APIs
109
%%--------------------------------------------------------------------
110

111
%% Fetch the option list the listener was started with.
%% The listener reference is accepted but not used.
get_options(_ListenerRef, Pid) ->
    Request = options,
    gen_server:call(Pid, Request).
1✔
113

114
%% Number of acceptors: always 1, whatever process is passed in.
%% NOTE(review): presumably because the single listener process owns the
%% socket -- confirm against the TCP listener's contract.
get_acceptors(_ListenerPid) ->
    1.
1✔
116

117
%% Fetch the configured maximum number of peers from the listener process.
get_max_connections(Pid) ->
    Request = max_peers,
    gen_server:call(Pid, Request).
2✔
119

120
%% Look up the connection-rate limiter registered for this listener.
%% Returns `{Capacity, Interval}' when the limiter exists and
%% `{error, not_found}' when the lookup yields `undefined'.
%% The pid argument is not used.
get_max_conn_rate(_Pid, Proto, ListenOn) ->
    LimiterName = {listener, Proto, ListenOn},
    case esockd_limiter:lookup(LimiterName) of
        #{capacity := Capacity, interval := Interval} ->
            {Capacity, Interval};
        undefined ->
            {error, not_found}
    end.
127

128
%% Current number of peers; issues the same `count_peers' request as
%% count_peers/1.
get_current_connections(Pid) ->
    Request = count_peers,
    gen_server:call(Pid, Request).
1✔
130

131
%% Shutdown statistics: always the empty list for this listener.
get_shutdown_count(_ListenerPid) ->
    [].
1✔
133

134
%% Changing options at runtime is not supported: every call is rejected
%% with `{error, not_supported}' and no argument is inspected.
set_options(_Ref, _ListenerPid, _NewOpts) ->
    {error, not_supported}.
×
136

137
%% Replace the listener's maximum number of peers.
%% Only integer limits are accepted; any other value fails with a
%% `function_clause' error. The listener reference is not used.
set_max_connections(_ListenerRef, Pid, MaxLimit) when is_integer(MaxLimit) ->
    Request = {max_peers, MaxLimit},
    gen_server:call(Pid, Request).
1✔
139

140
%% Ask the listener to rebuild its connection-rate limiter from Opts.
%% The listener reference must be a `{Proto, ListenOn}' pair; both parts are
%% forwarded to the listener process.
set_max_conn_rate({Proto, ListenOn}, Pid, Opts) ->
    Request = {max_conn_rate, Proto, ListenOn, Opts},
    gen_server:call(Pid, Request).
1✔
142

143
%% Fetch the listener's access rules, converted back to their raw form by
%% the listener process.
get_access_rules(Pid) ->
    Request = access_rules,
    gen_server:call(Pid, Request).
×
145

146
%% Add an `allow' access rule for CIDR to the listener.
allow(Pid, CIDR) ->
    Rule = {allow, CIDR},
    gen_server:call(Pid, {add_rule, Rule}).
×
148

149
%% Add a `deny' access rule for CIDR to the listener.
deny(Pid, CIDR) ->
    Rule = {deny, CIDR},
    gen_server:call(Pid, {add_rule, Rule}).
×
151

152
%%--------------------------------------------------------------------
153
%% gen_server callbacks
154
%%--------------------------------------------------------------------
155

156
%% gen_server init callback: open the UDP socket and build the listener state.
%%
%% Reads `connection_mfargs', `access_rules' (default `[{allow, all}]'),
%% `udp_options' and `max_connections' (default `infinity') from Opts.
%% Returns `{stop, Reason}' when the socket cannot be opened.
init([Proto, ListenOn, Opts]) ->
    process_flag(trap_exit, true),
    %% Per-process counter of newly seen peers; consumed on udp_passive.
    put(incoming_peers, 0),

    MFA = proplists:get_value(connection_mfargs, Opts),
    RawRules = proplists:get_value(access_rules, Opts, [{allow, all}]),
    AccessRules = [esockd_access:compile(Rule) || Rule <- RawRules],

    Port = port(ListenOn),
    UdpOpts = resolve_addr(ListenOn, sockopts(Opts)),
    case gen_udp:open(Port, esockd:merge_opts(?DEFAULT_OPTS, UdpOpts)) of
        {ok, Sock} ->
            %% Trigger the udp_passive event: after one datagram the socket
            %% turns passive and the udp_passive handler re-arms it.
            ok = inet:setopts(Sock, [{active, 1}]),
            %% Fully qualified calls rather than relying on the module's
            %% -import attribute, so the origin of the helpers is explicit.
            LimiterOpts = esockd_listener_sup:conn_limiter_opts(
                            Opts, {listener, Proto, ListenOn}),
            Limiter = esockd_listener_sup:conn_rate_limiter(LimiterOpts),
            MaxPeers = proplists:get_value(max_connections, Opts, infinity),
            {ok, #state{proto = Proto,
                        sock = Sock,
                        port = Port,
                        max_peers = MaxPeers,
                        peers = #{},
                        access_rules = AccessRules,
                        conn_limiter = Limiter,
                        options = Opts,
                        mfa = MFA}};
        {error, Reason} ->
            {stop, Reason}
    end.
184

185
%% Extract the port number from a listen-on term, which is either
%% `{Addr, Port}' or a bare integer port.
port({_Addr, PortNo}) ->
    PortNo;
port(PortNo) when is_integer(PortNo) ->
    PortNo.
3✔
187

188
%% Socket options for the listener: the `udp_options' entry of Opts
%% (default `[]') merged with ?DEFAULT_OPTS via esockd:merge_opts/2.
sockopts(Opts) ->
    UserOpts = proplists:get_value(udp_options, Opts, []),
    esockd:merge_opts(?DEFAULT_OPTS, UserOpts).
193

194
%% gen_server call handler.
%%
%% Requests:
%%   count_peers                            -> number of peers
%%   max_peers                              -> configured peer limit
%%   {max_peers, MaxLimit}                  -> ok, replaces the limit
%%   {max_conn_rate, Proto, ListenOn, Opts} -> ok, rebuilds the limiter
%%   options                                -> option list given at start
%%   access_rules                           -> rules in raw form
%%   {add_rule, RawRule}                    -> ok | {error, already_exists}
%%                                             | {error, bad_access_rule}
%%   which_children                         -> supervisor-style child list
%% Any other request is logged and answered with `ignore'.

%% Each peer occupies two map entries (Peer => Pid and Pid => Peer).
handle_call(count_peers, _From, State = #state{peers = Peers}) ->
    {reply, maps:size(Peers) div 2, State};

handle_call(max_peers, _From, State = #state{max_peers = MaxLimit}) ->
    {reply, MaxLimit, State};

handle_call({max_peers, MaxLimit}, _From, State) ->
    {reply, ok, State#state{max_peers = MaxLimit}};

handle_call({max_conn_rate, Proto, ListenOn, Opts}, _From, State) ->
    %% Fully qualified calls rather than relying on the module's -import.
    LimiterOpts = esockd_listener_sup:conn_limiter_opt(Opts, {listener, Proto, ListenOn}),
    Limiter = esockd_listener_sup:conn_rate_limiter(LimiterOpts),
    {reply, ok, State#state{conn_limiter = Limiter}};

handle_call(options, _From, State = #state{options = Opts}) ->
    {reply, Opts, State};

handle_call(access_rules, _From, State = #state{access_rules = Rules}) ->
    {reply, [raw(Rule) || Rule <- Rules], State};

handle_call({add_rule, RawRule}, _From, State = #state{access_rules = Rules}) ->
    %% Only the compile step is protected; the `of' section is not.
    try esockd_access:compile(RawRule) of
        Rule ->
            case lists:member(Rule, Rules) of
                true ->
                    {reply, {error, already_exists}, State};
                false ->
                    %% New rules are prepended to the rule list.
                    {reply, ok, State#state{access_rules = [Rule | Rules]}}
            end
    catch
        error:Reason ->
            ?ERROR_MSG("Bad access rule: ~p, compile error: ~p", [RawRule, Reason]),
            {reply, {error, bad_access_rule}, State}
    end;

%% mimic the supervisor's which_children reply
%% Only matches when the callback is a {Mod, Func, Args} triple; only the
%% pid keys of the peers map that are still alive are reported.
handle_call(which_children, _From, State = #state{peers = Peers, mfa = {Mod, _Func, _Args}}) ->
    Children = [{undefined, Pid, worker, [Mod]}
                || Pid <- maps:keys(Peers), is_pid(Pid), erlang:is_process_alive(Pid)],
    {reply, Children, State};

handle_call(Req, _From, State) ->
    ?ERROR_MSG("Unexpected call: ~p", [Req]),
    {reply, ignore, State}.
1✔
236

237
%% gen_server cast handler: no casts are expected, so every message is
%% logged and the state is left untouched.
handle_cast(Unexpected, State) ->
    ?ERROR_MSG("Unexpected cast: ~p", [Unexpected]),
    {noreply, State}.
1✔
240

241
%% gen_server info handler: inbound datagrams, socket events, the rate-limit
%% timer, peer process exits and outbound datagrams.

%% Inbound datagram. A known peer gets the packet forwarded to its channel
%% process; an unknown peer is checked against the access rules and, when
%% allowed, a channel is started for it. Packets from denied peers are
%% dropped silently.
handle_info({udp, Sock, IP, InPortNo, Packet},
            State = #state{sock = Sock, peers = Peers, access_rules = Rules}) ->
    Peer = {IP, InPortNo},
    case maps:find(Peer, Peers) of
        {ok, Pid} ->
            Pid ! {datagram, self(), Packet},
            {noreply, State};
        error ->
            case allowed(IP, Rules) of
                true  -> {noreply, accept_peer(Peer, Packet, State)};
                false -> {noreply, State}
            end
    end;

%% The socket used up its active count. Without a rate limit it is re-armed
%% at once; with one, the number of new peers seen since the last check
%% (the counter is read and reset by put/2) decides whether to re-arm now
%% or after a pause.
handle_info({udp_passive, Sock}, State = #state{sock = Sock, rate_limit = Rl}) ->
    NState = case ?ENABLED(Rl) andalso
                  esockd_rate_limit:check(put(incoming_peers, 0), Rl) of
                 false ->
                     activate_sock(State);
                 {0, Rl1} ->
                     activate_sock(State#state{rate_limit = Rl1});
                 {Pause, Rl1} ->
                     ?ERROR_MSG("Pause ~w(ms) due to rate limit.", [Pause]),
                     TRef = erlang:start_timer(Pause, self(), activate_sock),
                     State#state{rate_limit = Rl1, limit_timer = TRef}
             end,
    {noreply, NState, hibernate};

%% Socket failure or closure: stop the listener process.
handle_info({udp_error, _Sock, Reason}, State) ->
    {stop, {udp_error, Reason}, State};
handle_info({udp_closed, _Sock}, State) ->
    {stop, udp_closed, State};

%% The rate-limit pause is over: clear the timer and re-arm the socket.
handle_info({timeout, TRef, activate_sock}, State = #state{limit_timer = TRef}) ->
    NState = State#state{limit_timer = undefined},
    {noreply, activate_sock(NState)};

%% A channel process went away, reported by its monitor or as an exit signal.
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) ->
    handle_peer_down(DownPid, Peers, State);

handle_info({'EXIT', DownPid, _Reason}, State = #state{peers = Peers}) ->
    handle_peer_down(DownPid, Peers, State);

%% Outbound datagram for a peer. Send failures are logged and the packet is
%% dropped.
handle_info({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock}) ->
    case gen_udp:send(Sock, IP, Port, Packet) of
        ok -> ok;
        {error, Reason} ->
            %% ~p, as in the other clauses: the reason is an arbitrary term
            %% and ~s fails on anything that is not string-like.
            ?ERROR_MSG("Dropped packet to: ~s, reason: ~p", [esockd:format(Peer), Reason])
    end,
    {noreply, State};
handle_info(Info, State) ->
    ?ERROR_MSG("Unexpected info: ~p", [Info]),
    {noreply, State}.

%% Try to start a channel for a previously unseen, allowed peer and return
%% the resulting state. The new-peer counter is bumped before the throttle
%% check. On success the channel process is monitored, receives the first
%% packet and is recorded in the peers map; on throttling, start failure or
%% an exception the event is logged and the state is returned unchanged.
accept_peer(Peer, Packet, State = #state{sock = Sock}) ->
    put(incoming_peers, get(incoming_peers) + 1),
    try should_throttle(State) orelse
        start_channel({udp, self(), Sock}, Peer, State) of
        true ->
            ?ERROR_MSG("Cannot create udp channel for peer ~s due to throttling.",
                       [esockd:format(Peer)]),
            State;
        {ok, Pid} ->
            _Ref = erlang:monitor(process, Pid),
            Pid ! {datagram, self(), Packet},
            store_peer(Peer, Pid, State);
        {error, Reason} ->
            ?ERROR_MSG("Failed to start udp channel for peer ~s, reason: ~p",
                       [esockd:format(Peer), Reason]),
            State
    catch
        _Error:Reason ->
            ?ERROR_MSG("Exception occurred when starting udp channel for peer ~s, reason: ~p",
                       [esockd:format(Peer), Reason]),
            State
    end.
1✔
315

316
%% gen_server terminate callback: close the listening socket, whatever the
%% termination reason.
terminate(_Why, State) ->
    #state{sock = Sock} = State,
    gen_udp:close(Sock).
10✔
318

319
%% gen_server code_change callback: the state needs no conversion and is
%% returned as is.
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
1✔
321

322
%%--------------------------------------------------------------------
323
%% Internal functions
324
%%--------------------------------------------------------------------
325

326
%% React to a terminated process: when DownPid is a known channel process,
%% drop both of its entries from the peers map; otherwise leave the state
%% unchanged. Always answers `{noreply, State}'.
handle_peer_down(DownPid, Peers, State) ->
    NextState = case maps:find(DownPid, Peers) of
                    error      -> State;
                    {ok, Peer} -> erase_peer(Peer, DownPid, State)
                end,
    {noreply, NextState}.
333

334
-compile({inline,
335
          [ allowed/2
336
          , should_throttle/1
337
          , start_channel/3
338
          , activate_sock/1
339
          , store_peer/3
340
          , erase_peer/3
341
          , raw/1
342
          ]}).
343

344
%% Decide whether Addr may talk to the listener.
%% Only an explicit `deny' match rejects the address; an `allow' match or no
%% match at all lets it through.
allowed(Addr, Rules) ->
    Verdict = esockd_access:match(Addr, Rules),
    case Verdict of
        {matched, deny}  -> false;
        {matched, allow} -> true;
        nomatch          -> true
    end.
350

351
%% Tell whether a new peer must be refused because the peer limit is reached.
%% With `infinity' nothing is ever throttled. Otherwise the current peer
%% count (half the map size, since each peer is stored under two keys) is
%% compared with the limit.
should_throttle(#state{max_peers = infinity}) -> false;
should_throttle(#state{max_peers = MaxLimit, peers = Peers}) ->
    %% `>=' rather than `>': this runs before the new peer is added, so a
    %% count that already equals the limit must refuse it. With `>' the
    %% listener would end up with MaxLimit + 1 peers.
    (maps:size(Peers) div 2) >= MaxLimit.
×
354

355
%% Start the channel process for Peer through the configured callback.
start_channel(Transport, Peer, State) ->
    #state{mfa = MFA} = State,
    esockd:start_mfargs(MFA, Transport, Peer).
9✔
357

358
%% Re-arm the socket to deliver the next ?ACTIVE_N datagrams as messages.
%% Crashes with `badmatch' when setting the option fails. Returns the state
%% unchanged.
activate_sock(#state{sock = Sock} = State) ->
    ActiveOpt = {active, ?ACTIVE_N},
    ok = inet:setopts(Sock, [ActiveOpt]),
    State.
6✔
360

361
%% Record a peer under both directions, Peer => Pid and Pid => Peer, so it
%% can be found by address as well as by channel process.
store_peer(Peer, Pid, #state{peers = Peers} = State) ->
    Indexed = Peers#{Peer => Pid, Pid => Peer},
    State#state{peers = Indexed}.
9✔
363

364
%% Forget a peer: remove both its Pid and its Peer key from the peers map.
erase_peer(Peer, Pid, #state{peers = Peers} = State) ->
    Remaining = maps:without([Pid, Peer], Peers),
    State#state{peers = Remaining}.
1✔
366

367
%% Convert a compiled access rule back to its raw form.
%% `allow' / `deny' rules carrying a 3-tuple CIDR get it rendered with
%% esockd_cidr:to_string/1; every other term is returned unchanged.
raw({Action, CIDR = {_Start, _End, _Len}}) when Action =:= allow; Action =:= deny ->
     {Action, esockd_cidr:to_string(CIDR)};
raw(Other) ->
     Other.
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc