• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 339

21 Nov 2023 08:05AM UTC coverage: 70.881%. First build
339

Pull #182

github

qzhuyan
ci: coverall only when success
Pull Request #182: improve robustness

5 of 8 new or added lines in 2 files covered. (62.5%)

740 of 1044 relevant lines covered (70.88%)

176.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.9
/src/esockd_acceptor.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_acceptor).
18

19
-behaviour(gen_statem).
20

21
-include("esockd.hrl").
22

23
-export([
24
    start_link/7,
25
    set_conn_limiter/2
26
]).
27

28
%% state callbacks
29
-export([handle_event/4]).
30

31
%% The state diagram:
32
%%
33
%%         +----------------------------------------------+
34
%%         |                                              |
35
%%   +-----v----+       +-----------------+        +------+------+
36
%%   | waiting  +------>+  token request  +------->+  accepting  |
37
%%   +-+----^---+       +----+------^-----+        +-------------+
38
%%     |    |                |      |
39
%%     |    +-------------+  |      |
40
%%     |                  |  |      |
41
%%     |                +-+--V------+-----+
42
%%     +--------------->+    suspending   |
43
%%                      +-----------------+
44
%%
45

46
%% gen_statem Callbacks
47
-export([
48
    init/1,
49
    callback_mode/0,
50
    terminate/3,
51
    code_change/4
52
]).
53

54
%% Acceptor data carried through every gen_statem callback of this module.
-record(state, {
55
    %% First element of the {Proto, ListenOn} key used for esockd_server stats.
    proto :: atom(),
56
    %% Second element of the {Proto, ListenOn} stats key.
    listen_on :: esockd:listen_on(),
57
    %% Listening socket on which prim_inet:async_accept/2 is posted.
    lsock :: inet:socket(),
58
    %% Result of inet_db:lookup_socket(LSock); registered on each accepted socket.
    sockmod :: module(),
59
    %% Result of inet:sockname(LSock); only used when formatting log messages.
    sockname :: {inet:ip_address(), inet:port_number()},
60
    %% {Fun, Args} pair applied as Fun(Sock, Args...) to each accepted socket.
    tune_fun :: esockd:sock_fun(),
61
    %% Passed as-is to esockd_connection_sup:start_connection/3.
    upgrade_funs :: [esockd:sock_fun()],
62
    %% One token is consumed per accepted socket; replaceable via set_conn_limiter/2.
    conn_limiter :: undefined | esockd_generic_limiter:limiter(),
63
    %% Supervisor pid handed to esockd_connection_sup:start_connection/3.
    conn_sup :: pid(),
64
    %% Reference returned by the pending async_accept; not set by init/1.
    accept_ref :: term()
65
}).
66

67
%% @doc Start an acceptor process linked to the caller.
%% The seven arguments are forwarded, unchanged and in order, to init/1.
-spec start_link(
    Proto :: atom(),
    ListenOn :: esockd:listen_on(),
    ConnSup :: pid(),
    TuneFun :: esockd:sock_fun(),
    UpgradeFuns :: [esockd:sock_fun()],
    Limiter :: esockd_generic_limiter:limiter(),
    LSock :: inet:socket()
) ->
    {ok, pid()} | {error, term()}.
start_link(Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock) ->
    InitArgs = [Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock],
    gen_statem:start_link(?MODULE, InitArgs, []).
100

101
%% @doc Replace the connection limiter of a running acceptor.
%% Issued as a synchronous gen_statem call with a 5000 ms timeout.
-spec set_conn_limiter(pid(), esockd_generic_limiter:limiter()) -> ok.
set_conn_limiter(Acceptor, Limiter) ->
    Request = {set_conn_limiter, Limiter},
    TimeoutMs = 5000,
    gen_statem:call(Acceptor, Request, TimeoutMs).
104

105
%%--------------------------------------------------------------------
106
%% gen_statem callbacks
107
%%--------------------------------------------------------------------
108
%% gen_statem callback: every event, whatever the state, is dispatched to
%% handle_event/4.
callback_mode() ->
    handle_event_function.
109

110
%% gen_statem callback: build the initial acceptor data and enter `waiting'.
%% The socket name and socket module are resolved once from the listening
%% socket; a failing lookup crashes init/1 through the assertive matches.
%% The trailing internal `begin_waiting' event makes the acceptor post its
%% first asynchronous accept as soon as initialisation has finished.
init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) ->
    _ = rand:seed(exsplus, erlang:timestamp()),
    {ok, Sockname} = inet:sockname(LSock),
    {ok, SockMod} = inet_db:lookup_socket(LSock),
    InitData = #state{
        lsock = LSock,
        sockmod = SockMod,
        sockname = Sockname,
        proto = Proto,
        listen_on = ListenOn,
        conn_sup = ConnSup,
        conn_limiter = Limiter,
        tune_fun = TuneFun,
        upgrade_funs = UpgradeFuns
    },
    Kickoff = {next_event, internal, begin_waiting},
    {ok, waiting, InitData, Kickoff}.
127

128
%% gen_statem callback (handle_event_function mode).
%%
%% Normal cycle: waiting -> token_request -> accepting -> waiting.
%% `suspending' is a timed back-off entered either when the limiter asks for a
%% pause or when the accept fails with emfile/enfile.

%% waiting: post an asynchronous accept on the listening socket and remember
%% its reference so that the matching inet_async reply can be recognised.
handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) ->
    case prim_inet:async_accept(LSock, -1) of
        {ok, Ref} ->
            {keep_state, State#state{accept_ref = Ref}};
        {error, Reason} when
            Reason =:= emfile;
            Reason =:= enfile
        ->
            %% Out of file descriptors: back off for one second, then retry.
            {next_state, suspending, State, {state_timeout, 1000, begin_waiting}};
        {error, closed} ->
            {stop, normal, State};
        {error, Reason} ->
            error_logger:error_msg("~p async_accept error: ~p", [?MODULE, Reason]),
            {stop, Reason, State}
    end;
%% waiting: the pending accept completed; ask the limiter for a token before
%% the new socket is handed over.
handle_event(
    info,
    {inet_async, LSock, Ref, {ok, Sock}},
    waiting,
    State = #state{lsock = LSock, accept_ref = Ref}
) ->
    {next_state, token_request, State, {next_event, internal, {token_request, Sock}}};
%% token_request: consume one token. On `pause' the same request (carrying the
%% accepted socket) is replayed from `suspending' after PauseTime.
handle_event(
    internal, {token_request, Sock} = Content, token_request, State = #state{conn_limiter = Limiter}
) ->
    case esockd_generic_limiter:consume(1, Limiter) of
        {ok, Limiter2} ->
            {next_state, accepting, State#state{conn_limiter = Limiter2},
                {next_event, internal, {accept, Sock}}};
        {pause, PauseTime, Limiter2} ->
            {next_state, suspending, State#state{conn_limiter = Limiter2},
                {state_timeout, PauseTime, Content}}
    end;
%% accepting: register and tune the socket, start the connection process, then
%% go straight back to `waiting'. Any failure closes the accepted socket.
handle_event(
    internal,
    {accept, Sock},
    accepting,
    State = #state{
        proto = Proto,
        listen_on = ListenOn,
        sockmod = SockMod,
        tune_fun = TuneFun,
        upgrade_funs = UpgradeFuns,
        conn_sup = ConnSup
    }
) ->
    %% make it look like gen_tcp:accept
    inet_db:register_socket(Sock, SockMod),

    %% Inc accepted stats.
    esockd_server:inc_stats({Proto, ListenOn}, accepted, 1),

    case eval_tune_socket_fun(TuneFun, Sock) of
        {ok, Sock} ->
            case esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns) of
                {ok, _Pid} ->
                    ok;
                {error, Reason} ->
                    handle_accept_error(Reason, "Failed to start connection on ~s: ~p", State),
                    close(Sock)
            end;
        {error, Reason} ->
            %% Reason is an arbitrary error term, so it is printed with ~p;
            %% ~s cannot format non-string terms such as tuples.
            handle_accept_error(Reason, "Tune buffer failed on ~s: ~p", State),
            close(Sock)
    end,
    {next_state, waiting, State, {next_event, internal, begin_waiting}};
%% suspending: the limiter pause elapsed; replay the token request.
handle_event(state_timeout, {token_request, _} = Content, suspending, State) ->
    {next_state, token_request, State, {next_event, internal, Content}};
%% suspending: the emfile/enfile back-off elapsed; try to accept again.
handle_event(state_timeout, begin_waiting, suspending, State) ->
    {next_state, waiting, State, {next_event, internal, begin_waiting}};
%% Any state: swap the connection limiter and acknowledge the caller.
handle_event({call, From}, {set_conn_limiter, Limiter}, _, State) ->
    {keep_state, State#state{conn_limiter = Limiter}, {reply, From, ok}};
%% Any state: the pending accept failed; handle_socket_error/2 decides whether
%% to stop, retry or back off.
handle_event(
    info,
    {inet_async, LSock, Ref, {error, Reason}},
    _,
    State = #state{lsock = LSock, accept_ref = Ref}
) ->
    handle_socket_error(Reason, State);
%% Anything else is logged and dropped.
handle_event(Type, Content, StateName, _) ->
    error_logger:warning_msg(
        "Unhandled message, State:~p, Type:~p Content:~p",
        [StateName, Type, Content]
    ),
    keep_state_and_data.
213

214
%% gen_statem callback: a `normal' stop is silent, any other reason is
%% reported through error_logger. Always returns ok.
terminate(Reason, _StateName, #state{}) ->
    case Reason of
        normal ->
            ok;
        _ ->
            error_logger:error_msg("~p terminating due to ~p", [?MODULE, Reason]),
            ok
    end.
219

220
%% gen_statem callback: no conversion is performed on code upgrade; the
%% current state name and data are returned untouched.
code_change(_OldVsn, CurrentState, Data, _Extra) ->
    {ok, CurrentState, Data}.
222

223
%%--------------------------------------------------------------------
224
%% Internal funcs
225
%%--------------------------------------------------------------------
226

227
%% Best-effort close of an accepted socket port; always returns ok.
%% port_close/1 raises badarg when the port is no longer open, which is
%% harmless here. Only that error is tolerated; the old-style `catch' also
%% swallowed every other exception class and returned an {'EXIT', _} tuple.
close(Sock) ->
    try port_close(Sock) of
        true -> ok
    catch
        error:badarg -> ok
    end.
228

229
%% Invoke the tuning callback with the socket prepended to its configured
%% extra arguments, and return whatever the callback returns.
eval_tune_socket_fun({TuneFun, ExtraArgs}, Sock) ->
    erlang:apply(TuneFun, [Sock] ++ ExtraArgs).
231

232
%% Report a failure that occurred after a socket was accepted.
%%   enotconn / einval -> ignored silently
%%   overloaded        -> counted in the closed_overloaded stat, not logged
%%   anything else     -> logged with Fmt, which receives the formatted
%%                        socket name and the reason, in that order
handle_accept_error(Reason, _Fmt, _State) when Reason =:= enotconn; Reason =:= einval ->
    ok;
handle_accept_error(overloaded, _Fmt, #state{proto = Proto, listen_on = ListenOn}) ->
    StatsKey = {Proto, ListenOn},
    esockd_server:inc_stats(StatsKey, closed_overloaded, 1),
    ok;
handle_accept_error(Reason, Fmt, #state{sockname = Sockname}) ->
    Where = esockd:format(Sockname),
    error_logger:error_msg(Fmt, [Where, Reason]).
241

242
%% Decide how the acceptor reacts to a failed asynchronous accept.
handle_socket_error(Reason, State) ->
    case Reason of
        closed ->
            {stop, normal, State};
        %% econnaborted / esslaccept: go back to accepting right away.
        Transient when Transient =:= econnaborted; Transient =:= esslaccept ->
            {next_state, waiting, State, {next_event, internal, begin_waiting}};
        %% emfile: The per-process limit of open file descriptors has been reached.
        %% enfile: The system limit on the total number of open files has been reached.
        %% Log it and back off for one second before accepting again.
        FdLimit when FdLimit =:= emfile; FdLimit =:= enfile ->
            error_logger:error_msg(
                "Accept error on ~s: ~s",
                [esockd:format(State#state.sockname), explain_posix(FdLimit)]
            ),
            {next_state, suspending, State, {state_timeout, 1000, begin_waiting}};
        Other ->
            {stop, Other, State}
    end.
260

261
%% Human-readable text for the two file-descriptor exhaustion errors.
%% Any other argument fails with function_clause.
explain_posix(Errno) when Errno =:= emfile ->
    "EMFILE (Too many open files)";
explain_posix(Errno) when Errno =:= enfile ->
    "ENFILE (File table overflow)".
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc