• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 435

13 Jul 2024 11:07AM UTC coverage: 66.998%. First build
435

Pull #190

github

web-flow
Merge 8478e6815 into 4ec038215
Pull Request #190: fix: refactor the udp proxy

0 of 46 new or added lines in 4 files covered. (0.0%)

877 of 1309 relevant lines covered (67.0%)

56.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.99
/src/esockd_udp.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_udp).
18

19
-behaviour(gen_server).
20

21
-import(esockd_listener_sup, [conn_rate_limiter/1, conn_limiter_opts/2, conn_limiter_opt/2]).
22

23
-export([ server/3
24
        , count_peers/1
25
        , stop/1
26
        ]).
27

28
%% get/set
29
-export([ get_options/2
30
        , get_acceptors/1
31
        , get_max_connections/1
32
        , get_max_conn_rate/3
33
        , get_current_connections/1
34
        , get_shutdown_count/1
35
        ]).
36

37
-export([ set_options/3
38
        , set_max_connections/3
39
        , set_max_conn_rate/3
40
        ]).
41

42
-export([ get_access_rules/1
43
        , allow/2
44
        , deny/2
45
        ]).
46

47
%% gen_server callbacks
48
-export([ init/1
49
        , handle_call/3
50
        , handle_cast/2
51
        , handle_info/2
52
        , terminate/2
53
        , code_change/3
54
        ]).
55

56
-export([proxy_request/1]).
57

58
-type(maybe(T) :: undefined | T).
59

60
-record(state, {
61
          proto        :: atom(),
62
          sock         :: inet:socket(),
63
          port         :: inet:port_number(),
64
          rate_limit   :: maybe(esockd_rate_limit:bucket()),
65
          conn_limiter :: esockd_generic_limiter:limiter(),
66
          limit_timer  :: maybe(reference()),
67
          max_peers    :: infinity | pos_integer(),
68
          peers        :: map(),
69
          options      :: [esockd:option()],
70
          access_rules :: list(),
71
          mfa          :: esockd:mfargs(),
72
          health_check_request :: maybe(binary()),
73
          health_check_reply :: maybe(binary())
74
         }).
75

76
-define(ACTIVE_N, 100).
77
-define(ENABLED(X), (X =/= undefined)).
78
-define(DEFAULT_OPTS, [binary, {reuseaddr, true}]).
79
-define(ERROR_MSG(Format, Args),
80
        error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])).
81

82
%%--------------------------------------------------------------------
83
%% API
84
%%--------------------------------------------------------------------
85

86
%% Start a UDP listener as a gen_server linked to the calling process.
%% Proto, ListenOn and Opts are handed unchanged to init/1.
-spec(server(atom(), esockd:listen_on(), [esockd:option()])
      -> {ok, pid()} | {error, term()}).
server(Proto, ListenOn, Opts) ->
    InitArgs = [Proto, ListenOn, Opts],
    gen_server:start_link(?MODULE, InitArgs, []).
12✔
90

91
%% Reconcile the listen address with the socket options.
%%
%% When listening on a bare port the options are returned untouched.
%% When listening on {Addr, Port}, any address already configured through
%% the `ip' option (or, failing that, `ifaddr') must be identical to Addr;
%% otherwise error({badarg, inconsistent_addr}) is raised. On success the
%% options are returned with {ip, Addr} stored in them.
resolve_addr(Port, SockOpts) when is_integer(Port) ->
    SockOpts;
resolve_addr({Addr, _Port}, SockOpts) ->
    %% Bind the configured value to a fresh variable: matching against the
    %% already-bound Addr here would turn a mismatch into a case_clause
    %% error instead of the badarg raised below.
    Configured = case proplists:get_value(ip, SockOpts) of
                     undefined -> proplists:get_value(ifaddr, SockOpts);
                     IpOpt     -> IpOpt
                 end,
    case Configured of
        undefined -> ok;
        Addr      -> ok;
        _         -> error({badarg, inconsistent_addr})
    end,
    lists:keystore(ip, 1, SockOpts, {ip, Addr}).
5✔
103

104
%% Ask the listener process how many peers it currently tracks.
-spec(count_peers(pid()) -> integer()).
count_peers(Pid) ->
    Request = count_peers,
    gen_server:call(Pid, Request).
4✔
107

108
%% Stop the listener process via gen_server:stop/1.
-spec(stop(pid()) -> ok).
stop(Pid) ->
    gen_server:stop(Pid).
4✔
110

111
%% Have the parent of the calling process evaluate Fun and return its result.
%% The parent is looked up with gen:get_parent/0 and called synchronously
%% with no timeout; the request is tagged with this function's own name.
%% NOTE(review): presumably invoked from a channel process started by this
%% listener - confirm against callers.
proxy_request(Fun) ->
    Request = {?FUNCTION_NAME, Fun},
    gen_server:call(gen:get_parent(), Request, infinity).
×
114

115
%%--------------------------------------------------------------------
116
%% GET/SET APIs
117
%%--------------------------------------------------------------------
118

119
%% Fetch the option list held by the listener process.
%% The listener reference is accepted but not used.
get_options(_ListenerRef, Pid) ->
    Request = options,
    gen_server:call(Pid, Request).
1✔
121

122
%% Report the acceptor count: always 1, regardless of the pid given.
get_acceptors(_Pid) -> 1.
1✔
124

125
%% Read the peer limit (`max_peers') from the listener process.
get_max_connections(Pid) ->
    Request = max_peers,
    gen_server:call(Pid, Request).
2✔
127

128
%% Look up the rate limiter registered under {listener, Proto, ListenOn}.
%% Returns {Capacity, Interval} taken from the limiter's map, or
%% {error, not_found} when the lookup yields undefined. The pid is unused.
get_max_conn_rate(_Pid, Proto, ListenOn) ->
    LimiterKey = {listener, Proto, ListenOn},
    case esockd_limiter:lookup(LimiterKey) of
        #{capacity := Capacity, interval := Interval} ->
            {Capacity, Interval};
        undefined ->
            {error, not_found}
    end.
135

136
%% Current number of peers, obtained with the same `count_peers' request
%% that count_peers/1 sends.
get_current_connections(Pid) ->
    Request = count_peers,
    gen_server:call(Pid, Request).
1✔
138

139
%% Shutdown counts are not tracked here: always the empty list.
get_shutdown_count(_Pid) -> [].
1✔
141

142
%% Changing options at runtime is not supported; every argument is ignored.
set_options(_ListenerRef, _Pid, _Opts) -> {error, not_supported}.
×
144

145
%% Replace the listener's peer limit. Only integer limits are accepted by
%% the guard; the listener reference is unused.
set_max_connections(_ListenerRef, Pid, MaxLimit) when is_integer(MaxLimit) ->
    Request = {max_peers, MaxLimit},
    gen_server:call(Pid, Request).
1✔
147

148
%% Ask the listener to rebuild its connection rate limiter from Opts.
%% The listener reference must be a {Proto, ListenOn} pair.
set_max_conn_rate({Proto, ListenOn}, Pid, Opts) ->
    Request = {max_conn_rate, Proto, ListenOn, Opts},
    gen_server:call(Pid, Request).
1✔
150

151
%% Retrieve the listener's access rules.
get_access_rules(Pid) ->
    Request = access_rules,
    gen_server:call(Pid, Request).
×
153

154
%% Add an `allow' access rule for CIDR to the listener.
allow(Pid, CIDR) ->
    Rule = {allow, CIDR},
    gen_server:call(Pid, {add_rule, Rule}).
×
156

157
%% Add a `deny' access rule for CIDR to the listener.
deny(Pid, CIDR) ->
    Rule = {deny, CIDR},
    gen_server:call(Pid, {add_rule, Rule}).
×
159

160
%%--------------------------------------------------------------------
161
%% gen_server callbacks
162
%%--------------------------------------------------------------------
163

164
%% gen_server init callback: open the UDP socket and build the initial state.
%%
%% Traps exits, resets the `incoming_peers' counter kept in the process
%% dictionary, compiles the access rules (default: allow all) and opens the
%% socket on the port derived from ListenOn.
%%
%% Returns {ok, State} on success, or {stop, Reason} when the socket cannot
%% be opened or the `health_check' option is malformed.
init([Proto, ListenOn, Opts]) ->
    process_flag(trap_exit, true),
    put(incoming_peers, 0),

    MFA = proplists:get_value(connection_mfargs, Opts),
    RawRules = proplists:get_value(access_rules, Opts, [{allow, all}]),
    AccessRules = [esockd_access:compile(Rule) || Rule <- RawRules],

    Port = port(ListenOn),
    UdpOpts = resolve_addr(ListenOn, sockopts(Opts)),
    case gen_udp:open(Port, esockd:merge_opts(?DEFAULT_OPTS, UdpOpts)) of
        {ok, Sock} ->
            %% Trigger the udp_passive event
            ok = inet:setopts(Sock, [{active, 1}]),
            Limiter = conn_rate_limiter(conn_limiter_opts(Opts, {listener, Proto, ListenOn})),
            MaxPeers = proplists:get_value(max_connections, Opts, infinity),
            State = #state{proto = Proto,
                           sock = Sock,
                           port = Port,
                           max_peers = MaxPeers,
                           peers = #{},
                           access_rules = AccessRules,
                           conn_limiter = Limiter,
                           options = Opts,
                           mfa = MFA},
            case init_health_check(State, Opts) of
                {ok, _} = Ok ->
                    Ok;
                {error, HcReason} ->
                    %% {error, _} is not an accepted init/1 result on OTP
                    %% releases before 26, so report the failure as a stop
                    %% and release the socket right away.
                    ok = gen_udp:close(Sock),
                    {stop, HcReason}
            end;
        {error, Reason} ->
            {stop, Reason}
    end.
193

194
%% Extract the port number from a listen_on term: either the second element
%% of an {Addr, Port} pair or the bare integer itself.
port({_Addr, Port}) ->
    Port;
port(Port) when is_integer(Port) ->
    Port.
5✔
196

197
%% Socket options for gen_udp: the `udp_options' entry of Opts (empty list
%% when absent) merged with ?DEFAULT_OPTS through esockd:merge_opts/2.
sockopts(Opts) ->
    UserOpts = proplists:get_value(udp_options, Opts, []),
    esockd:merge_opts(?DEFAULT_OPTS, UserOpts).
202

203
%% gen_server call handler.
%%
%% Requests handled:
%%   count_peers            -> number of peers
%%   max_peers              -> current peer limit
%%   {max_peers, N}         -> set the peer limit, replies ok
%%   {max_conn_rate, ...}   -> rebuild the connection limiter, replies ok
%%   options                -> options stored in the state
%%   access_rules           -> rules converted back with raw/1
%%   {add_rule, RawRule}    -> ok | {error, already_exists | bad_access_rule}
%%   which_children         -> supervisor-style child list
%%   {proxy_request, Fun}   -> result of Fun()
%% Anything else is logged and answered with `ignore'.

%% Every peer occupies two keys in the map (see store_peer/3), hence div 2.
handle_call(count_peers, _From, State = #state{peers = Peers}) ->
    {reply, maps:size(Peers) div 2, State};

handle_call(max_peers, _From, State = #state{max_peers = MaxLimit}) ->
    {reply, MaxLimit, State};

handle_call({max_peers, MaxLimit}, _From, State) ->
    {reply, ok, State#state{max_peers = MaxLimit}};

handle_call({max_conn_rate, Proto, ListenOn, Opts}, _From, State) ->
    Limiter = conn_rate_limiter(conn_limiter_opt(Opts, {listener, Proto, ListenOn})),
    {reply, ok, State#state{conn_limiter = Limiter}};

handle_call(options, _From, State = #state{options = Opts}) ->
    {reply, Opts, State};

handle_call(access_rules, _From, State = #state{access_rules = Rules}) ->
    {reply, [raw(Rule) || Rule <- Rules], State};

handle_call({add_rule, RawRule}, _From, State = #state{access_rules = Rules}) ->
    %% Only the compile step is protected; an `error' raised there is
    %% reported to the caller as bad_access_rule.
    try esockd_access:compile(RawRule) of
        Rule ->
            case lists:member(Rule, Rules) of
                true ->
                    {reply, {error, already_exists}, State};
                false ->
                    %% New rules are prepended to the rule list.
                    {reply, ok, State#state{access_rules = [Rule | Rules]}}
            end
    catch
        error:Reason ->
            ?ERROR_MSG("Bad access rule: ~p, compile error: ~p", [RawRule, Reason]),
            {reply, {error, bad_access_rule}, State}
    end;

%% mimic the supervisor's which_children reply
%% Only pid keys of the peers map that are still alive are listed. This
%% clause matches only when the stored mfa is a three-element tuple.
handle_call(which_children, _From, State = #state{peers = Peers, mfa = {Mod, _Func, _Args}}) ->
     {reply, [{undefined, Pid, worker, [Mod]}
              || Pid <- maps:keys(Peers), is_pid(Pid), erlang:is_process_alive(Pid)], State};

%% Fun runs inside this process; the call is not wrapped in a try, so an
%% exception raised by Fun propagates out of this callback.
handle_call({proxy_request, Fun}, _From, State) ->
    Result = Fun(),
    {reply, Result, State};

handle_call(Req, _From, State) ->
    ?ERROR_MSG("Unexpected call: ~p", [Req]),
    {reply, ignore, State}.
1✔
249

250
%% No casts are part of this server's protocol: log whatever arrives and
%% carry on with the state unchanged.
handle_cast(Cast, State) ->
    ?ERROR_MSG("Unexpected cast: ~p", [Cast]),
    {noreply, State}.
1✔
253

254
%% gen_server info handler: socket traffic, socket lifecycle events,
%% rate-limit timer expiry, peer process exits and outbound datagrams.

%% A datagram equal to the configured health-check request is answered
%% directly with the configured reply; no channel is involved.
handle_info({udp, Sock, IP, Port, Request},
            State = #state{sock = Sock,
                           health_check_request = Request,
                           health_check_reply = Reply}) ->
    SendResult = gen_udp:send(Sock, IP, Port, Reply),
    case SendResult of
        {error, Reason} ->
            ?ERROR_MSG("Health check response to: ~s failed, reason: ~s",
                       [esockd:format({IP, Port}), Reason]);
        ok ->
            ok
    end,
    {noreply, State};

%% Any other inbound datagram: forward it to the peer's channel process if
%% one is registered; otherwise start a channel when the access rules
%% permit the source address, and drop the datagram when they do not.
handle_info({udp, Sock, IP, InPortNo, Packet},
            State = #state{sock = Sock, peers = Peers, access_rules = Rules}) ->
    Peer = {IP, InPortNo},
    case maps:find(Peer, Peers) of
        {ok, Pid} ->
            Pid ! {datagram, self(), Packet},
            {noreply, State};
        error ->
            case allowed(IP, Rules) of
                false ->
                    {noreply, State};
                true ->
                    {noreply, start_peer_channel(Peer, Packet, State)}
            end
    end;

%% The socket went passive. When a rate limit is configured, the
%% `incoming_peers' counter is reset to 0 and its previous value is checked
%% against the limiter; the socket is re-armed either immediately or after
%% the pause the limiter asks for.
handle_info({udp_passive, Sock}, State = #state{sock = Sock, rate_limit = Rl}) ->
    Checked = ?ENABLED(Rl) andalso
                  esockd_rate_limit:check(put(incoming_peers, 0), Rl),
    NState = case Checked of
                 false ->
                     activate_sock(State);
                 {0, Rl1} ->
                     activate_sock(State#state{rate_limit = Rl1});
                 {Pause, Rl1} ->
                     ?ERROR_MSG("Pause ~w(ms) due to rate limit.", [Pause]),
                     TRef = erlang:start_timer(Pause, self(), activate_sock),
                     State#state{rate_limit = Rl1, limit_timer = TRef}
             end,
    {noreply, NState, hibernate};

%% Socket errors and closure stop the server.
handle_info({udp_error, Sock, Reason}, State = #state{sock = Sock}) ->
    {stop, {udp_error, Reason}, State};
handle_info({udp_closed, Sock}, State = #state{sock = Sock}) ->
    {stop, udp_closed, State};

%% The rate-limit pause is over: clear the timer and re-arm the socket.
handle_info({timeout, TRef, activate_sock}, State = #state{limit_timer = TRef}) ->
    Cleared = State#state{limit_timer = undefined},
    {noreply, activate_sock(Cleared)};

%% A monitored or linked process went away: forget it if it was a peer.
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{peers = Peers}) ->
    handle_peer_down(DownPid, Peers, State);

handle_info({'EXIT', DownPid, _Reason}, State = #state{peers = Peers}) ->
    handle_peer_down(DownPid, Peers, State);

%% Outbound datagram for a peer: send it through the listener socket.
%% Send failures are logged and the packet is dropped.
handle_info({datagram, Peer = {IP, Port}, Packet}, State = #state{sock = Sock}) ->
    SendResult = gen_udp:send(Sock, IP, Port, Packet),
    case SendResult of
        {error, Reason} ->
            ?ERROR_MSG("Dropped packet to: ~s, reason: ~s", [esockd:format(Peer), Reason]);
        ok ->
            ok
    end,
    {noreply, State};
handle_info(Info, State) ->
    ?ERROR_MSG("Unexpected info: ~p", [Info]),
    {noreply, State}.

%% Count the new peer, then start a channel process for it unless the
%% listener is throttled. On success the channel is linked, handed the first
%% datagram and recorded in the peers map. Returns the resulting state;
%% every failure path logs and returns the state unchanged.
start_peer_channel(Peer, Packet, State = #state{sock = Sock}) ->
    put(incoming_peers, get(incoming_peers) + 1),
    try should_throttle(State) orelse
        start_channel({udp, self(), Sock}, Peer, State) of
        true ->
            ?ERROR_MSG("Cannot create udp channel for peer ~s due to throttling.",
                       [esockd:format(Peer)]),
            State;
        {ok, Pid} ->
            true = erlang:link(Pid),
            Pid ! {datagram, self(), Packet},
            store_peer(Peer, Pid, State);
        {error, Reason} ->
            ?ERROR_MSG("Failed to start udp channel for peer ~s, reason: ~p",
                       [esockd:format(Peer), Reason]),
            State
    catch
        _Error:Reason ->
            ?ERROR_MSG("Exception occurred when starting udp channel for peer ~s, reason: ~p",
                       [esockd:format(Peer), Reason]),
            State
    end.
1✔
339

340
%% gen_server terminate callback: close the listener socket, whatever the
%% termination reason.
terminate(_Reason, State) ->
    #state{sock = Sock} = State,
    gen_udp:close(Sock).
12✔
342

343
%% gen_server code_change callback: the state is carried over as-is.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
1✔
345

346
%%--------------------------------------------------------------------
347
%% Internal functions
348
%%--------------------------------------------------------------------
349

350
%% React to the exit of DownPid: if it is a key of Peers, remove both of its
%% entries from the state; otherwise leave the state alone. Always answers
%% with a {noreply, State} tuple for the gen_server loop.
handle_peer_down(DownPid, Peers, State) ->
    NewState = case maps:find(DownPid, Peers) of
                   error      -> State;
                   {ok, Peer} -> erase_peer(Peer, DownPid, State)
               end,
    {noreply, NewState}.
357

358
-compile({inline,
359
          [ allowed/2
360
          , should_throttle/1
361
          , start_channel/3
362
          , activate_sock/1
363
          , store_peer/3
364
          , erase_peer/3
365
          , raw/1
366
          ]}).
367

368
%% Decide whether Addr may talk to this listener. An explicit deny match
%% rejects; an allow match or no match at all accepts.
allowed(Addr, Rules) ->
    MatchResult = esockd_access:match(Addr, Rules),
    case MatchResult of
        {matched, deny}  -> false;
        {matched, allow} -> true;
        nomatch          -> true
    end.
374

375
%% True when no further peer may be admitted.
%%
%% With max_peers = infinity the listener never throttles. Otherwise it
%% throttles as soon as the current peer count has reached the limit, so at
%% most MaxLimit peers exist at once. (Comparing with `>' would admit one
%% peer beyond the limit.) Each peer occupies two keys in the peers map,
%% hence the division by 2.
should_throttle(#state{max_peers = infinity}) -> false;
should_throttle(#state{max_peers = MaxLimit, peers = Peers}) ->
    (maps:size(Peers) div 2) >= MaxLimit.
×
378

379
%% Start a channel process for Peer using the MFA stored in the state.
start_channel(Transport, Peer, State) ->
    #state{mfa = MFA} = State,
    esockd:start_mfargs(MFA, Transport, Peer).
11✔
381

382
%% Put the socket back into {active, ?ACTIVE_N} mode and hand the state
%% back unchanged. Crashes if setopts does not return ok.
activate_sock(State = #state{sock = Sock}) ->
    ok = inet:setopts(Sock, [{active, ?ACTIVE_N}]),
    State.
8✔
384

385
%% Register a peer in both directions: Peer -> Pid and Pid -> Peer share
%% the same map, so either key can be used for lookup.
store_peer(Peer, Pid, State = #state{peers = Peers}) ->
    State#state{peers = Peers#{Peer => Pid, Pid => Peer}}.
11✔
387

388
%% Drop both entries (keyed by Pid and by Peer) of a peer from the map.
erase_peer(Peer, Pid, State = #state{peers = Peers}) ->
    State#state{peers = maps:without([Pid, Peer], Peers)}.
1✔
390

391
%% Convert an access rule back to its printable form: allow/deny rules whose
%% second element is a three-element tuple get it rendered with
%% esockd_cidr:to_string/1; any other rule is returned untouched.
raw({Action, CIDR = {_Start, _End, _Len}}) when Action =:= allow; Action =:= deny ->
    {Action, esockd_cidr:to_string(CIDR)};
raw(Rule) ->
    Rule.
×
397

398
%% Apply the optional `health_check' setting to the state.
%%
%% No setting leaves the state as it is. A map carrying binary `request'
%% and `reply' values stores both in the state. Anything else yields
%% {error, {invalid_health_check_data, Value}}.
init_health_check(State, Opts) ->
    HealthCheck = proplists:get_value(health_check, Opts),
    case HealthCheck of
        undefined ->
            {ok, State};
        #{request := Request, reply := Reply} when is_binary(Request), is_binary(Reply) ->
            {ok, State#state{health_check_request = Request, health_check_reply = Reply}};
        Any ->
            {error, {invalid_health_check_data, Any}}
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc