• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 352

12 Dec 2023 09:17PM UTC coverage: 70.175%. First build
352

Pull #183

github

web-flow
Merge c60472aab into 5cb22a8b1
Pull Request #183: feat(listener): support changing options on the fly

143 of 177 new or added lines in 10 files covered. (80.79%)

800 of 1140 relevant lines covered (70.18%)

59.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.67
/src/esockd_acceptor_sup.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_acceptor_sup).
18

19
-behaviour(supervisor).
20

21
-export([ start_link/6
22
        , start_supervised/1
23
        ]).
24

25
-export([ start_acceptors/2
26
        , start_acceptor/2
27
        , count_acceptors/1
28
        ]).
29

30
%% Supervisor callbacks
31
-export([init/1]).
32

33
%% callbacks
34
-export([tune_socket/2]).
35

36
%% Default number of acceptors started per listener when the `acceptors'
%% option is not set (see start_acceptors/2).
-define(ACCEPTOR_POOL, 16).
37

38
%%--------------------------------------------------------------------
39
%% API
40
%%--------------------------------------------------------------------
41

42
%% @doc Start an acceptor supervisor whose children are `esockd_acceptor'
%% processes created from the given arguments.
-spec start_link(atom(), esockd:listen_on(), pid(),
                 esockd:sock_fun(), [esockd:sock_fun()],
                 esockd_generic_limiter:limiter())
      -> {ok, pid()}.
start_link(Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter) ->
    %% Leading arguments shared by every acceptor; init/1 places them in the
    %% child start spec.
    AcceptorArgs = [Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter],
    supervisor:start_link(?MODULE, {esockd_acceptor, AcceptorArgs}).
50

51
%% @doc Start the acceptor supervisor of an already registered listener.
%%
%% Reads the listener's `type', `options' and `connection_sup' properties
%% from `esockd_server', derives the tune/upgrade funs and the connection
%% rate limiter from the options, and starts the supervisor. Listeners of
%% type `dtls' get `esockd_dtls_acceptor' children; every other type gets
%% `esockd_acceptor' children.
%%
%% On success the supervisor pid is stored as the listener's `acceptor_sup'
%% property and `{ok, Pid}' is returned. A start failure is handed back to
%% the caller unchanged as `{error, Reason}'.
-spec start_supervised(esockd:listener_ref()) -> {ok, pid()} | {error, term()}.
start_supervised(ListenerRef = {Proto, ListenOn}) ->
    Type = esockd_server:get_listener_prop(ListenerRef, type),
    Opts = esockd_server:get_listener_prop(ListenerRef, options),
    TuneFun = tune_socket_fun(Opts),
    UpgradeFuns = upgrade_funs(Type, Opts),
    LimiterOpts = esockd_listener_sup:conn_limiter_opts(Opts, {listener, Proto, ListenOn}),
    Limiter = esockd_listener_sup:conn_rate_limiter(LimiterOpts),
    AcceptorMod = case Type of
                      dtls -> esockd_dtls_acceptor;
                      _ -> esockd_acceptor
                  end,
    ConnSup = esockd_server:get_listener_prop(ListenerRef, connection_sup),
    AcceptorArgs = [Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter],
    case supervisor:start_link(?MODULE, {AcceptorMod, AcceptorArgs}) of
        {ok, Pid} ->
            %% Publish the pid so start_acceptors/2 can look it up later.
            _ = esockd_server:set_listener_prop(ListenerRef, acceptor_sup, Pid),
            {ok, Pid};
        {error, _} = Error ->
            Error
    end.
73

74
%% @doc Start the configured number of acceptors on the listening socket.
%%
%% The count is taken from the listener's `acceptors' option and falls back
%% to ?ACCEPTOR_POOL. The supervisor is the one stored under the listener's
%% `acceptor_sup' property.
-spec start_acceptors(esockd:listener_ref(), inet:socket()) -> ok.
start_acceptors(ListenerRef, LSock) ->
    ListenerOpts = esockd_server:get_listener_prop(ListenerRef, options),
    PoolSize = proplists:get_value(acceptors, ListenerOpts, ?ACCEPTOR_POOL),
    Sup = esockd_server:get_listener_prop(ListenerRef, acceptor_sup),
    %% FIXME (marker kept from the original code): the assertive match makes
    %% the caller fail with badmatch as soon as one acceptor cannot start.
    _ = [{ok, _} = supervisor:start_child(Sup, [LSock])
         || _ <- lists:seq(1, PoolSize)],
    ok.
87

88
%% @doc Start one acceptor under the given acceptor supervisor, passing the
%% listening socket as the extra child argument.
-spec start_acceptor(pid(), inet:socket()) -> {ok, pid()} | {error, term()}.
start_acceptor(AcceptorSup, LSock) ->
    ExtraArgs = [LSock],
    supervisor:start_child(AcceptorSup, ExtraArgs).
×
92

93
%% @doc Count the acceptors currently running under the supervisor.
%%
%% Returns the `active' figure reported by supervisor:count_children/1,
%% or 0 when that key is absent. The result can be 0 (a supervisor with no
%% running children), hence non_neg_integer() rather than pos_integer().
-spec count_acceptors(AcceptorSup :: pid()) -> non_neg_integer().
count_acceptors(AcceptorSup) ->
    proplists:get_value(active, supervisor:count_children(AcceptorSup), 0).
6✔
97

98
%%--------------------------------------------------------------------
99
%% Supervisor callbacks
100
%%--------------------------------------------------------------------
101

102
%% Supervisor callback. Declares a simple_one_for_one supervisor with a
%% single child template: a worker started via AcceptorMod:start_link/N
%% with AcceptorArgs as its leading arguments.
init({AcceptorMod, AcceptorArgs}) ->
    ChildSpec = #{id => acceptor,
                  start => {AcceptorMod, start_link, AcceptorArgs},
                  %% transient: restarted only after an abnormal exit
                  restart => transient,
                  shutdown => 1000,
                  type => worker,
                  modules => [AcceptorMod]},
    %% Restart limit: 100000 restarts within a 1 second period.
    Flags = #{strategy => simple_one_for_one,
              intensity => 100000,
              period => 1},
    {ok, {Flags, [ChildSpec]}}.
64✔
115

116
%%--------------------------------------------------------------------
117
%% Internal functions
118
%% -------------------------------------------------------------------
119

120
%% Build the socket tuning callback as a {Fun, ExtraArgs} pair. The single
%% extra argument is the option list later consumed by tune_socket/2.
tune_socket_fun(Opts) ->
    TuneBuffer = proplists:get_bool(tune_buffer, Opts),
    %% Optional callback; it returns ok | {error, Reason}.
    TuneFun = proplists:get_value(tune_fun, Opts, undefined),
    TuneOpts = [{tune_buffer, TuneBuffer}, {tune_fun, TuneFun}],
    {fun ?MODULE:tune_socket/2, [TuneOpts]}.
64✔
125

126
%% Collect the socket upgrade funs for a listener: the proxy-protocol
%% upgrade (if any) comes first, followed by the SSL/DTLS upgrade (if any).
upgrade_funs(Type, Opts) ->
    proxy_upgrade_fun(Opts) ++ ssl_upgrade_fun(Type, Opts).
64✔
128

129
%% Return a one-element list holding the proxy-protocol upgrade fun when the
%% `proxy_protocol' option is enabled, and an empty list otherwise.
proxy_upgrade_fun(Opts) ->
    [esockd_transport:proxy_upgrade_fun(Opts)
     || proplists:get_bool(proxy_protocol, Opts)].
134

135
%% Return a one-element list holding the SSL upgrade fun when the listener
%% carries TLS settings, and an empty list otherwise. The settings are read
%% from `dtls_options' for dtls listeners and from `ssl_options' for all
%% other types.
ssl_upgrade_fun(Type, Opts) ->
    case proplists:get_value(ssl_options_key(Type), Opts) of
        undefined -> [];
        SslOpts -> [esockd_transport:ssl_upgrade_fun(SslOpts)]
    end.

%% Name of the option that holds the TLS settings for a listener type.
ssl_options_key(dtls) -> dtls_options;
ssl_options_key(_Other) -> ssl_options.
144

145
%% Apply the tuning options to Sock one by one, in list order.
%%
%% Returns {ok, Sock} once every option has been processed. Processing stops
%% at the first failing option and that option's result is returned as is:
%% the non-{ok, _} result of esockd_transport:getopts/2 for `tune_buffer',
%% or any non-ok return value of the `tune_fun' callback.
tune_socket(Sock, []) ->
    {ok, Sock};
tune_socket(Sock, [TuneOpt | Rest]) ->
    case apply_tune_opt(Sock, TuneOpt) of
        next -> tune_socket(Sock, Rest);
        {halt, Result} -> Result
    end.

%% Handle a single tuning option. Returns `next' to continue with the
%% remaining options, or {halt, Result} to stop and return Result.
apply_tune_opt(Sock, {tune_buffer, true}) ->
    case esockd_transport:getopts(Sock, [sndbuf, recbuf, buffer]) of
        {ok, BufSizes} ->
            %% Raise `buffer' to the largest of the three reported sizes;
            %% the outcome of setopts is deliberately ignored.
            Largest = lists:max([Size || {_Name, Size} <- BufSizes]),
            _ = esockd_transport:setopts(Sock, [{buffer, Largest}]),
            next;
        Failure ->
            {halt, Failure}
    end;
apply_tune_opt(_Sock, {tune_fun, undefined}) ->
    next;
apply_tune_opt(_Sock, {tune_fun, {M, F, A}}) ->
    case apply(M, F, A) of
        ok -> next;
        Failure -> {halt, Failure}
    end;
apply_tune_opt(_Sock, _Ignored) ->
    %% Anything else (e.g. {tune_buffer, false}) is skipped.
    next.
50✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc