• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 354

14 Dec 2023 12:54PM UTC coverage: 71.491%. First build
354

Pull #183

github

web-flow
Merge 3297859f4 into 5cb22a8b1
Pull Request #183: feat(listener): support changing options on the fly

170 of 192 new or added lines in 10 files covered. (88.54%)

820 of 1147 relevant lines covered (71.49%)

60.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.43
/src/esockd_connection_sup.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_connection_sup).
18

19
-behaviour(gen_server).
20

21
-import(proplists, [get_value/3]).
22

23
-export([start_link/2, start_supervised/2, stop/1]).
24

25
-export([ start_connection/3
26
        , count_connections/1
27
        ]).
28

29
-export([ get_max_connections/1
30
        , get_shutdown_count/1
31
        , get_options/1
32
        , set_options/2
33
        ]).
34

35
%% Allow, Deny
36
-export([ access_rules/1
37
        , allow/2
38
        , deny/2
39
        ]).
40

41
%% gen_server callbacks
42
-export([ init/1
43
        , handle_call/3
44
        , handle_cast/2
45
        , handle_info/2
46
        , terminate/2
47
        , code_change/3
48
        ]).
49

50
-type(shutdown() :: brutal_kill | infinity | pos_integer()).
51

52
-type option() :: {shutdown, shutdown()}
53
                | {max_connections, pos_integer()}
54
                | {access_rules, list()}.
55

56
-record(state, {
57
          curr_connections :: map(),
58
          max_connections :: pos_integer(),
59
          access_rules :: list(),
60
          shutdown :: shutdown(),
61
          mfargs :: esockd:mfargs()
62
         }).
63

64
-define(DEFAULT_MAX_CONNS, 1024).
65
-define(TRANSPORT, esockd_transport).
66
-define(ERROR_MSG(Format, Args),
67
        error_logger:error_msg("[~s] " ++ Format, [?MODULE | Args])).
68

69
%% @doc Start connection supervisor.
70
-spec(start_link([esockd:option()], esockd:mfargs())
71
      -> {ok, pid()} | ignore | {error, term()}).
72
start_link(Opts, MFA) ->
73
    gen_server:start_link(?MODULE, [Opts, MFA], []).
62✔
74

75
-spec start_supervised(esockd:listener_ref(), esockd:mfargs())
76
      -> {ok, pid()} | ignore | {error, term()}.
77
start_supervised(ListenerRef, MFA) ->
78
    Opts = esockd_server:get_listener_prop(ListenerRef, options),
56✔
79
    case start_link(Opts, MFA) of
56✔
80
        {ok, Pid} ->
81
            _ = esockd_server:set_listener_prop(ListenerRef, connection_sup, Pid),
56✔
82
            {ok, Pid};
56✔
83
        {error, _} = Error ->
NEW
84
            Error
×
85
    end.
86

87
-spec(stop(pid()) -> ok).
88
stop(Pid) -> gen_server:stop(Pid).
6✔
89

90
-spec get_options(pid()) -> [option()].
91
get_options(Pid) ->
92
    call(Pid, get_options).
18✔
93

94
-spec set_options(pid(), [option()]) -> ok | {error, _Reason}.
95
set_options(Pid, Opts) ->
96
    call(Pid, {set_options, Opts}).
12✔
97

98
%%--------------------------------------------------------------------
99
%% API
100
%%--------------------------------------------------------------------
101

102
%% @doc Start connection.
103
start_connection(Sup, Sock, UpgradeFuns) ->
104
    case call(Sup, {start_connection, Sock}) of
53✔
105
        {ok, ConnPid} ->
106
            %% Transfer controlling from acceptor to connection
107
            _ = ?TRANSPORT:controlling_process(Sock, ConnPid),
53✔
108
            _ = ?TRANSPORT:ready(ConnPid, Sock, UpgradeFuns),
53✔
109
            {ok, ConnPid};
53✔
110
        ignore -> ignore;
×
111
        {error, Reason} ->
112
            {error, Reason}
×
113
    end.
114

115
%% @doc Start the connection process.
116
-spec(start_connection_proc(esockd:mfargs(), esockd_transport:socket())
117
      -> {ok, pid()} | ignore | {error, term()}).
118
start_connection_proc(M, Sock) when is_atom(M) ->
119
    M:start_link(?TRANSPORT, Sock);
×
120
start_connection_proc({M, F}, Sock) when is_atom(M), is_atom(F) ->
121
    M:F(?TRANSPORT, Sock);
×
122
start_connection_proc({M, F, Args}, Sock) when is_atom(M), is_atom(F), is_list(Args) ->
123
    erlang:apply(M, F, [?TRANSPORT, Sock | Args]).
53✔
124

125
-spec(count_connections(pid()) -> integer()).
126
count_connections(Sup) ->
127
    call(Sup, count_connections).
4✔
128

129
-spec get_max_connections(pid()) -> pos_integer().
130
get_max_connections(Sup) when is_pid(Sup) ->
131
    proplists:get_value(max_connections, get_options(Sup)).
7✔
132

133
-spec(get_shutdown_count(pid()) -> [{atom(), integer()}]).
134
get_shutdown_count(Sup) ->
135
    call(Sup, get_shutdown_count).
4✔
136

137
access_rules(Sup) ->
138
    proplists:get_value(access_rules, get_options(Sup)).
11✔
139

140
allow(Sup, CIDR) ->
141
    call(Sup, {add_rule, {allow, CIDR}}).
4✔
142

143
deny(Sup, CIDR) ->
144
    call(Sup, {add_rule, {deny, CIDR}}).
4✔
145

146
call(Sup, Req) ->
147
    gen_server:call(Sup, Req, infinity).
99✔
148

149
%%--------------------------------------------------------------------
150
%% gen_server callbacks
151
%%--------------------------------------------------------------------
152

153
init([Opts, MFA]) ->
154
    process_flag(trap_exit, true),
62✔
155
    Shutdown = get_value(shutdown, Opts, brutal_kill),
62✔
156
    MaxConns = get_value(max_connections, Opts, ?DEFAULT_MAX_CONNS),
62✔
157
    RawRules = get_value(access_rules, Opts, [{allow, all}]),
62✔
158
    AccessRules = [esockd_access:compile(Rule) || Rule <- RawRules],
62✔
159
    {ok, #state{curr_connections = #{},
62✔
160
                max_connections  = MaxConns,
161
                access_rules     = AccessRules,
162
                shutdown         = Shutdown,
163
                mfargs           = MFA}}.
164

165
handle_call({start_connection, _Sock}, _From,
166
            State = #state{curr_connections = Conns, max_connections = MaxConns})
167
    when map_size(Conns) >= MaxConns ->
168
    {reply, {error, maxlimit}, State};
×
169

170
handle_call({start_connection, Sock}, _From,
171
            State = #state{curr_connections = Conns, access_rules = Rules, mfargs = MFA}) ->
172
    case esockd_transport:peername(Sock) of
53✔
173
        {ok, {Addr, _Port}} ->
174
            case allowed(Addr, Rules) of
53✔
175
                true ->
176
                    try start_connection_proc(MFA, Sock) of
53✔
177
                        {ok, Pid} when is_pid(Pid) ->
178
                            NState = State#state{curr_connections = maps:put(Pid, true, Conns)},
53✔
179
                            {reply, {ok, Pid}, NState};
53✔
180
                        ignore ->
181
                            {reply, ignore, State};
×
182
                        {error, Reason} ->
183
                            {reply, {error, Reason}, State}
×
184
                    catch
185
                        _Error:Reason:ST ->
186
                            {reply, {error, {Reason, ST}}, State}
×
187
                    end;
188
                false ->
189
                    {reply, {error, forbidden}, State}
×
190
            end;
191
        {error, Reason} ->
192
            {reply, {error, Reason}, State}
×
193
    end;
194

195
handle_call(count_connections, _From, State = #state{curr_connections = Conns}) ->
196
    {reply, maps:size(Conns), State};
4✔
197

198
handle_call(get_shutdown_count, _From, State) ->
199
    Counts = [{Reason, Count} || {{shutdown_count, Reason}, Count} <- get()],
4✔
200
    {reply, Counts, State};
4✔
201

202
handle_call({add_rule, RawRule}, _From, State = #state{access_rules = Rules}) ->
203
    try esockd_access:compile(RawRule) of
8✔
204
        Rule ->
205
            case lists:member(Rule, Rules) of
8✔
206
                true ->
207
                    {reply, {error, already_exists}, State};
×
208
                false ->
209
                    {reply, ok, State#state{access_rules = [Rule | Rules]}}
8✔
210
            end
211
    catch
212
        error:Reason ->
213
            error_logger:error_msg("Bad access rule: ~p, compile errro: ~p", [RawRule, Reason]),
×
214
            {reply, {error, bad_access_rule}, State}
×
215
    end;
216

217
handle_call(get_options, _From, State) ->
218
    Options = [
18✔
219
        {shutdown, get_state_option(shutdown, State)},
220
        {max_connections, get_state_option(max_connections, State)},
221
        {access_rules, get_state_option(access_rules, State)}
222
    ],
223
    {reply, Options, State};
18✔
224

225
handle_call({set_options, Options}, _From, State) ->
226
    case set_state_options(Options, State) of
12✔
227
        NState = #state{} ->
228
            {reply, ok, NState};
11✔
229
        {error, Reason} ->
230
            {reply, {error, Reason}, State}
1✔
231
    end;
232

233
%% mimic the supervisor's which_children reply
234
handle_call(which_children, _From, State = #state{curr_connections = Conns, mfargs = MFA}) ->
235
    Mod = get_module(MFA),
×
236
    {reply, [{undefined, Pid, worker, [Mod]}
×
237
              || Pid <- maps:keys(Conns), erlang:is_process_alive(Pid)], State};
×
238

239
handle_call(Req, _From, State) ->
240
    ?ERROR_MSG("Unexpected call: ~p", [Req]),
1✔
241
    {reply, ignore, State}.
1✔
242

243
handle_cast(Msg, State) ->
244
    ?ERROR_MSG("Unexpected cast: ~p", [Msg]),
1✔
245
    {noreply, State}.
1✔
246

247
handle_info({'EXIT', Pid, Reason}, State = #state{curr_connections = Conns}) ->
248
    case maps:take(Pid, Conns) of
26✔
249
        {true, Conns1} ->
250
            connection_crashed(Pid, Reason, State),
26✔
251
            {noreply, State#state{curr_connections = Conns1}};
26✔
252
        error ->
253
            ?ERROR_MSG("Unexpected 'EXIT': ~p, reason: ~p", [Pid, Reason]),
×
254
            {noreply, State}
×
255
    end;
256

257
handle_info(Info, State) ->
258
    ?ERROR_MSG("Unexpected info: ~p", [Info]),
1✔
259
    {noreply, State}.
1✔
260

261
terminate(_Reason, State) ->
262
    terminate_children(State).
62✔
263

264
code_change(_OldVsn, State, _Extra) ->
265
    {ok, State}.
×
266

267
%%--------------------------------------------------------------------
268
%% Internal functions
269
%%--------------------------------------------------------------------
270

271
allowed(Addr, Rules) ->
272
    case esockd_access:match(Addr, Rules) of
53✔
273
        nomatch          -> true;
×
274
        {matched, allow} -> true;
53✔
275
        {matched, deny}  -> false
×
276
    end.
277

278
get_state_option(max_connections, #state{max_connections = MaxConnections}) ->
279
    MaxConnections;
18✔
280
get_state_option(shutdown, #state{shutdown = Shutdown}) ->
281
    Shutdown;
18✔
282
get_state_option(access_rules, #state{access_rules = Rules}) ->
283
    [raw(Rule) || Rule <- Rules].
18✔
284

285
set_state_option({max_connections, MaxConns}, State) ->
286
    State#state{max_connections = MaxConns};
3✔
287
set_state_option({shutdown, Shutdown}, State) ->
NEW
288
    State#state{shutdown = Shutdown};
×
289
set_state_option({access_rules, Rules}, State) ->
290
    try
1✔
291
        CompiledRules = [esockd_access:compile(Rule) || Rule <- Rules],
1✔
NEW
292
        State#state{access_rules = CompiledRules}
×
293
    catch
294
        error:_Reason -> {error, bad_access_rules}
1✔
295
    end;
296
set_state_option(_, State) ->
297
    State.
11✔
298

299
set_state_options(Options, State) ->
300
    lists:foldl(fun
12✔
301
        (Option, St = #state{}) -> set_state_option(Option, St);
15✔
NEW
302
        (_, Error) -> Error
×
303
    end, State, Options).
304

305
raw({allow, CIDR = {_Start, _End, _Len}}) ->
306
     {allow, esockd_cidr:to_string(CIDR)};
18✔
307
raw({deny, CIDR = {_Start, _End, _Len}}) ->
308
     {deny, esockd_cidr:to_string(CIDR)};
4✔
309
raw(Rule) ->
310
     Rule.
7✔
311

312
connection_crashed(_Pid, normal, _State) ->
313
    ok;
5✔
314
connection_crashed(_Pid, shutdown, _State) ->
315
    ok;
×
316
connection_crashed(_Pid, killed, _State) ->
317
    ok;
×
318
connection_crashed(_Pid, Reason, _State) when is_atom(Reason) ->
319
    count_shutdown(Reason);
×
320
connection_crashed(_Pid, {shutdown, Reason}, _State) when is_atom(Reason) ->
321
    count_shutdown(Reason);
18✔
322
connection_crashed(Pid, {shutdown, {ssl_error, Reason}}, State) ->
323
    count_shutdown(ssl_error),
1✔
324
    log(info, ssl_error, Reason, Pid, State);
1✔
325
connection_crashed(Pid, {shutdown, #{shutdown_count := Key} = Reason}, State) when is_atom(Key) ->
326
    count_shutdown(Key),
1✔
327
    log(info, Key, maps:without([shutdown_count], Reason), Pid, State);
1✔
328
connection_crashed(Pid, {shutdown, Reason}, State) ->
329
    %% unidentified shutdown, cannot keep a counter of it,
330
    %% ideally we should try to add a 'shutdown_count' filed to the reason.
331
    log(error, connection_shutdown, Reason, Pid, State);
×
332
connection_crashed(Pid, Reason, State) ->
333
    %% unexpected crash, probably deserve a fix
334
    log(error, connection_crashed, Reason, Pid, State).
1✔
335

336
count_shutdown(Reason) ->
337
    Key = {shutdown_count, Reason},
20✔
338
    put(Key, case get(Key) of undefined -> 1; Cnt -> Cnt+1 end).
20✔
339

340
terminate_children(State = #state{curr_connections = Conns, shutdown = Shutdown}) ->
341
    {Pids, EStack0} = monitor_children(Conns),
62✔
342
    Sz = sets:size(Pids),
62✔
343
    EStack = case Shutdown of
62✔
344
                 brutal_kill ->
345
                     sets:fold(fun(P, _) -> exit(P, kill) end, ok, Pids),
62✔
346
                     wait_children(Shutdown, Pids, Sz, undefined, EStack0);
62✔
347
                 infinity ->
348
                     sets:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids),
×
349
                     wait_children(Shutdown, Pids, Sz, undefined, EStack0);
×
350
                 Time when is_integer(Time) ->
351
                     sets:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids),
×
352
                     TRef = erlang:start_timer(Time, self(), kill),
×
353
                     wait_children(Shutdown, Pids, Sz, TRef, EStack0)
×
354
             end,
355
    %% Unroll stacked errors and report them
356
    dict:fold(fun(Reason, Pid, _) ->
62✔
357
                  log(error, connection_shutdown_error, Reason, Pid, State)
×
358
              end, ok, EStack).
359

360
monitor_children(Conns) ->
361
    lists:foldl(fun(P, {Pids, EStack}) ->
62✔
362
        case monitor_child(P) of
27✔
363
            ok ->
364
                {sets:add_element(P, Pids), EStack};
27✔
365
            {error, normal} ->
366
                {Pids, EStack};
×
367
            {error, Reason} ->
368
                {Pids, dict:append(Reason, P, EStack)}
×
369
        end
370
    end, {sets:new(), dict:new()}, maps:keys(Conns)).
371

372
%% Help function to shutdown/2 switches from link to monitor approach
373
monitor_child(Pid) ->
374
    %% Do the monitor operation first so that if the child dies
375
    %% before the monitoring is done causing a 'DOWN'-message with
376
    %% reason noproc, we will get the real reason in the 'EXIT'-message
377
    %% unless a naughty child has already done unlink...
378
    erlang:monitor(process, Pid),
27✔
379
    unlink(Pid),
27✔
380

381
    receive
27✔
382
        %% If the child dies before the unlik we must empty
383
        %% the mail-box of the 'EXIT'-message and the 'DOWN'-message.
384
        {'EXIT', Pid, Reason} ->
385
            receive
×
386
                {'DOWN', _, process, Pid, _} ->
387
                    {error, Reason}
×
388
            end
389
    after 0 ->
390
            %% If a naughty child did unlink and the child dies before
391
            %% monitor the result will be that shutdown/2 receives a
392
            %% 'DOWN'-message with reason noproc.
393
            %% If the child should die after the unlink there
394
            %% will be a 'DOWN'-message with a correct reason
395
            %% that will be handled in shutdown/2.
396
            ok
27✔
397
    end.
398

399
wait_children(_Shutdown, _Pids, 0, undefined, EStack) ->
400
    EStack;
62✔
401
wait_children(_Shutdown, _Pids, 0, TRef, EStack) ->
402
        %% If the timer has expired before its cancellation, we must empty the
403
        %% mail-box of the 'timeout'-message.
404
    _ = erlang:cancel_timer(TRef),
×
405
    receive
×
406
        {timeout, TRef, kill} ->
407
            EStack
×
408
    after 0 ->
409
            EStack
×
410
    end;
411

412
%%TODO: Copied from supervisor.erl, rewrite it later.
413
wait_children(brutal_kill, Pids, Sz, TRef, EStack) ->
414
    receive
27✔
415
        {'DOWN', _MRef, process, Pid, killed} ->
416
            wait_children(brutal_kill, sets:del_element(Pid, Pids), Sz-1, TRef, EStack);
27✔
417

418
        {'DOWN', _MRef, process, Pid, Reason} ->
419
            wait_children(brutal_kill, sets:del_element(Pid, Pids),
×
420
                          Sz-1, TRef, dict:append(Reason, Pid, EStack))
421
    end;
422

423
wait_children(Shutdown, Pids, Sz, TRef, EStack) ->
424
    receive
×
425
        {'DOWN', _MRef, process, Pid, shutdown} ->
426
            wait_children(Shutdown, sets:del_element(Pid, Pids), Sz-1, TRef, EStack);
×
427
        {'DOWN', _MRef, process, Pid, normal} ->
428
            wait_children(Shutdown, sets:del_element(Pid, Pids), Sz-1, TRef, EStack);
×
429
        {'DOWN', _MRef, process, Pid, Reason} ->
430
            wait_children(Shutdown, sets:del_element(Pid, Pids), Sz-1,
×
431
                          TRef, dict:append(Reason, Pid, EStack));
432
        {timeout, TRef, kill} ->
433
            sets:fold(fun(P, _) -> exit(P, kill) end, ok, Pids),
×
434
            wait_children(Shutdown, Pids, Sz-1, undefined, EStack)
×
435
    end.
436

437
log(Level, Error, Reason, Pid, #state{mfargs = MFA}) ->
438
    ErrorMsg = [{supervisor, {?MODULE, Pid}},
3✔
439
                {errorContext, Error},
440
                {reason, Reason},
441
                {offender, [{pid, Pid},
442
                            {name, connection},
443
                            {mfargs, MFA}]}],
444
    case Level of
3✔
445
        info ->
446
            error_logger:info_report(supervisor_report, ErrorMsg);
2✔
447
        error ->
448
            error_logger:error_report(supervisor_report, ErrorMsg)
1✔
449
    end.
450

451
get_module({M, _F, _A}) -> M;
×
452
get_module({M, _F}) -> M;
×
453
get_module(M) -> M.
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc