• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 527

16 Sep 2025 07:26AM UTC coverage: 67.052% (+1.0%) from 66.039%
527

push

github

web-flow
Merge pull request #211 from JimMoen/fix-rate-limit-pause

fix: the next check start time should be `Now + Pause`

2 of 2 new or added lines in 1 file covered. (100.0%)

228 existing lines in 13 files now uncovered.

696 of 1038 relevant lines covered (67.05%)

106.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.39
/src/esockd_listener_sup.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_listener_sup).
18

19
-behaviour(supervisor).
20

21
-include("esockd.hrl").
22

23
-export([ start_link/5
24
        , listener/1
25
        , acceptor_sup/1
26
        , connection_sup/1
27
       ]).
28

29
%% get/set
30
-export([ get_options/1
31
        , get_acceptors/1
32
        , get_max_connections/1
33
        , get_max_conn_rate/3
34
        , get_current_connections/1
35
        , get_shutdown_count/1
36
        ]).
37

38
-export([ set_max_connections/2
39
        , set_max_conn_rate/4
40
        ]).
41

42
-export([ get_access_rules/1
43
        , allow/2
44
        , deny/2
45
        ]).
46

47
-export([ conn_rate_limiter/2 ]).
48

49
%% supervisor callbacks
50
-export([init/1]).
51

52
%% callbacks
53
-export([ tune_socket/2
54
        ]).
55

56
%% Listener flavour passed to start_link/5. `dtls' selects the
%% esockd_dtls_* acceptor supervisor / listener modules and the `dtls_options'
%% key; any other value selects the plain esockd_* modules and `ssl_options'.
-type listen_type() :: tcp | dtls.
57

58
%%--------------------------------------------------------------------
59
%% APIs
60
%%--------------------------------------------------------------------
61

62
%% @doc Start listener supervisor
%%
%% Starts an empty supervisor (see init/1) and then adds three children to it,
%% in this order:
%%   1. `connection_sup' - esockd_connection_sup, started with Opts and MFA;
%%   2. `acceptor_sup'   - esockd_acceptor_sup or esockd_dtls_acceptor_sup,
%%                         depending on Type;
%%   3. `listener'       - esockd_listener or esockd_dtls_listener.
%%
%% Returns `{ok, Sup}' once the listener child is running. If the listener
%% child fails to start, the half-built supervisor is shut down again (so no
%% processes stay linked to the caller) and `{error, Reason}' is returned.
%% A failure to start either of the first two children is a badmatch and
%% crashes the caller.
-spec(start_link(listen_type(), atom(), esockd:listen_on(), [esockd:option()], esockd:mfargs())
      -> {ok, pid()} | {error, term()}).
start_link(Type, Proto, ListenOn, Opts, MFA) ->
    {ok, Sup} = supervisor:start_link(?MODULE, []),

    %% Start connection sup
    ConnSupSpec = #{id => connection_sup,
                    start => {esockd_connection_sup, start_link, [Opts, MFA]},
                    restart => transient,
                    shutdown => infinity,
                    type => supervisor,
                    modules => [esockd_connection_sup]},
    {ok, ConnSup} = supervisor:start_child(Sup, ConnSupSpec),

    %% Start acceptor sup
    ok = esockd_server:init_stats({Proto, ListenOn}, accepted),
    TuneFun = tune_socket_fun(Opts),
    UpgradeFuns = upgrade_funs(Type, Opts),
    Limiter = conn_rate_limiter({listener, Proto, ListenOn}, conn_rate_opt(Opts)),

    AcceptorSupMod = case Type of
                         dtls -> esockd_dtls_acceptor_sup;
                         _ -> esockd_acceptor_sup
                     end,
    AcceptorSupSpec = #{id => acceptor_sup,
                        start => {AcceptorSupMod, start_link,
                                  [Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter]},
                        restart => transient,
                        shutdown => infinity,
                        type => supervisor,
                        modules => [AcceptorSupMod]},
    {ok, AcceptorSup} = supervisor:start_child(Sup, AcceptorSupSpec),

    %% Start listener
    ListenerMod = case Type of
                      dtls -> esockd_dtls_listener;
                      _ -> esockd_listener
                  end,
    ListenerSpec = #{id => listener,
                     start => {ListenerMod, start_link,
                               [Proto, ListenOn, Opts, AcceptorSup]},
                     restart => transient,
                     %% 16#ffffffff ms: a very long (about 49.7 days) but
                     %% finite shutdown timeout for the worker
                     shutdown => 16#ffffffff,
                     type => worker,
                     modules => [ListenerMod]},
    case supervisor:start_child(Sup, ListenerSpec) of
        {ok, _} -> {ok, Sup};
        {error, {Reason, _ChildSpec}} ->
            %% Without this the supervisor and the two child supervisors
            %% started above would stay alive, linked to the caller, even
            %% though the caller only ever sees {error, Reason}.
            ok = stop_listener_sup(Sup),
            {error, Reason}
    end.

%% @private Synchronously shut down a supervisor started by start_link/5.
%% The calling process is the supervisor's parent, so an exit signal with
%% reason `shutdown' from it makes the supervisor terminate its children and
%% exit. The link is removed first so the caller is not affected by that
%% exit; the monitor guarantees the receive below always completes.
stop_listener_sup(Sup) ->
    MRef = erlang:monitor(process, Sup),
    true = unlink(Sup),
    exit(Sup, shutdown),
    receive
        {'DOWN', MRef, process, Sup, _Info} -> ok
    end.
113

114
%% @doc Return the child registered under the id `listener' in the
%% listener supervisor `Sup'.
-spec(listener(pid()) -> pid()).
listener(Sup) ->
    ChildId = listener,
    child_pid(Sup, ChildId).
6✔
117

118
%% @doc Return the child registered under the id `connection_sup' in the
%% listener supervisor `Sup'.
-spec(connection_sup(pid()) -> pid()).
connection_sup(Sup) ->
    ChildId = connection_sup,
    child_pid(Sup, ChildId).
81✔
121

122
%% @doc Return the child registered under the id `acceptor_sup' in the
%% listener supervisor `Sup'.
-spec(acceptor_sup(pid()) -> pid()).
acceptor_sup(Sup) ->
    ChildId = acceptor_sup,
    child_pid(Sup, ChildId).
18✔
125

126
%% @doc Get child pid with id.
%% Scans supervisor:which_children/1 for entries whose id equals ChildId and
%% returns the child element of the first one. Raises badarg (hd/1 on an
%% empty list) when the supervisor has no child with that id.
child_pid(Sup, ChildId) ->
    Children = supervisor:which_children(Sup),
    Matches = [Child || {Id, Child, _Type, _Modules} <- Children,
                        Id =:= ChildId],
    hd(Matches).
105✔
130

131
%%--------------------------------------------------------------------
132
%% Get/Set APIs
133
%%--------------------------------------------------------------------
134

135
%% @doc Fetch the options held by the listener child of `Sup'.
%% NOTE(review): always goes through esockd_listener:options/1, even though
%% start_link/5 may have started an esockd_dtls_listener - confirm both
%% listener modules answer the same request.
get_options(Sup) ->
    Listener = listener(Sup),
    esockd_listener:options(Listener).
6✔
137

138
%% @doc Ask the acceptor supervisor of `Sup' how many acceptors it has.
%% NOTE(review): always calls esockd_acceptor_sup:count_acceptors/1, even
%% when the acceptor supervisor was started from esockd_dtls_acceptor_sup -
%% confirm that is intended.
get_acceptors(Sup) ->
    AcceptorSup = acceptor_sup(Sup),
    esockd_acceptor_sup:count_acceptors(AcceptorSup).
12✔
140

141
%% @doc Read the max-connections setting from the connection supervisor
%% of `Sup'.
get_max_connections(Sup) ->
    ConnSup = connection_sup(Sup),
    esockd_connection_sup:get_max_connections(ConnSup).
15✔
143

144
%% @doc Hand `MaxConns' to the connection supervisor of `Sup' as its new
%% max-connections setting; returns whatever that call returns.
set_max_connections(Sup, MaxConns) ->
    ConnSup = connection_sup(Sup),
    esockd_connection_sup:set_max_connections(ConnSup, MaxConns).
6✔
146

147
%% @doc Look up the connection-rate limiter bucket of a listener.
%% The bucket is keyed by `{listener, Proto, ListenOn}'; the supervisor pid
%% is not consulted. Returns `{Capacity, Interval}' taken from the limiter
%% entry, or `{error, not_found}' when the lookup yields `undefined'.
get_max_conn_rate(_Sup, Proto, ListenOn) ->
    Bucket = {listener, Proto, ListenOn},
    case esockd_limiter:lookup(Bucket) of
        #{capacity := Capacity, interval := Interval} ->
            {Capacity, Interval};
        undefined ->
            {error, not_found}
    end.
154

155
%% @doc Install a new connection-rate limit on every acceptor of `Sup'.
%% Builds the limiter via conn_rate_limiter/2 first, then pushes it to each
%% child of the acceptor supervisor through that child's own callback module.
%% Children whose module list is not a single-element list are skipped; any
%% answer other than `ok' from an acceptor is a badmatch.
set_max_conn_rate(Sup, Proto, ListenOn, ConnRate) ->
    Limiter = conn_rate_limiter({listener, Proto, ListenOn}, ConnRate),
    AcceptorSup = acceptor_sup(Sup),
    Acceptors = supervisor:which_children(AcceptorSup),
    lists:foreach(
      fun({_Id, Acceptor, _Type, [Mod]}) ->
              ok = Mod:set_conn_limiter(Acceptor, Limiter);
         (_Skipped) ->
              ok
      end, Acceptors),
    ok.
6✔
160

161
%% @doc Ask the connection supervisor of `Sup' for its connection count.
get_current_connections(Sup) ->
    ConnSup = connection_sup(Sup),
    esockd_connection_sup:count_connections(ConnSup).
9✔
163

164
%% @doc Ask the connection supervisor of `Sup' for its shutdown count.
get_shutdown_count(Sup) ->
    ConnSup = connection_sup(Sup),
    esockd_connection_sup:get_shutdown_count(ConnSup).
6✔
166

167
%% @doc Fetch the access rules held by the connection supervisor of `Sup'.
get_access_rules(Sup) ->
    ConnSup = connection_sup(Sup),
    esockd_connection_sup:access_rules(ConnSup).
27✔
169

170
%% @doc Forward an `allow' request for `CIDR' to the connection supervisor
%% of `Sup'.
allow(Sup, CIDR) ->
    ConnSup = connection_sup(Sup),
    esockd_connection_sup:allow(ConnSup, CIDR).
9✔
172

173
%% @doc Forward a `deny' request for `CIDR' to the connection supervisor
%% of `Sup'.
deny(Sup, CIDR) ->
    ConnSup = connection_sup(Sup),
    esockd_connection_sup:deny(ConnSup, CIDR).
9✔
175

176
%%--------------------------------------------------------------------
177
%% Supervisor callbacks
178
%%--------------------------------------------------------------------
179

180
%% @doc supervisor callback. Starts with no static children (start_link/5
%% adds them afterwards) under a rest_for_one strategy that tolerates at most
%% 10 restarts within 3600 seconds.
init([]) ->
    SupFlags = {rest_for_one, 10, 3600},
    ChildSpecs = [],
    {ok, {SupFlags, ChildSpecs}}.
120✔
182

183
%%--------------------------------------------------------------------
184
%% Sock tune/upgrade functions
185
%%--------------------------------------------------------------------
186

187
%% @doc Build the `{Fun, ExtraArgs}' pair describing how to tune a socket.
%% Fun is the exported tune_socket/2; ExtraArgs carries a single option list
%% holding the boolean `tune_buffer' flag read from Opts.
tune_socket_fun(Opts) ->
    TuneBuffer = proplists:get_bool(tune_buffer, Opts),
    {fun ?MODULE:tune_socket/2, [[{tune_buffer, TuneBuffer}]]}.
120✔
190

191
%% @doc Apply a list of tuning options to `Sock', left to right.
%%
%% Only `{tune_buffer, true}' does anything: it reads the `sndbuf', `recbuf'
%% and `buffer' sizes of the socket and sets `buffer' to the largest of them.
%% Every other element is skipped.
%%
%% Returns `{ok, Sock}' once the list is exhausted, or whatever non-`{ok, _}'
%% value esockd_transport:getopts/2 returned. The result of setopts/2 is
%% ignored on purpose (best effort).
tune_socket(Sock, []) ->
    {ok, Sock};
tune_socket(Sock, [{tune_buffer, true}|More]) ->
    case esockd_transport:getopts(Sock, [sndbuf, recbuf, buffer]) of
        {ok, BufSizes} ->
            ok = tune_buffer(Sock, [Sz || {_Opt, Sz} <- BufSizes]),
            tune_socket(Sock, More);
        Error -> Error
   end;
tune_socket(Sock, [_|More]) ->
    tune_socket(Sock, More).

%% @private Set `buffer' to the largest reported size.
%% inet:getopts/2 is documented to leave out options the OS does not support,
%% so the list may be empty (presumably esockd_transport:getopts/2 forwards
%% to it - TODO confirm); lists:max/1 would crash on [], so skip in that case.
tune_buffer(_Sock, []) ->
    ok;
tune_buffer(Sock, Sizes) ->
    _ = esockd_transport:setopts(Sock, [{buffer, lists:max(Sizes)}]),
    ok.
93✔
203

204
%% @doc Collect the socket upgrade funs for a listener: the proxy-protocol
%% upgrade (if enabled) followed by the SSL/DTLS upgrade (if configured).
upgrade_funs(Type, Opts) ->
    ProxyFuns = proxy_upgrade_fun(Opts),
    SslFuns = ssl_upgrade_fun(Type, Opts),
    ProxyFuns ++ SslFuns.
120✔
206

207
%% @doc Return a one-element list holding the proxy-protocol upgrade fun when
%% the boolean `proxy_protocol' option is set in Opts, otherwise [].
proxy_upgrade_fun(Opts) ->
    ProxyEnabled = proplists:get_bool(proxy_protocol, Opts),
    case ProxyEnabled of
        true ->
            [esockd_transport:proxy_upgrade_fun(Opts)];
        false ->
            []
    end.
212

213
%% @doc Return a one-element list holding the TLS upgrade fun when the
%% listener has TLS options, otherwise [].
%% The options are read from `dtls_options' for a `dtls' listener and from
%% `ssl_options' for every other type.
ssl_upgrade_fun(Type, Opts) ->
    OptKey = if
                 Type =:= dtls -> dtls_options;
                 true -> ssl_options
             end,
    case proplists:get_value(OptKey, Opts) of
        undefined ->
            [];
        SslOpts ->
            %% Run the options through ssl's own option handling up front, so
            %% that bad options stop the listener from starting (presumably by
            %% raising here); the result itself is not used.
            _ = ssl:handle_options(SslOpts, server),
            [esockd_transport:ssl_upgrade_fun(SslOpts)]
    end.
226

227
%% @doc Read the `max_conn_rate' listener option; `undefined' when absent.
conn_rate_opt(Opts) ->
    proplists:get_value(max_conn_rate, Opts, undefined).
120✔
229

230
%% @doc Create the connection-rate limiter for `Bucket', if a rate is given.
%%   undefined            - no limiter; returns `undefined'
%%   integer Rate         - same as `{Rate, 1}'
%%   {Capacity, Interval} - creates the bucket in esockd_limiter and returns
%%                          `Bucket' as the limiter handle
conn_rate_limiter(_Bucket, undefined) ->
    undefined;
conn_rate_limiter(Bucket, Rate) when is_integer(Rate) ->
    %% a bare integer is a capacity over an interval of 1
    _ = esockd_limiter:create(Bucket, Rate, 1),
    Bucket;
conn_rate_limiter(Bucket, {Capacity, Interval}) ->
    _ = esockd_limiter:create(Bucket, Capacity, Interval),
    Bucket.
21✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc