• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 349

28 Nov 2023 12:32PM UTC coverage: 70.65% (-0.1%) from 70.781%
349

push

github

web-flow
Merge pull request #182 from emqx/dev/william/improve-robustness

improve robustness

9 of 13 new or added lines in 4 files covered. (69.23%)

3 existing lines in 2 files now uncovered.

739 of 1046 relevant lines covered (70.65%)

58.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.85
/src/esockd_acceptor.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_acceptor).
18

19
-behaviour(gen_statem).
20

21
-include("esockd.hrl").
22

23
-export([
24
    start_link/7,
25
    set_conn_limiter/2
26
]).
27

28
%% state callbacks
29
-export([handle_event/4]).
30

31
%% The state diagram:
32
%%
33
%%         +----------------------------------------------+
34
%%         |                                              |
35
%%   +-----v----+       +-----------------+        +------+------+
36
%%   | waiting  +------>+  token request  +------->+  accepting  |
37
%%   +-+----^---+       +----+------^-----+        +-------------+
38
%%     |    |                |      |
39
%%     |    +-------------+  |      |
40
%%     |                  |  |      |
41
%%     |                +-+--V------+-----+
42
%%     +--------------->+    suspending   |
43
%%                      +-----------------+
44
%%
45

46
%% gen_statem Callbacks
47
-export([
48
    init/1,
49
    callback_mode/0,
50
    terminate/3,
51
    code_change/4
52
]).
53

54
-record(state, {
55
    proto :: atom(),
56
    listen_on :: esockd:listen_on(),
57
    lsock :: inet:socket(),
58
    sockmod :: module(),
59
    sockname :: {inet:ip_address(), inet:port_number()},
60
    tune_fun :: esockd:sock_fun(),
61
    upgrade_funs :: [esockd:sock_fun()],
62
    conn_limiter :: undefined | esockd_generic_limiter:limiter(),
63
    conn_sup :: pid(),
64
    accept_ref :: term()
65
}).
66

67
%% @doc Start an acceptor process linked to the caller.
%%
%% The seven arguments are forwarded, unchanged and in the same order, as the
%% argument list given to `init/1'. No extra `gen_statem' start options are
%% passed.
-spec start_link(
    Proto :: atom(),
    ListenOn :: esockd:listen_on(),
    ConnSup :: pid(),
    TuneFun :: esockd:sock_fun(),
    UpgradeFuns :: [esockd:sock_fun()],
    Limiter :: esockd_generic_limiter:limiter(),
    LSock :: inet:socket()
) -> {ok, pid()} | {error, term()}.
start_link(Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock) ->
    InitArgs = [Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock],
    gen_statem:start_link(?MODULE, InitArgs, []).
100

101
%% @doc Replace the connection limiter held by a running acceptor.
%%
%% This is a synchronous `gen_statem' call with a 5000 ms timeout; the
%% acceptor answers `ok' from whichever state it is currently in.
-spec set_conn_limiter(pid(), esockd_generic_limiter:limiter()) -> ok.
set_conn_limiter(Acceptor, Limiter) ->
    Request = {set_conn_limiter, Limiter},
    gen_statem:call(Acceptor, Request, 5000).
104

105
%%--------------------------------------------------------------------
106
%% gen_statem callbacks
107
%%--------------------------------------------------------------------
108
%% @doc gen_statem callback: every event, in every state, is dispatched to
%% the single `handle_event/4' function.
callback_mode() ->
    handle_event_function.
109

110
%% @doc gen_statem `init/1' callback.
%%
%% Resolves the local address and the socket module of the listening socket
%% (crashing on anything but `{ok, _}'), stores them with the start arguments
%% in the state record, and enters `waiting' with an internal `begin_waiting'
%% event queued so that the first accept is issued immediately.
init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) ->
    _ = rand:seed(exsplus, erlang:timestamp()),
    {ok, Sockname} = inet:sockname(LSock),
    {ok, SockMod} = inet_db:lookup_socket(LSock),
    InitState = #state{
        proto = Proto,
        listen_on = ListenOn,
        lsock = LSock,
        sockmod = SockMod,
        sockname = Sockname,
        tune_fun = TuneFun,
        upgrade_funs = UpgradeFuns,
        conn_limiter = Limiter,
        conn_sup = ConnSup
    },
    KickOff = {next_event, internal, begin_waiting},
    {ok, waiting, InitState, KickOff}.
127

128
%% @doc gen_statem `handle_event/4' callback implementing the acceptor loop.
%%
%% States and the events handled in each:
%%   waiting       - `begin_waiting' issues an asynchronous accept; the
%%                   matching `inet_async' success message moves on to
%%                   `token_request'.
%%   token_request - asks the connection limiter for one token; proceeds to
%%                   `accepting', or to `suspending' for the pause the limiter
%%                   requests.
%%   accepting     - registers and tunes the new socket, then hands it to the
%%                   connection supervisor; always returns to `waiting'.
%%   suspending    - a state timeout resumes either the pending token request
%%                   or the accept loop.
%% In any state: `set_conn_limiter' calls are answered with `ok', accept
%% errors for the current accept reference go to `handle_socket_error/2', and
%% every other event is logged and ignored.

%% waiting: start an asynchronous accept on the listening socket.
handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) ->
    case prim_inet:async_accept(LSock, -1) of
        {ok, Ref} ->
            %% Remember the reference so only the matching reply is handled.
            {keep_state, State#state{accept_ref = Ref}};
        {error, Reason} when
            Reason =:= emfile;
            Reason =:= enfile
        ->
            %% Out of file descriptors: back off for 1000 ms before retrying.
            {next_state, suspending, State, {state_timeout, 1000, begin_waiting}};
        {error, econnaborted} ->
            %% Retry immediately.
            {next_state, waiting, State, {next_event, internal, begin_waiting}};
        {error, closed} ->
            {stop, normal, State};
        {error, Reason} ->
            error_logger:error_msg("~p async_accept error: ~p", [?MODULE, Reason]),
            {stop, Reason, State}
    end;
%% waiting: the pending accept completed with a new socket.
handle_event(
    info,
    {inet_async, LSock, Ref, {ok, Sock}},
    waiting,
    State = #state{lsock = LSock, accept_ref = Ref}
) ->
    {next_state, token_request, State, {next_event, internal, {token_request, Sock}}};
%% token_request: consume one token from the connection limiter.
handle_event(
    internal, {token_request, Sock} = Content, token_request, State = #state{conn_limiter = Limiter}
) ->
    case esockd_generic_limiter:consume(1, Limiter) of
        {ok, Limiter2} ->
            {next_state, accepting, State#state{conn_limiter = Limiter2},
                {next_event, internal, {accept, Sock}}};
        {pause, PauseTime, Limiter2} ->
            %% Keep the same request (and socket) as the timeout payload so it
            %% is retried once the pause has elapsed.
            {next_state, suspending, State#state{conn_limiter = Limiter2},
                {state_timeout, PauseTime, Content}}
    end;
%% accepting: tune the socket and start a connection process for it.
handle_event(
    internal,
    {accept, Sock},
    accepting,
    State = #state{
        proto = Proto,
        listen_on = ListenOn,
        sockmod = SockMod,
        tune_fun = TuneFun,
        upgrade_funs = UpgradeFuns,
        conn_sup = ConnSup
    }
) ->
    %% make it look like gen_tcp:accept
    inet_db:register_socket(Sock, SockMod),

    %% Inc accepted stats.
    esockd_server:inc_stats({Proto, ListenOn}, accepted, 1),

    case eval_tune_socket_fun(TuneFun, Sock) of
        {ok, Sock} ->
            case esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns) of
                {ok, _Pid} ->
                    ok;
                {error, Reason} ->
                    handle_accept_error(Reason, "Failed to start connection on ~s: ~p", State),
                    close(Sock)
            end;
        {error, Reason} ->
            %% ~p (not ~s) for the reason: it is not guaranteed to be an atom
            %% or chardata, and ~s cannot format other terms.
            handle_accept_error(Reason, "Tune buffer failed on ~s: ~p", State),
            close(Sock)
    end,
    %% Whatever happened to this socket, go back to accepting the next one.
    {next_state, waiting, State, {next_event, internal, begin_waiting}};
%% suspending: the limiter pause elapsed, retry the pending token request.
handle_event(state_timeout, {token_request, _} = Content, suspending, State) ->
    {next_state, token_request, State, {next_event, internal, Content}};
%% suspending: the back-off elapsed, resume accepting.
handle_event(state_timeout, begin_waiting, suspending, State) ->
    {next_state, waiting, State, {next_event, internal, begin_waiting}};
%% Any state: swap the connection limiter and acknowledge the caller.
handle_event({call, From}, {set_conn_limiter, Limiter}, _, State) ->
    {keep_state, State#state{conn_limiter = Limiter}, {reply, From, ok}};
%% Any state: the pending accept failed.
handle_event(
    info,
    {inet_async, LSock, Ref, {error, Reason}},
    _,
    State = #state{lsock = LSock, accept_ref = Ref}
) ->
    handle_socket_error(Reason, State);
%% Anything else is unexpected: log it and carry on unchanged.
handle_event(Type, Content, StateName, _) ->
    error_logger:warning_msg(
        "Unhandled message, State:~p, Type:~p Content:~p",
        [StateName, Type, Content]
    ),
    keep_state_and_data.
215

216
%% @doc gen_statem `terminate/3' callback.
%%
%% A `normal' shutdown is silent; any other reason is reported through
%% `error_logger' before returning `ok'.
terminate(normal, _CurrentState, #state{}) ->
    ok;
terminate(Reason, _CurrentState, #state{}) ->
    LogArgs = [?MODULE, Reason],
    error_logger:error_msg("~p terminating due to ~p", LogArgs),
    ok.
221

222
%% @doc gen_statem `code_change/4' callback: the state name and state data
%% are carried over unchanged.
code_change(_FromVsn, CurrentState, Data, _Extra) ->
    {ok, CurrentState, Data}.
224

225
%%--------------------------------------------------------------------
226
%% Internal funcs
227
%%--------------------------------------------------------------------
228

229
%% Best-effort close of an accepted socket port; always returns `ok'.
%%
%% `port_close/1' raises `badarg' when the argument is not an open port
%% (for example, it has already been closed). Only that error is ignored;
%% a `try' is used instead of the old-style `catch' so that no other
%% exception class is silently turned into an `{'EXIT', _}' value.
close(Sock) ->
    try port_close(Sock) of
        true -> ok
    catch
        error:badarg -> ok
    end.
230

231
%% Invoke a `{Fun, Args}' socket function with the socket prepended to its
%% argument list, returning whatever the function returns.
eval_tune_socket_fun({TuneFun, ExtraArgs}, Sock) ->
    CallArgs = [Sock | ExtraArgs],
    erlang:apply(TuneFun, CallArgs).
233

234
%% React to a failure while setting up an accepted socket.
%%
%%   enotconn / einval - ignored silently.
%%   overloaded        - counted in the `closed_overloaded' statistic of this
%%                       listener; nothing is logged.
%%   anything else     - logged with the caller-supplied format string, which
%%                       receives the formatted listener address followed by
%%                       the reason.
handle_accept_error(Reason, _Fmt, _State) when Reason =:= enotconn; Reason =:= einval ->
    ok;
handle_accept_error(overloaded, _Fmt, #state{proto = Proto, listen_on = ListenOn}) ->
    StatsKey = {Proto, ListenOn},
    esockd_server:inc_stats(StatsKey, closed_overloaded, 1),
    ok;
handle_accept_error(Reason, Fmt, #state{sockname = Sockname}) ->
    LogArgs = [esockd:format(Sockname), Reason],
    error_logger:error_msg(Fmt, LogArgs).
243

244
%% Map an asynchronous accept error to the next gen_statem transition.

%% The listening socket was closed: stop normally.
handle_socket_error(closed, State) ->
    {stop, normal, State};
%% {error, econnaborted} -> accept
%% {error, esslaccept}   -> accept
handle_socket_error(Reason, State) when Reason =:= econnaborted orelse Reason =:= esslaccept ->
    Retry = {next_event, internal, begin_waiting},
    {next_state, waiting, State, Retry};
%% emfile: The per-process limit of open file descriptors has been reached.
%% enfile: The system limit on the total number of open files has been reached.
%% Log the condition, then suspend for 1000 ms before accepting again.
handle_socket_error(Reason, State) when Reason =:= emfile orelse Reason =:= enfile ->
    Listener = esockd:format(State#state.sockname),
    error_logger:error_msg(
        "Accept error on ~s: ~s",
        [Listener, explain_posix(Reason)]
    ),
    BackOff = {state_timeout, 1000, begin_waiting},
    {next_state, suspending, State, BackOff};
%% Any other error stops the acceptor with that reason.
handle_socket_error(Reason, State) ->
    {stop, Reason, State}.
262

263
%% Human-readable text for the two file-descriptor exhaustion errors that are
%% logged by this module. Only `emfile' and `enfile' are accepted.
explain_posix(emfile) -> "EMFILE (Too many open files)";
explain_posix(enfile) -> "ENFILE (File table overflow)".
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc