• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 527

16 Sep 2025 07:26AM UTC coverage: 67.052% (+1.0%) from 66.039%
527

push

github

web-flow
Merge pull request #211 from JimMoen/fix-rate-limit-pause

fix: the next check start time should be `Now + Pause`

2 of 2 new or added lines in 1 file covered. (100.0%)

228 existing lines in 13 files now uncovered.

696 of 1038 relevant lines covered (67.05%)

106.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.0
/src/esockd_acceptor.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_acceptor).
18

19
-behaviour(gen_statem).
20

21
-include("esockd.hrl").
22

23
-export([ start_link/7
24
        , set_conn_limiter/2
25
        ]).
26

27
%% state callbacks
28
-export([ accepting/3
29
        , suspending/3
30
        ]).
31

32
%% gen_statem Callbacks
33
-export([ init/1
34
        , callback_mode/0
35
        , terminate/3
36
        , code_change/4
37
        ]).
38

39
-record(state, {
40
          proto        :: atom(),
41
          listen_on    :: esockd:listen_on(),
42
          lsock        :: inet:socket(),
43
          sockmod      :: module(),
44
          sockname     :: {inet:ip_address(), inet:port_number()},
45
          tune_fun     :: esockd:sock_fun(),
46
          upgrade_funs :: [esockd:sock_fun()],
47
          conn_limiter :: undefined | esockd_limiter:bucket_name(),
48
          conn_sup     :: pid(),
49
          accept_ref   :: term()
50
        }).
51

52
%% @doc Start an acceptor process linked to the caller.
%% The seven arguments are forwarded, in order, as the init/1 argument list.
-spec(start_link(atom(), esockd:listen_on(), pid(),
                 esockd:sock_fun(), [esockd:sock_fun()],
                 esockd_limiter:bucket_name(), inet:socket())
      -> {ok, pid()} | {error, term()}).
start_link(Proto, ListenOn, ConnSup,
           TuneFun, UpgradeFuns, Limiter, LSock) ->
    InitArgs = [Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock],
    gen_statem:start_link(?MODULE, InitArgs, []).
61

62
%% @doc Replace the connection limiter used by a running acceptor.
%% Synchronous call with a 5000 ms call timeout.
-spec(set_conn_limiter(pid(), esockd_limiter:bucket_name()) -> ok).
set_conn_limiter(Acceptor, Limiter) ->
    Request = {set_conn_limiter, Limiter},
    gen_statem:call(Acceptor, Request, 5000).
48✔
65

66
%%--------------------------------------------------------------------
67
%% gen_statem callbacks
68
%%--------------------------------------------------------------------
69

70
%% Build the initial acceptor state from the start_link/7 arguments and
%% enter `accepting', queuing an internal `accept' event so the first
%% accept is issued as soon as initialisation completes.
init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) ->
    _ = rand:seed(exsplus, erlang:timestamp()),
    %% Both lookups are asserted: a failure crashes init/1.
    {ok, Sockname} = inet:sockname(LSock),
    {ok, SockMod} = inet_db:lookup_socket(LSock),
    InitState = #state{proto        = Proto,
                       listen_on    = ListenOn,
                       lsock        = LSock,
                       sockmod      = SockMod,
                       sockname     = Sockname,
                       tune_fun     = TuneFun,
                       upgrade_funs = UpgradeFuns,
                       conn_limiter = Limiter,
                       conn_sup     = ConnSup},
    {ok, accepting, InitState, {next_event, internal, accept}}.
84

85
%% Each state (`accepting', `suspending') is handled by the exported
%% function of the same name.
callback_mode() ->
    state_functions.
1,260✔
86

87
%%--------------------------------------------------------------------
%% State: accepting
%%--------------------------------------------------------------------

%% Issue one asynchronous accept on the listening socket. The outcome is
%% delivered later as an `{inet_async, LSock, Ref, Result}' info message,
%% matched against the reference stored in `accept_ref'.
accepting(internal, accept, State = #state{lsock = LSock}) ->
    %% NOTE(review): -1 is presumably "no accept timeout" - confirm
    %% against prim_inet.
    case prim_inet:async_accept(LSock, -1) of
        {ok, Ref} ->
            {keep_state, State#state{accept_ref = Ref}};
        {error, Reason} when Reason =:= emfile;
                             Reason =:= enfile ->
            %% Out of file descriptors: back off for a second.
            start_suspending(State, 1000);
        {error, closed} ->
            {stop, normal, State};
        {error, Reason} ->
            {stop, Reason, State}
    end;

%% Swap the connection limiter while accepting.
accepting({call, From}, {set_conn_limiter, Limiter}, State) ->
    {keep_state, State#state{conn_limiter = Limiter}, {reply, From, ok}};

%% A connection was accepted: register it, tune it, hand it over to the
%% connection supervisor, then consult the rate limiter to decide whether
%% to accept again immediately or to pause.
accepting(info, {inet_async, LSock, Ref, {ok, Sock}},
          State = #state{proto        = Proto,
                         listen_on    = ListenOn,
                         lsock        = LSock,
                         sockmod      = SockMod,
                         sockname     = Sockname,
                         tune_fun     = TuneFun,
                         upgrade_funs = UpgradeFuns,
                         conn_sup     = ConnSup,
                         accept_ref   = Ref}) ->
    %% make it look like gen_tcp:accept
    inet_db:register_socket(Sock, SockMod),

    %% Inc accepted stats.
    esockd_server:inc_stats({Proto, ListenOn}, accepted, 1),

    case eval_tune_socket_fun(TuneFun, Sock) of
        {ok, Sock} ->
            case esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns) of
                {ok, _Pid} -> ok;
                {error, enotconn} ->
                    close(Sock); %% quiet...issue #10
                {error, einval} ->
                    close(Sock); %% quiet... haproxy check
                {error, Reason} ->
                    error_logger:error_msg("Failed to start connection on ~s: ~p",
                                           [esockd:format(Sockname), Reason]),
                    close(Sock)
                end;
        {error, enotconn} ->
            close(Sock);
        {error, einval} ->
            close(Sock);
        {error, closed} ->
            close(Sock);
        {error, Reason} ->
            %% ~p rather than ~s: Reason is an arbitrary term and ~s only
            %% accepts atoms and character data.
            error_logger:error_msg("Tune buffer failed on ~s: ~p",
                                   [esockd:format(Sockname), Reason]),
            close(Sock)
    end,
    rate_limit(State);

%% The listening socket was closed: stop normally.
accepting(info, {inet_async, LSock, Ref, {error, closed}},
          State = #state{lsock = LSock, accept_ref = Ref}) ->
    {stop, normal, State};

%% {error, econnaborted} -> accept
%% {error, esslaccept}   -> accept
accepting(info, {inet_async, LSock, Ref, {error, Reason}},
          #state{lsock = LSock, accept_ref = Ref})
    when Reason =:= econnaborted; Reason =:= esslaccept ->
    {keep_state_and_data, {next_event, internal, accept}};

%% emfile: The per-process limit of open file descriptors has been reached.
%% enfile: The system limit on the total number of open files has been reached.
accepting(info, {inet_async, LSock, Ref, {error, Reason}},
          State = #state{lsock = LSock, sockname = Sockname, accept_ref = Ref})
    when Reason =:= emfile; Reason =:= enfile ->
    error_logger:error_msg("Accept error on ~s: ~s",
                           [esockd:format(Sockname), esockd_utils:explain_posix(Reason)]),
    start_suspending(State, 1000);

%% Any other accept error stops the acceptor with that reason.
accepting(info, {inet_async, LSock, Ref, {error, Reason}},
          State = #state{lsock = LSock, accept_ref = Ref}) ->
    {stop, Reason, State}.

%%--------------------------------------------------------------------
%% State: suspending
%%--------------------------------------------------------------------

%% Swap the connection limiter while suspended. The pending resume is a
%% state time-out, so handling this call does not cancel it.
suspending({call, From}, {set_conn_limiter, Limiter}, State) ->
    {keep_state, State#state{conn_limiter = Limiter}, {reply, From, ok}};

%% The pause armed by start_suspending/2 elapsed: resume accepting.
suspending(state_timeout, resume, State) ->
    {next_state, accepting, State, {next_event, internal, accept}}.

terminate(_Reason, _StateName, _State) ->
    ok.

code_change(_OldVsn, StateName, State, _Extra) ->
    {ok, StateName, State}.

%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------

%% Best-effort close of an accepted socket; any failure is ignored.
close(Sock) -> catch port_close(Sock).

%% Enter `suspending' for Pause milliseconds.
%% A state time-out is used instead of an event time-out because an event
%% time-out is cancelled by any incoming event (e.g. a set_conn_limiter
%% call), which would leave the acceptor suspended forever.
start_suspending(State, Pause) ->
    {next_state, suspending, State, {state_timeout, Pause, resume}}.

%% Consume one token from the connection limiter. When the first element of
%% the result is =< 0, pause for the returned time; otherwise accept again
%% straight away.
%% NOTE(review): the first element is presumably the number of remaining
%% tokens - confirm against esockd_limiter:consume/2.
rate_limit(State = #state{conn_limiter = Limiter}) ->
    case esockd_limiter:consume(Limiter, 1) of
        {I, Pause} when I =< 0 ->
            start_suspending(State, Pause);
        _ ->
            {keep_state, State, {next_event, internal, accept}}
    end.
194

195
%% Invoke the socket tuning fun with the socket prepended to its
%% configured extra arguments, returning whatever the fun returns.
eval_tune_socket_fun({TuneFun, ExtraArgs}, Sock) ->
    CallArgs = [Sock | ExtraArgs],
    apply(TuneFun, CallArgs).
75✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc