• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 422

12 Jul 2024 04:27PM UTC coverage: 67.306%. First build
422

Pull #190

github

web-flow
Merge 173fb2645 into 4ec038215
Pull Request #190: fix: fixed the proxy could not be released even if the RC was 0

0 of 24 new or added lines in 2 files covered. (0.0%)

877 of 1303 relevant lines covered (67.31%)

56.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/udp_proxy/esockd_udp_proxy.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_udp_proxy).
18

19
-behaviour(gen_server).
20

21
-include("include/esockd_proxy.hrl").
22

23
%% API
24
-export([start_link/3, send/2, close/1, takeover/2]).
25

26
%% gen_server callbacks
27
-export([
28
    init/1,
29
    handle_call/3,
30
    handle_cast/2,
31
    handle_info/2,
32
    terminate/2
33
]).
34

35
-export_type([connection_options/0]).
36

37
-define(NOW, erlang:system_time(second)).
38
-define(ERROR_MSG(Format, Args),
39
    error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])
40
).
41
-define(DEF_HEARTBEAT, 60).
42

43
-type timespan() :: non_neg_integer().
44

45
%%--------------------------------------------------------------------
46
%%  Definitions
47
%%--------------------------------------------------------------------
48

49
%% Process state of one proxy.
-type state() :: #{
    %% callback module implementing the proxied connection
    connection_mod := connection_module(),
    %% id of the connection this proxy is currently attached to, if any
    connection_id := connection_id() | undefined,
    connection_state := connection_state(),
    %% process handling the attached connection, if any
    connection_pid := pid() | undefined,
    connection_options := connection_options(),
    %% last source's connection active time
    last_time := pos_integer(),
    transport := proxy_transport(),
    peer := peer(),
    %% Optional: monitor reference for `connection_pid'. erlang:demonitor/1,2
    %% needs this reference (a pid is not accepted), so it has to live in the
    %% state whenever a monitor is being tracked. Absent otherwise.
    connection_monitor => reference()
}.
60

61
%%--------------------------------------------------------------------
62
%%- API
63
%%--------------------------------------------------------------------
64

65
%% Start a proxy process linked to the caller.
%% The three arguments are handed to init/1 unchanged, as one list.
start_link(Transport, Peer, Opts) ->
    InitArgs = [Transport, Peer, Opts],
    gen_server:start_link(?MODULE, InitArgs, []).
×
67

68
%% Ask the proxy to send `Data' to its peer.
%% This is a cast: it returns `ok' immediately and a send failure is handled
%% inside the proxy process, not reported to the caller.
-spec send(proxy_id(), binary()) -> ok.
send(ProxyId, Data) ->
    Request = {send, Data},
    gen_server:cast(ProxyId, Request).
×
71

72
%% Synchronously ask the proxy to shut down.
%% Returns `ok' both when the proxy acknowledged the request and when it is
%% already gone.
close(ProxyId) ->
    case erlang:is_process_alive(ProxyId) of
        true ->
            %% The proxy may exit between the liveness check above and the
            %% call below (e.g. on a heartbeat timeout). gen_server:call/2
            %% then exits with `noproc'; closing a dead proxy is a no-op, so
            %% that case is reported as `ok' too instead of crashing the caller.
            try
                gen_server:call(ProxyId, close)
            catch
                exit:{noproc, _} ->
                    ok
            end;
        _ ->
            ok
    end.
79

80
%% Tell the proxy that connection `CId' is being taken over.
%% This is a cast, so it always returns `ok' without waiting for the proxy.
takeover(ProxyId, CId) ->
    gen_server:cast(ProxyId, {takeover, CId}).
×
83

84
%%--------------------------------------------------------------------
85
%%- gen_server callbacks
86
%%--------------------------------------------------------------------
87

88
%% gen_server init callback.
%% Schedules the first heartbeat (interval taken from the `heartbeat' option,
%% falling back to ?DEF_HEARTBEAT), builds the initial state with no
%% connection attached, and lets init_transport/3 add the transport and peer.
%% Whatever init_transport/3 returns becomes the result of init/1.
init([Transport, Peer, #{esockd_proxy_opts := ProxyOpts} = ConnOpts]) ->
    #{connection_mod := Mod} = ProxyOpts,
    Interval = maps:get(heartbeat, ProxyOpts, ?DEF_HEARTBEAT),
    heartbeat(Interval),
    State0 = #{
        last_time => ?NOW,
        connection_mod => Mod,
        connection_options => ConnOpts,
        connection_state => esockd_udp_proxy_connection:initialize(Mod, ConnOpts),
        connection_id => undefined,
        connection_pid => undefined
    },
    init_transport(Transport, Peer, State0).
99

100
%% gen_server call callback.
%% `close' replies `ok' and stops the server with {shutdown, close_transport}.
%% Any other request is logged and answered with `ok'.
handle_call(close, _From, State) ->
    StopReason = {shutdown, close_transport},
    {stop, StopReason, ok, State};
handle_call(Unknown, _From, State) ->
    ?ERROR_MSG("Unexpected call: ~p", [Unknown]),
    {reply, ok, State}.
×
105

106
%% gen_server cast callback.
%%
%% {send, Data}    - write `Data' to the peer; a send error is logged and
%%                   stops the server with {sock_error, Reason}.
%% {takeover, CId} - stops the server with {shutdown, takeover} when `CId' is
%%                   the connection id currently held in the state; ignored
%%                   for any other id.
%% Anything else is logged and ignored.
handle_cast({send, Payload}, #{transport := Transport, peer := Peer} = State) ->
    SendResult = send(Transport, Peer, Payload),
    case SendResult of
        ok ->
            {noreply, State};
        {error, Why} ->
            ?ERROR_MSG("Send failed, Reason: ~0p", [Why]),
            {stop, {sock_error, Why}, State}
    end;
handle_cast({takeover, CId}, #{connection_id := CId} = State) ->
    %% Same variable in both patterns: only matches the attached connection.
    {stop, {shutdown, takeover}, State};
handle_cast({takeover, _OtherId}, State) ->
    {noreply, State};
handle_cast(Unknown, State) ->
    ?ERROR_MSG("Unexpected cast: ~p", [Unknown]),
    {noreply, State}.
×
121

122
%% gen_server info callback.
%%
%% {datagram, _, Data} / {ssl, _, Data} - incoming packet, passed to
%%     handle_incoming/2.
%% {heartbeat, Span} - stop with `normal' when more than `Span' seconds have
%%     passed since `last_time'; otherwise schedule the next heartbeat.
%% {ssl_error, _, Reason} / {ssl_closed, _} - detach from the connection and
%%     stop.
%% {'DOWN', ...} - a monitored process died; stop with
%%     {shutdown, connection_closed}.
%% Anything else is logged and ignored.
handle_info({datagram, _SockPid, Data}, State) ->
    {noreply, handle_incoming(Data, State)};
handle_info({ssl, _Socket, Data}, State) ->
    {noreply, handle_incoming(Data, State)};
handle_info({heartbeat, Span}, #{last_time := LastTime} = State) ->
    Now = ?NOW,
    case Now - LastTime > Span of
        true ->
            {stop, normal, State};
        _ ->
            heartbeat(Span),
            {noreply, State}
    end;
handle_info({ssl_error, _Sock, Reason}, State) ->
    {stop, Reason, socket_exit(State)};
handle_info({ssl_closed, _Sock}, State) ->
    {stop, ssl_closed, socket_exit(State)};
handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
    %% No debug output here: the previous ct:print/ets:tab2list call depended
    %% on common_test and on a named ETS table being present, either of which
    %% can be missing at runtime and would crash this clause.
    {stop, {shutdown, connection_closed}, State};
handle_info(Info, State) ->
    ?ERROR_MSG("Unexpected info: ~p", [Info]),
    {noreply, State}.
×
148

149
%% gen_server terminate callback: close the transport, then detach from the
%% attached connection (if any).
%%
%% `Clear' is false only when `Reason' is exactly one of the atoms
%% close_transport, connection_closed or takeover, and true otherwise.
%%
%% NOTE(review): the stop reasons produced in this module are wrapped, e.g.
%% {shutdown, close_transport}, {shutdown, connection_closed} and
%% {shutdown, takeover}. Those tuples do not equal the bare atoms tested
%% here, so for them `Clear' ends up true. Confirm whether that is intended
%% before changing it - it depends on what esockd_udp_proxy_db:detach/2
%% returns.
terminate(Reason, #{transport := Transport} = State) ->
    close_transport(Transport),
    KeepReasons = [close_transport, connection_closed, takeover],
    Clear = not lists:member(Reason, KeepReasons),
    detach(State, Clear).
×
163

164
%%--------------------------------------------------------------------
165
%%- Internal functions
166
%%--------------------------------------------------------------------
167
%% Process one incoming packet.
%% Refreshes `last_time', asks the connection module for the connection id
%% carried by `Data', and dispatches the packet. If no id can be extracted
%% the packet is dropped with an error log; `last_time' is still refreshed.
-spec handle_incoming(socket_packet(), state()) -> state().
handle_incoming(
    Data,
    #{
        transport := Transport,
        peer := Peer,
        connection_mod := Mod,
        connection_state := CState0
    } = State0
) ->
    Touched = State0#{last_time := ?NOW},
    Parsed = esockd_udp_proxy_connection:get_connection_id(
        Mod, Transport, Peer, CState0, Data
    ),
    case Parsed of
        {ok, CId, Packet, CState1} ->
            dispatch(Mod, CId, Data, Packet, Touched#{connection_state := CState1});
        invalid ->
            ?ERROR_MSG("Can't get connection id, Transport:~0p, Peer:~0p, Mod:~0p", [
                Transport, Peer, Mod
            ]),
            Touched
    end.
183

184
%% Deliver one packet to the connection process identified by `CId'.
%%
%% The process is found (or created) through lookup/5. On success the packet
%% is forwarded as {Transport, Data, Packet} and the proxy is attached to
%% that connection. On failure the error is logged and the state is returned
%% unchanged.
%%
%% Arguments, in order: connection module, connection id, the raw data as
%% received from the socket, the packet decoded from it, and the state.
-spec dispatch(
    connection_module(),
    connection_id(),
    socket_packet(),
    connection_packet(),
    state()
) ->
    state().
dispatch(
    Mod,
    CId,
    Data,
    Packet,
    #{
        transport := Transport,
        peer := Peer,
        connection_state := CState,
        connection_options := Opts
    } =
        State
) ->
    case lookup(Mod, Transport, Peer, CId, Opts) of
        {ok, Pid} ->
            esockd_udp_proxy_connection:dispatch(
                Mod, Pid, CState, {Transport, Data, Packet}
            ),
            attach(CId, State, Pid);
        {error, Reason} ->
            ?ERROR_MSG("Dispatch failed, Reason:~0p", [Reason]),
            State
    end.
215

216
%% Bind this proxy to connection `CId', which is handled by process `Pid'.
%%
%% - Not attached yet: register in esockd_udp_proxy_db, monitor `Pid', and
%%   record the id, the pid and the monitor reference in the state.
%% - Attached to a different id: detach from the old one, then attach.
%% - Already attached to `CId': the state is returned unchanged.
-spec attach(connection_id(), state(), pid()) -> state().
attach(CId, #{connection_mod := Mod, connection_id := undefined} = State, Pid) ->
    esockd_udp_proxy_db:attach(Mod, CId, Pid),
    %% The reference has to be kept: erlang:demonitor/1,2 takes the monitor
    %% reference and raises badarg when handed the monitored pid instead.
    MRef = erlang:monitor(process, Pid),
    State#{connection_id := CId, connection_pid := Pid, connection_monitor => MRef};
attach(CId, #{connection_id := OldId} = State, Pid) when CId =/= OldId ->
    State2 = detach(State),
    attach(CId, State2, Pid);
attach(_CId, State, _Pid) ->
    State.

%% Detach from the current connection with `Clear' set to true.
-spec detach(state()) -> state().
detach(State) ->
    detach(State, true).

%% Detach from the current connection, if any.
%%
%% Drops the monitor, removes this proxy from esockd_udp_proxy_db and resets
%% the connection fields of the state. The connection process is asked to
%% close only when the db returns a pair whose first element equals `Clear'
%% and whose second element is the pid stored in the state, and that process
%% is still alive.
%% NOTE(review): the meaning of the first element of that pair is defined by
%% esockd_udp_proxy_db:detach/2, which is not visible here - confirm there.
-spec detach(state(), boolean()) -> state().
detach(#{connection_id := undefined} = State, _Clear) ->
    State;
detach(
    #{
        connection_id := CId,
        connection_pid := Pid,
        connection_mod := Mod,
        connection_state := CState
    } = State,
    Clear
) ->
    ok = drop_monitor(State),
    case esockd_udp_proxy_db:detach(Mod, CId) of
        {Clear, Pid} ->
            case erlang:is_process_alive(Pid) of
                true ->
                    esockd_udp_proxy_connection:close(Mod, Pid, CState);
                _ ->
                    ok
            end;
        _ ->
            ok
    end,
    Detached = State#{connection_id := undefined, connection_pid := undefined},
    maps:remove(connection_monitor, Detached).

%% Remove the monitor recorded by attach/3, if there is one.
%% `flush' also discards a 'DOWN' message that may already be queued for it,
%% so a connection this proxy has left cannot stop the proxy afterwards.
drop_monitor(#{connection_monitor := MRef}) when is_reference(MRef) ->
    _ = erlang:demonitor(MRef, [flush]),
    ok;
drop_monitor(_State) ->
    ok.
×
255

256
%% Called when the socket reported an error or was closed: detach from the
%% current connection (with `Clear' set to true) and return the new state.
-spec socket_exit(state()) -> state().
socket_exit(State) ->
    detach(State, true).
×
259

260
%% Schedule a {heartbeat, Span} message to this process in `Span' seconds.
-spec heartbeat(timespan()) -> ok.
heartbeat(Span) ->
    DelayMs = timer:seconds(Span),
    _TimerRef = erlang:send_after(DelayMs, self(), {heartbeat, Span}),
    ok.
×
264

265
%% Find the process serving connection `CId', creating one when the db has
%% no entry for it.
%%
%% A db hit is returned as is. On a miss the result of
%% esockd_udp_proxy_connection:create/4 is returned, except that `ignore' is
%% reported as {error, ignore}.
-spec lookup(
    connection_module(),
    proxy_transport(),
    peer(),
    connection_id(),
    connection_options()
) -> {ok, pid()} | {error, Reason :: term()}.
lookup(Mod, Transport, Peer, CId, Opts) ->
    case esockd_udp_proxy_db:lookup(Mod, CId) of
        undefined ->
            case esockd_udp_proxy_connection:create(Mod, Transport, Peer, Opts) of
                ignore ->
                    {error, ignore};
                Created ->
                    %% {ok, Pid} or an error term: passed through unchanged.
                    Created
            end;
        {ok, _} = Found ->
            Found
    end.
286

287
%% Write `Data' to the peer.
%% When the socket inside the transport is a port it is sent with
%% gen_udp:send/4 to the peer's address and port; otherwise it goes through
%% esockd_transport:send/2 and the peer argument is not used.
-spec send(proxy_transport(), peer(), binary()) -> _.
send({?PROXY_TRANSPORT, _Owner, Sock}, {Addr, PortNo}, Data) when is_port(Sock) ->
    gen_udp:send(Sock, Addr, PortNo, Data);
send({?PROXY_TRANSPORT, _Owner, Sock}, _Peer, Data) ->
    esockd_transport:send(Sock, Data).
×
292

293
%% Complete the initial state with `transport' and `peer'.
%%
%% {udp, _, Sock}   - the socket is wrapped as is and the given peer is stored.
%% esockd_transport - the second argument is the socket; it is first passed
%%                    to esockd_transport:wait/1. On {ok, NewSock} the peer
%%                    is taken from esockd_transport:peername/1. Any other
%%                    result of wait/1 is returned unchanged.
init_transport({udp, _, Sock}, Peer, State) ->
    Transport = {?PROXY_TRANSPORT, self(), Sock},
    {ok, State#{transport => Transport, peer => Peer}};
init_transport(esockd_transport, Sock, State) ->
    case esockd_transport:wait(Sock) of
        {ok, ReadySock} ->
            {ok, State#{
                transport => {?PROXY_TRANSPORT, self(), ReadySock},
                peer => esockd_transport:peername(ReadySock)
            }};
        NotReady ->
            NotReady
    end.
308

309
%% Release the transport on shutdown.
%% A socket that is a port is left untouched; any other socket is closed
%% with esockd_transport:fast_close/1.
close_transport({?PROXY_TRANSPORT, _Owner, Sock}) when is_port(Sock) ->
    ok;
close_transport({?PROXY_TRANSPORT, _Owner, Sock}) ->
    esockd_transport:fast_close(Sock).
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc