• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 435

13 Jul 2024 11:07AM UTC coverage: 66.998%. First build
435

Pull #190

github

web-flow
Merge 8478e6815 into 4ec038215
Pull Request #190: fix: refactor the udp proxy

0 of 46 new or added lines in 4 files covered. (0.0%)

877 of 1309 relevant lines covered (67.0%)

56.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/udp_proxy/esockd_udp_proxy.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_udp_proxy).
18

19
-behaviour(gen_server).
20

21
-include("include/esockd_proxy.hrl").
22

23
%% API
24
-export([start_link/3, send/2, close/1, takeover/2]).
25

26
%% gen_server callbacks
27
-export([
28
    init/1,
29
    handle_call/3,
30
    handle_cast/2,
31
    handle_info/2,
32
    terminate/2
33
]).
34

35
-export_type([connection_options/0]).
36

37
%% Current system time in whole seconds; used for the `last_time' field of the state.
-define(NOW, erlang:system_time(second)).
38
%% Log an error through error_logger, prefixing the message with this module's name.
%% `Format' must be a string literal/list and `Args' a list, as both are concatenated below.
-define(ERROR_MSG(Format, Args),
39
    error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])
40
).
41
%% Heartbeat span used by init/1 when the proxy options carry no `heartbeat' key.
%% heartbeat/1 feeds the span to timer:seconds/1, so the unit is seconds.
-define(DEF_HEARTBEAT, 60).
42

43
%% Span accepted by heartbeat/1 (seconds).
-type timespan() :: non_neg_integer().
44

45
%%--------------------------------------------------------------------
46
%%  Definitions
47
%%--------------------------------------------------------------------
48

49
%% State map of one proxy process.
%% init/1 fills the connection_* keys and `last_time'; init_transport/3 then adds
%% `transport' and `peer'. `connection_id', `connection_pid' and `connection_ref'
%% are `undefined' until attach/3 binds the proxy to a connection process, and are
%% reset to `undefined' by detach/2.
-type state() :: #{
50
    connection_mod := connection_module(),
51
    connection_id := connection_id() | undefined,
52
    connection_state := connection_state(),
53
    connection_pid := pid() | undefined,
54
    connection_ref := reference() | undefined,
55
    connection_options := connection_options(),
56
    %% Time (?NOW, seconds) of the last incoming packet; compared against the
    %% heartbeat span to decide whether the proxy is idle.
57
    last_time := pos_integer(),
58
    transport := proxy_transport(),
59
    peer := peer()
60
}.
61

62
%%--------------------------------------------------------------------
63
%%- API
64
%%--------------------------------------------------------------------
65

66
%% Start a proxy process linked to the caller.
%% The three arguments are passed to init/1 as a single list.
start_link(Transport, Peer, Opts) ->
    InitArgs = [Transport, Peer, Opts],
    gen_server:start_link(?MODULE, InitArgs, []).
×
68

69
%% Asynchronously ask the proxy to transmit `Data' to its peer.
%% This is a cast: it returns `ok' immediately and send failures are handled
%% inside the proxy process (see handle_cast/2).
-spec send(proxy_id(), binary()) -> ok.
send(ProxyId, Data) ->
    Request = {send, Data},
    gen_server:cast(ProxyId, Request).
×
72

73
%% Synchronously ask the proxy to stop. Returns `ok' once the proxy has stopped,
%% or immediately when it is already gone.
%%
%% The liveness check alone is racy: the proxy may exit between the check and
%% the call (or while the call is pending, e.g. on a heartbeat timeout). In
%% those cases gen_server:call/2 exits with `noproc', `normal' or
%% `{shutdown, _}'; all of them mean the proxy is closed, so they are mapped to
%% `ok'. Any other exit (timeout, crash) is still propagated to the caller.
close(ProxyId) ->
    case erlang:is_process_alive(ProxyId) of
        true ->
            try
                gen_server:call(ProxyId, close)
            catch
                exit:{noproc, _} -> ok;
                exit:{normal, _} -> ok;
                exit:{{shutdown, _}, _} -> ok
            end;
        _ ->
            ok
    end.
80

81
%% Tell the proxy that connection `CId' has been taken over.
%% This is a cast, so the result is always `ok'; the proxy stops itself only if
%% it is currently attached to `CId' (see handle_cast/2).
takeover(ProxyId, CId) ->
    gen_server:cast(ProxyId, {takeover, CId}).
×
84

85
%%--------------------------------------------------------------------
86
%%- gen_server callbacks
87
%%--------------------------------------------------------------------
88

89
%% gen_server init callback.
%% Reads the connection module and heartbeat span from the `esockd_proxy_opts'
%% entry of the options, arms the first heartbeat timer, builds the initial
%% (unattached) state and lets init_transport/3 add `transport' and `peer'.
%% The return value is whatever init_transport/3 returns.
init([Transport, Peer, #{esockd_proxy_opts := ProxyOpts} = ConnOpts]) ->
    #{connection_mod := Mod} = ProxyOpts,
    Span = maps:get(heartbeat, ProxyOpts, ?DEF_HEARTBEAT),
    heartbeat(Span),
    Now = ?NOW,
    ConnState = esockd_udp_proxy_connection:initialize(Mod, ConnOpts),
    State0 = #{
        last_time => Now,
        connection_mod => Mod,
        connection_options => ConnOpts,
        connection_state => ConnState,
        %% not attached to any connection process yet
        connection_id => undefined,
        connection_pid => undefined,
        connection_ref => undefined
    },
    init_transport(Transport, Peer, State0).
101

102
%% gen_server call callback.
%% `close' replies `ok' and stops the proxy with reason
%% `{shutdown, close_transport}'. Any other request is logged and answered
%% with `ok'.
handle_call(close, _From, State) ->
    StopReason = {shutdown, close_transport},
    {stop, StopReason, ok, State};
handle_call(Unknown, _From, State) ->
    ?ERROR_MSG("Unexpected call: ~p", [Unknown]),
    {reply, ok, State}.
×
107

108
%% gen_server cast callback.
%%   {send, Data}     - write Data to the peer; a send error is logged and stops
%%                      the proxy with `{sock_error, Reason}'.
%%   {takeover, CId}  - stop with `{shutdown, takeover}' when this proxy is
%%                      attached to CId, otherwise ignore.
%%   anything else    - logged and ignored.
handle_cast({send, Payload}, #{transport := Transport, peer := Peer} = State) ->
    Outcome = send(Transport, Peer, Payload),
    case Outcome of
        {error, Reason} ->
            ?ERROR_MSG("Send failed, Reason: ~0p", [Reason]),
            {stop, {sock_error, Reason}, State};
        ok ->
            {noreply, State}
    end;
handle_cast({takeover, CId}, State) ->
    case State of
        #{connection_id := CId} ->
            {stop, {shutdown, takeover}, State};
        _ ->
            {noreply, State}
    end;
handle_cast(Unknown, State) ->
    ?ERROR_MSG("Unexpected cast: ~p", [Unknown]),
    {noreply, State}.
×
123

124
%% gen_server info callback.
%%   {datagram, _, Data} / {ssl, _, Data} - incoming packet, see handle_incoming/2.
%%   {heartbeat, Span}   - stop with `normal' when no packet arrived for more
%%                         than Span seconds, otherwise re-arm the timer.
%%   {ssl_error, _, R}   - detach from the connection and stop with R.
%%   {ssl_closed, _}     - detach from the connection and stop with `ssl_closed'.
%%   'DOWN'              - the monitored connection process died: stop with
%%                         `{shutdown, connection_closed}'.
%%   anything else       - logged and ignored.
handle_info({datagram, _SockPid, Data}, State) ->
    handle_incoming(Data, State);
handle_info({ssl, _Socket, Data}, State) ->
    handle_incoming(Data, State);
handle_info({heartbeat, Span}, #{last_time := LastTime} = State) ->
    Now = ?NOW,
    case Now - LastTime > Span of
        true ->
            {stop, normal, State};
        _ ->
            heartbeat(Span),
            {noreply, State}
    end;
handle_info({ssl_error, _Sock, Reason}, State) ->
    {stop, Reason, socket_exit(State)};
handle_info({ssl_closed, _Sock}, State) ->
    {stop, ssl_closed, socket_exit(State)};
handle_info(
    {'DOWN', Ref, process, _Pid, _Reason},
    #{connection_ref := Ref} = State
) ->
    %% Only the monitor created by attach/3 for the current connection counts.
    {stop, {shutdown, connection_closed}, State};
handle_info({'DOWN', _StaleRef, process, _Pid, _Reason}, State) ->
    %% A 'DOWN' for a monitor that was already released (the proxy has since
    %% detached or re-attached) must not tear down the current session.
    {noreply, State};
handle_info(Info, State) ->
    ?ERROR_MSG("Unexpected info: ~p", [Info]),
    {noreply, State}.
×
149

150
%% gen_server terminate callback: close the transport and detach from the
%% connection process.
%%
%% The connection process is asked to close (`Clear = true') unless the proxy
%% stops because the connection itself requested the close, the connection
%% process died, or the connection was taken over by another proxy. Those
%% three stops are issued in this module as `{shutdown, Atom}', so the tuple
%% form is what must be matched here; matching the bare atoms never succeeds
%% and would always clear the connection.
terminate(Reason, #{transport := Transport} = State) ->
    close_transport(Transport),
    Clear =
        case Reason of
            {shutdown, close_transport} ->
                false;
            {shutdown, connection_closed} ->
                false;
            {shutdown, takeover} ->
                false;
            _ ->
                true
        end,
    detach(State, Clear).
×
164

165
%%--------------------------------------------------------------------
166
%%- Internal functions
167
%%--------------------------------------------------------------------
168
%% Process one packet received from the peer.
%% Refreshes `last_time', then asks the connection module for the connection id
%% carried by the packet:
%%   {ok, CId, Packet, CState} - forward the packet through dispatch/5;
%%   {error, Reply}            - log, send Reply back to the peer and stop;
%%   invalid                   - log and stop.
%% Both failure cases stop the proxy with `{shutdown, no_clientid}'.
-spec handle_incoming(socket_packet(), state()) -> _.
handle_incoming(
    Data,
    #{transport := Transport, peer := Peer, connection_mod := Mod, connection_state := CState} =
        State
) ->
    Touched = State#{last_time := ?NOW},
    Parsed = esockd_udp_proxy_connection:get_connection_id(Mod, Transport, Peer, CState, Data),
    case Parsed of
        invalid ->
            ?ERROR_MSG("Can't get connection id, Transport:~0p, Peer:~0p, Mod:~0p", [
                Transport, Peer, Mod
            ]),
            {stop, {shutdown, no_clientid}, Touched};
        {error, Reply} ->
            ?ERROR_MSG("Can't get connection id, Transport:~0p, Peer:~0p, Mod:~0p", [
                Transport, Peer, Mod
            ]),
            %% best effort: the proxy stops regardless of the send result
            _ = send(Transport, Peer, Reply),
            {stop, {shutdown, no_clientid}, Touched};
        {ok, CId, Packet, NewCState} ->
            dispatch(Mod, CId, Data, Packet, Touched#{connection_state := NewCState})
    end.
190

191
%% Deliver a parsed packet to the connection process identified by `CId'.
%%   Mod    - connection callback module
%%   CId    - connection id extracted from the packet
%%   Data   - the raw packet as received from the socket
%%   Packet - the packet as parsed by the connection module
%% Looks up (or creates) the connection process, attaches the proxy to it and
%% forwards `{Transport, Data, Packet}'. A failed lookup is logged and the
%% packet is dropped. Always returns `{noreply, State}'.
-spec dispatch(
    connection_module(),
    connection_id(),
    socket_packet(),
    connection_packet(),
    state()
) -> _.
dispatch(
    Mod,
    CId,
    Data,
    Packet,
    #{
        transport := Transport,
        connection_state := CState
    } =
        State
) ->
    case lookup(CId, State) of
        {ok, Pid} ->
            Result = attach(CId, State, Pid),
            esockd_udp_proxy_connection:dispatch(
                Mod, Pid, CState, {Transport, Data, Packet}
            ),
            {noreply, Result};
        {error, Reason} ->
            ?ERROR_MSG("Dispatch failed, Reason:~0p", [Reason]),
            {noreply, State}
    end.
220

221
%% Bind the proxy to connection `CId' served by process `Pid'.
%%   not attached yet        - register in esockd_udp_proxy_db, monitor Pid and
%%                             record id/pid/monitor ref in the state;
%%   already attached to CId - nothing to do;
%%   attached to another id  - detach (without closing the connection) and
%%                             attach again under the new id.
-spec attach(connection_id(), state(), pid()) -> state().
attach(CId, #{connection_id := Current, connection_mod := Mod} = State, Pid) ->
    case Current of
        undefined ->
            esockd_udp_proxy_db:attach(Mod, CId),
            MonRef = erlang:monitor(process, Pid),
            State#{connection_id := CId, connection_pid := Pid, connection_ref := MonRef};
        CId ->
            State;
        _Other ->
            Detached = detach(State, false),
            attach(CId, Detached, Pid)
    end.
×
231

232
%% Detach from the current connection and ask it to close (see detach/2).
-spec detach(state()) -> state().
detach(State) ->
    detach(State, true).

%% Detach the proxy from its current connection, if any.
%% Releases the monitor, removes the registration from esockd_udp_proxy_db and
%% resets `connection_id', `connection_pid' and `connection_ref' to `undefined'.
%% The connection process is asked to close only when `Clear' is true, the db
%% detach returned `true', and the process is still alive.
-spec detach(state(), boolean()) -> state().
detach(#{connection_id := undefined} = State, _Clear) ->
    State;
detach(
    #{
        connection_id := CId,
        connection_pid := Pid,
        connection_ref := Ref,
        connection_mod := Mod,
        connection_state := CState
    } = State,
    Clear
) ->
    %% `flush' also drops a 'DOWN' message that may already be queued for this
    %% monitor, so it cannot be mistaken for the death of a later connection.
    erlang:demonitor(Ref, [flush]),

    Result = esockd_udp_proxy_db:detach(Mod, CId),
    ShouldClose = Clear andalso Result =:= true andalso erlang:is_process_alive(Pid),
    case ShouldClose of
        true ->
            esockd_udp_proxy_connection:close(Mod, Pid, CState);
        false ->
            ok
    end,
    State#{connection_id := undefined, connection_pid := undefined, connection_ref := undefined}.
×
263

264
%% Called when the underlying socket reported an error or was closed:
%% detach from the connection using detach/1 and return the updated state.
-spec socket_exit(state()) -> state().
socket_exit(ProxyState) ->
    detach(ProxyState).
×
267

268
%% Arm a one-shot timer that delivers `{heartbeat, Span}' to this process
%% after `Span' seconds. The timer reference is not kept.
-spec heartbeat(timespan()) -> ok.
heartbeat(Span) ->
    DelayMs = timer:seconds(Span),
    _TimerRef = erlang:send_after(DelayMs, self(), {heartbeat, Span}),
    ok.
×
272

273
%% Resolve the connection process for `CId'.
%% When the proxy already holds a connection pid, that pid is returned as-is.
%% Otherwise the connection module's find_or_create is run through
%% esockd_udp:proxy_request/1; `ignore' is reported as `{error, ignore}' and
%% every other result is passed back unchanged.
-spec lookup(connection_id(), state()) -> {ok, pid()} | {error, Reason :: term()}.
lookup(_CId, #{connection_pid := Known}) when is_pid(Known) ->
    {ok, Known};
lookup(CId, #{
    connection_pid := undefined,
    connection_mod := Mod,
    transport := Transport,
    peer := Peer,
    connection_options := Opts
}) ->
    %% TODO: use proc_lib:start_link instead of this call
    FindOrCreate = fun() ->
        esockd_udp_proxy_connection:find_or_create(Mod, CId, Transport, Peer, Opts)
    end,
    case esockd_udp:proxy_request(FindOrCreate) of
        ignore ->
            {error, ignore};
        Other ->
            Other
    end.
295

296
%% Write `Data' to the peer over the proxy transport.
%% A port socket together with an `{IP, Port}' peer goes through
%% gen_udp:send/4; every other combination is sent with esockd_transport:send/2.
-spec send(proxy_transport(), peer(), binary()) -> _.
send({?PROXY_TRANSPORT, _Owner, Socket}, Peer, Data) ->
    case {is_port(Socket), Peer} of
        {true, {IP, Port}} ->
            gen_udp:send(Socket, IP, Port, Data);
        _ ->
            esockd_transport:send(Socket, Data)
    end.
×
301

302
%% Complete the initial state with the `transport' and `peer' entries.
%%   {udp, _, Sock}   - use the socket and the given peer directly;
%%   esockd_transport - the second argument is the socket: wait for it with
%%                      esockd_transport:wait/1 and take the peer from
%%                      esockd_transport:peername/1. A failed wait is returned
%%                      unchanged.
init_transport({udp, _, Sock}, Peer, State) ->
    Transport = {?PROXY_TRANSPORT, self(), Sock},
    {ok, State#{transport => Transport, peer => Peer}};
init_transport(esockd_transport, Sock, State) ->
    case esockd_transport:wait(Sock) of
        {ok, ReadySock} ->
            Transport = {?PROXY_TRANSPORT, self(), ReadySock},
            %% NOTE(review): the peername/1 result is stored as returned;
            %% confirm it is a bare peer and not an `{ok, Peer}' tuple.
            Peer = esockd_transport:peername(ReadySock),
            {ok, State#{transport => Transport, peer => Peer}};
        Error ->
            Error
    end.
317

318
%% Release the proxy transport on termination.
%% A port socket is left untouched; any other socket is closed with
%% esockd_transport:fast_close/1.
close_transport({?PROXY_TRANSPORT, _Owner, Sock}) ->
    case is_port(Sock) of
        true ->
            ok;
        false ->
            esockd_transport:fast_close(Sock)
    end.
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc