• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 456

09 Nov 2024 11:07AM UTC coverage: 66.039%. First build
456

Pull #196

github

zmstone
feat: enhance connection rate limit

Prior to this change, when the connection rate was limited, the acceptors
would enter the suspending state and stop accepting sockets, leaving
them in the system backlog.

If the acceptor backlog (default 1024) stays full long enough that most
clients have already closed their sockets on their end and are
reconnecting aggressively, the acceptor may never be able to get a
healthy socket again.

The fix: in the suspending state, accept the sockets and immediately
close them to free up the backlog.
The close triggers a TCP RST, avoiding the overhead of a graceful TCP close.
Pull Request #196: feat: enhance connection rate limit

4 of 29 new or added lines in 1 file covered. (13.79%)

877 of 1328 relevant lines covered (66.04%)

55.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.58
/src/esockd_acceptor.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_acceptor).
18

19
-behaviour(gen_statem).
20

21
-include("esockd.hrl").
22

23
-export([
24
    start_link/7
25
]).
26

27
%% state callbacks
28
-export([handle_event/4]).
29

30
%% The state diagram:
31
%%
32
%%         +----------------------------------------------+
33
%%         |                                              |
34
%%   +-----v----+       +-----------------+        +------+------+
35
%%   | waiting  +------>+  token request  +------->+  accepting  |
36
%%   +-+----^---+       +----+------------+        +-------------+
36
%%     |    |                |
37
%%     |    +-------------+  |
38
%%     |                  |  |
39
%%     |                +-+--V--------------+
41
%%     +--------------->+    suspending   |
42
%%                      +-----------------+
43
%%
44

45
%% gen_statem Callbacks
46
-export([
47
    init/1,
48
    callback_mode/0,
49
    terminate/3,
50
    code_change/4
51
]).
52

53
-record(state, {
54
    proto :: atom(), %% protocol name; first half of the stats key {Proto, ListenOn}
55
    listen_on :: esockd:listen_on(), %% listener id; second half of the stats key
56
    lsock :: inet:socket(), %% listen socket passed to prim_inet:async_accept/2
57
    sockmod :: module(), %% from inet_db:lookup_socket(LSock); registered on each accepted socket
58
    sockname :: {inet:ip_address(), inet:port_number()}, %% local address of lsock, used in log messages
59
    tune_fun :: esockd:sock_fun(), %% {Fun, Args} applied to each accepted socket before hand-over
60
    upgrade_funs :: [esockd:sock_fun()], %% passed to esockd_connection_sup:start_connection/3
61
    conn_limiter :: undefined | esockd_generic_limiter:limiter(), %% one token consumed per accepted socket
62
    conn_sup :: pid(), %% connection supervisor that receives accepted sockets
63
    accept_ref = no_ref :: term() %% ref of the pending async accept, or no_ref when none is pending
64
}).
65

66
%% @doc Start an acceptor process linked to the caller.
%%
%% The acceptor is a gen_statem running this module; every argument is
%% forwarded unchanged, and in the same order, to init/1.
-spec start_link(
    Proto :: atom(),
    ListenOn :: esockd:listen_on(),
    ConnSup :: pid(),
    TuneFun :: esockd:sock_fun(),
    UpgradeFuns :: [esockd:sock_fun()],
    Limiter :: esockd_generic_limiter:limiter(),
    LSock :: inet:socket()
) -> {ok, pid()} | {error, term()}.
start_link(Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock) ->
    InitArgs = [Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock],
    gen_statem:start_link(?MODULE, InitArgs, []).
99

100
%%--------------------------------------------------------------------
101
%% gen_statem callbacks
102
%%--------------------------------------------------------------------
103
%% @doc Every event, in every state, is dispatched to handle_event/4.
callback_mode() ->
    handle_event_function.
104

105
%% @doc gen_statem init callback.
%%
%% Traps exits, seeds this process's `rand' state, looks up the listen
%% socket's local address and socket module, then starts in `waiting' with
%% an immediate internal `begin_waiting' event that issues the first accept.
%% Crashes (badmatch) if either lookup on `LSock' fails.
init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) ->
    _ = erlang:process_flag(trap_exit, true),
    _ = rand:seed(exsplus, erlang:timestamp()),
    {ok, LocalAddr} = inet:sockname(LSock),
    {ok, Mod} = inet_db:lookup_socket(LSock),
    Data = #state{
        conn_sup = ConnSup,
        conn_limiter = Limiter,
        upgrade_funs = UpgradeFuns,
        tune_fun = TuneFun,
        sockname = LocalAddr,
        sockmod = Mod,
        lsock = LSock,
        listen_on = ListenOn,
        proto = Proto
    },
    FirstEvent = {next_event, internal, begin_waiting},
    {ok, waiting, Data, FirstEvent}.
123

124
%% @doc The single gen_statem event handler (callback mode
%% `handle_event_function').
%%
%% States:
%%   waiting       - an async accept is pending (or about to be issued).
%%   token_request - a socket was accepted; ask the connection limiter
%%                   for a token.
%%   accepting     - tune the socket and hand it to the connection
%%                   supervisor.
%%   suspending    - rate limited or out of file descriptors: accepted
%%                   sockets are closed right away to drain the listen
%%                   backlog, until the state timeout fires `begin_waiting'
%%                   and the acceptor returns to `waiting'.
handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Ref =/= no_ref ->
    %% An async accept issued earlier (e.g. while suspending) is still
    %% pending; its result will arrive as an `inet_async' message.
    keep_state_and_data;
handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = no_ref}) ->
    case prim_inet:async_accept(LSock, -1) of
        {ok, Ref} ->
            {keep_state, State#state{accept_ref = Ref}};
        {error, Reason} when
            Reason =:= emfile;
            Reason =:= enfile
        ->
            start_suspending(State, 1000);
        {error, econnaborted} ->
            {next_state, waiting, State, {next_event, internal, begin_waiting}};
        {error, closed} ->
            {stop, normal, State};
        {error, Reason} ->
            logger:log(error, "~p async_accept error: ~p", [?MODULE, Reason]),
            {stop, Reason, State}
    end;
handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSock}) ->
    case prim_inet:async_accept(LSock, -1) of
        {ok, Ref} ->
            {keep_state, State#state{accept_ref = Ref}};
        {error, Reason} when
            Reason =:= emfile;
            Reason =:= enfile
        ->
            %% No file descriptor is available, so the backlog cannot be
            %% drained right now. Do NOT re-insert `accept_and_close' here:
            %% inserted events are processed before the mailbox, so an
            %% immediate retry would spin without ever handling the state
            %% timeout, system messages or trapped exit signals. Wait for the
            %% state timeout to move the acceptor back to `waiting' instead.
            log_system_limit(State, Reason),
            keep_state_and_data;
        {error, econnaborted} ->
            %% Transient, per-connection failure: try the next one.
            {keep_state_and_data, {next_event, internal, accept_and_close}};
        {error, closed} ->
            {stop, normal, State};
        {error, Reason} ->
            logger:log(error, "~p async_accept error: ~p", [?MODULE, Reason]),
            {stop, Reason, State}
    end;
handle_event(
    info,
    {inet_async, LSock, Ref, {ok, Sock}},
    waiting,
    State = #state{lsock = LSock, accept_ref = Ref}
) ->
    NextEvent = {next_event, internal, {token_request, Sock}},
    {next_state, token_request, State#state{accept_ref = no_ref}, NextEvent};
handle_event(
    info,
    {inet_async, LSock, Ref, {ok, Sock}},
    suspending,
    State = #state{lsock = LSock, accept_ref = Ref}
) ->
    %% Suspended: drop the connection immediately to free its backlog slot,
    %% then accept the next one.
    _ = close(Sock),
    NextEvent = {next_event, internal, accept_and_close},
    {keep_state, State#state{accept_ref = no_ref}, NextEvent};
handle_event(
    internal, {token_request, Sock}, token_request, State = #state{conn_limiter = Limiter}
) ->
    case esockd_generic_limiter:consume(1, Limiter) of
        {ok, Limiter2} ->
            {next_state, accepting, State#state{conn_limiter = Limiter2},
                {next_event, internal, {accept, Sock}}};
        {pause, PauseTime, Limiter2} ->
            _ = close(Sock),
            start_suspending(State#state{conn_limiter = Limiter2}, PauseTime)
    end;
handle_event(
    internal,
    {accept, Sock},
    accepting,
    State = #state{
        proto = Proto,
        listen_on = ListenOn,
        sockmod = SockMod,
        tune_fun = TuneFun,
        upgrade_funs = UpgradeFuns,
        conn_sup = ConnSup
    }
) ->
    %% make it look like gen_tcp:accept
    inet_db:register_socket(Sock, SockMod),

    %% Inc accepted stats.
    esockd_server:inc_stats({Proto, ListenOn}, accepted, 1),

    case eval_tune_socket_fun(TuneFun, Sock) of
        {ok, Sock} ->
            case esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns) of
                {ok, _Pid} ->
                    ok;
                {error, Reason} ->
                    handle_accept_error(Reason, "Failed to start connection on ~s: ~p", State),
                    close(Sock)
            end;
        {error, Reason} ->
            %% ~p, not ~s: the tune fun may return any term as its reason.
            handle_accept_error(Reason, "Tune buffer failed on ~s: ~p", State),
            close(Sock)
    end,
    {next_state, waiting, State, {next_event, internal, begin_waiting}};
handle_event(state_timeout, begin_waiting, suspending, State) ->
    %% Suspension is over; a pending accept (if any) is kept and picked up
    %% by the first begin_waiting clause.
    {next_state, waiting, State, {next_event, internal, begin_waiting}};
handle_event(
    info,
    {inet_async, LSock, Ref, {error, Reason}},
    StateName,
    State = #state{lsock = LSock, accept_ref = Ref}
) ->
    handle_socket_error(Reason, State#state{accept_ref = no_ref}, StateName);
handle_event(Type, Content, StateName, _) ->
    logger:log(warning,
        "Unhandled message, State:~p, Type:~p Content:~p",
        [StateName, Type, Content]
    ),
    keep_state_and_data.
237

238
%% @doc gen_statem terminate callback.
%%
%% `normal' and `shutdown' are expected exit reasons and are not logged;
%% any other reason is logged at error level. Always returns `ok'.
terminate(Reason, _StateName, #state{}) when Reason =:= normal; Reason =:= shutdown ->
    ok;
terminate(Reason, _StateName, #state{}) ->
    logger:log(error, "~p terminating due to ~p", [?MODULE, Reason]),
    ok.
245

246
%% @doc Hot code upgrade: the state name and data are kept as they are.
code_change(_Vsn, Name, Data, _Extra) -> {ok, Name, Data}.
248

249
%%--------------------------------------------------------------------
250
%% Internal funcs
251
%%--------------------------------------------------------------------
252

253
%% @doc Best-effort close of an accepted socket; always returns `ok'.
%%
%% The port is closed directly with port_close/1 which, per this change's
%% intent, resets the TCP connection (RST) instead of paying for a graceful
%% close. Closing an already-closed or invalid port raises badarg, which is
%% deliberately ignored. Because the acceptor traps exits, a resulting
%% `{'EXIT', Sock, _}' message is consumed here (waiting at most 1 ms)
%% rather than reaching the catch-all event clause. NOTE(review): this
%% presumes the port is linked to this process.
close(Sock) ->
    try port_close(Sock) of
        true ->
            receive
                {'EXIT', Sock, _} -> ok
            after 1 ->
                ok
            end
    catch
        error:_ -> ok
    end.
261

262
%% @doc Run the tune fun `{TuneFun, ExtraArgs}' as TuneFun(Sock, ExtraArgs...)
%% and return whatever it returns.
eval_tune_socket_fun({TuneFun, ExtraArgs}, Sock) ->
    erlang:apply(TuneFun, [Sock | ExtraArgs]).
264

265
%% @doc Report a failure to hand an accepted socket over.
%%
%% enotconn and einval are ignored silently. `overloaded' only increments
%% the listener's `closed_overloaded' counter. Any other reason is logged
%% at error level using `Msg', a format string that must take the formatted
%% listen address and the reason as its two arguments.
handle_accept_error(Reason, _Msg, _State) when Reason =:= enotconn; Reason =:= einval ->
    ok;
handle_accept_error(overloaded, _Msg, #state{proto = Proto, listen_on = ListenOn}) ->
    esockd_server:inc_stats({Proto, ListenOn}, closed_overloaded, 1),
    ok;
handle_accept_error(Reason, Msg, #state{sockname = Sockname}) ->
    logger:log(error, Msg, [esockd:format(Sockname), Reason]).
274

275
%% @doc Turn an asynchronous accept error into a gen_statem transition.
%%
%% `State' already has `accept_ref' reset to `no_ref' by the caller, so the
%% returned state must be `State' (not the previous data).
handle_socket_error(closed, State, _StateName) ->
    %% The listen socket has been closed: stop quietly.
    {stop, normal, State};
%% econnaborted / esslaccept: the failure only concerns one connection
%% attempt, so carry on with the next one.
handle_socket_error(Reason, State, suspending) when Reason =:= econnaborted; Reason =:= esslaccept ->
    {keep_state, State, {next_event, internal, accept_and_close}};
handle_socket_error(Reason, State, _StateName) when Reason =:= econnaborted; Reason =:= esslaccept ->
    {next_state, waiting, State, {next_event, internal, begin_waiting}};
%% emfile: The per-process limit of open file descriptors has been reached.
%% enfile: The system limit on the total number of open files has been reached.
handle_socket_error(Reason, State, suspending) when Reason =:= emfile; Reason =:= enfile ->
    %% Already suspending: accepting again would just fail with the same
    %% error (and log again) in a hot loop. Leave no accept pending and let
    %% the already-armed state timeout move the acceptor back to `waiting'.
    log_system_limit(State, Reason),
    {keep_state, State};
handle_socket_error(Reason, State, _StateName) when Reason =:= emfile; Reason =:= enfile ->
    log_system_limit(State, Reason),
    start_suspending(State, 1000);
handle_socket_error(Reason, State, _StateName) ->
    {stop, Reason, State}.
295

296
%% @doc Human-readable text for the two "out of file descriptors" errors
%% this module reports; any other atom is a function_clause error.
explain_posix(emfile) -> "EMFILE (Too many open files)";
explain_posix(enfile) -> "ENFILE (File table overflow)".
300

301
%% @doc Emit an error-level structured report saying this acceptor cannot
%% accept more connections; `Reason' must be `emfile' or `enfile'.
log_system_limit(State, Reason) ->
    Acceptor = esockd:format(State#state.sockname),
    Cause = explain_posix(Reason),
    Report = #{
        msg => cannot_accept_more_connections,
        acceptor => Acceptor,
        cause => Cause
    },
    logger:log(error, Report).
305

306
%% @doc Enter `suspending': immediately start draining the backlog with an
%% internal `accept_and_close' event, and arm a state timeout of `Timeout'
%% milliseconds whose `begin_waiting' event moves the acceptor back to
%% `waiting'.
start_suspending(State, Timeout) ->
    Drain = {next_event, internal, accept_and_close},
    Resume = {state_timeout, Timeout, begin_waiting},
    {next_state, suspending, State, [Drain, Resume]}.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc