• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 329

20 Nov 2023 06:44AM UTC coverage: 67.117% (-3.7%) from 70.781%
329

push

github

terry-xiaoyu
chore: bump vsn to 5.8.11

696 of 1037 relevant lines covered (67.12%)

160.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.0
/src/esockd_dtls_acceptor.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_dtls_acceptor).
18

19
-behaviour(gen_statem).
20

21
-include("esockd.hrl").
22

23
-export([start_link/7]).
24

25
-export([ accepting/3
26
        , suspending/3
27
        , set_conn_limiter/2
28
        ]).
29

30
%% gen_statem callbacks
31
-export([ init/1
32
        , callback_mode/0
33
        , terminate/3
34
        , code_change/4
35
        ]).
36

37
-record(state, {
38
          proto        :: atom(),
39
          listen_on    :: esockd:listen_on(),
40
          lsock        :: ssl:sslsocket(),
41
          sockmod      :: module(), %% FIXME: NOT-USE
42
          sockname     :: {inet:ip_address(), inet:port_number()},
43
          tune_fun     :: esockd:sock_fun(),
44
          upgrade_funs :: [esockd:sock_fun()],
45
          conn_limiter :: undefined | esockd_limiter:bucket_name(),
46
          conn_sup     :: pid(),
47
          accept_ref   :: term()  %% FIXME: NOT-USE
48
         }).
49

50
%% @doc Start an acceptor
51
-spec(start_link(atom(), esockd:listen_on(), pid(),
52
                 esockd:sock_fun(), [esockd:sock_fun()],
53
                 esockd_limiter:bucket_name(), inet:socket())
54
      -> {ok, pid()} | {error, term()}).
55
start_link(Proto, ListenOn, ConnSup,
56
           TuneFun, UpgradeFuns, Limiter, LSock) ->
57
    gen_statem:start_link(?MODULE, [Proto, ListenOn, ConnSup,
204✔
58
                                    TuneFun, UpgradeFuns, Limiter, LSock], []).
59

60
set_conn_limiter(Acceptor, Limiter) ->
61
    %% NOTE: the acceptor process will blocked at `accept` function call
62
    gen_statem:cast(Acceptor, {set_conn_limiter, Limiter}).
24✔
63

64
%%--------------------------------------------------------------------
65
%% gen_statem callbacks
66
%%--------------------------------------------------------------------
67

68
init([Proto, ListenOn, ConnSup,
69
      TuneFun, UpgradeFuns, Limiter, LSock]) ->
70
    _ = rand:seed(exsplus, erlang:timestamp()),
204✔
71
    {ok, Sockname} = ssl:sockname(LSock),
204✔
72
    {ok, accepting, #state{proto        = Proto,
204✔
73
                           listen_on    = ListenOn,
74
                           lsock        = LSock,
75
                           sockname     = Sockname,
76
                           tune_fun     = TuneFun,
77
                           upgrade_funs = UpgradeFuns,
78
                           conn_limiter = Limiter,
79
                           conn_sup     = ConnSup},
80
     {next_event, internal, accept}}.
81

82
callback_mode() -> state_functions.
204✔
83

84
accepting(internal, accept,
85
          State = #state{proto        = Proto,
86
                         listen_on    = ListenOn,
87
                         lsock        = LSock,
88
                         sockname     = Sockname,
89
                         tune_fun     = TuneFun,
90
                         upgrade_funs = UpgradeFuns,
91
                         conn_sup     = ConnSup}) ->
92
    case ssl:transport_accept(LSock) of
222✔
93
        {ok, Sock} ->
94
            %% Inc accepted stats.
95
            esockd_server:inc_stats({Proto, ListenOn}, accepted, 1),
18✔
96
            _ = case eval_tune_socket_fun(TuneFun, Sock) of
18✔
97
                {ok, Sock} ->
98
                    case esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns) of
18✔
99
                        {ok, _Pid} -> ok;
18✔
100
                        {error, enotconn} ->
101
                            close(Sock); %% quiet...issue #10
×
102
                        {error, einval} ->
103
                            close(Sock); %% quiet... haproxy check
×
104
                        {error, Reason} ->
105
                            error_logger:error_msg("Failed to start connection on ~s: ~p",
×
106
                                                   [esockd:format(Sockname), Reason]),
107
                            close(Sock)
×
108
                        end;
109
                {error, enotconn} ->
110
                    close(Sock);
×
111
                {error, einval} ->
112
                    close(Sock);
×
113
                {error, closed} ->
114
                    close(Sock);
×
115
                {error, Reason} ->
116
                    error_logger:error_msg("Tune buffer failed on ~s: ~s",
×
117
                                           [esockd:format(Sockname), Reason]),
118
                    close(Sock)
×
119
            end,
120
            rate_limit(State);
18✔
121
        {error, Reason} when Reason =:= emfile;
122
                             Reason =:= enfile ->
123
            {next_state, suspending, State, 1000};
×
124
        {error, closed} ->
125
            {stop, normal, State};
204✔
126
        {error, Reason} ->
127
            {stop, Reason, State}
×
128
    end;
129

130
accepting(cast, {set_conn_limiter, Limiter}, State) ->
131
    {keep_state, State#state{conn_limiter = Limiter}}.
×
132

133
suspending(timeout, _Timeout, State) ->
134
    {next_state, accepting, State, {next_event, internal, accept}};
×
135

136
suspending(cast, {set_conn_limiter, Limiter}, State) ->
137
    {keep_state, State#state{conn_limiter = Limiter}}.
×
138

139
terminate(_Reason, _StateName, _State) ->
140
    ok.
204✔
141

142
code_change(_OldVsn, StateName, State, _Extra) ->
143
    {ok, StateName, State}.
×
144

145
%%--------------------------------------------------------------------
146
%% Internal funcs
147
%%--------------------------------------------------------------------
148

149
close(Sock) -> ssl:close(Sock).
×
150

151
rate_limit(State = #state{conn_limiter = Limiter}) ->
152
    case esockd_limiter:consume(Limiter, 1) of
18✔
153
        {I, Pause} when I =< 0 ->
154
            {next_state, suspending, State, Pause};
×
155
        _ ->
156
            {keep_state, State, {next_event, internal, accept}}
18✔
157
    end.
158

159
eval_tune_socket_fun({Fun, Args1}, Sock) ->
160
    apply(Fun, [Sock|Args1]).
18✔
161

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc