• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 407

09 Jul 2024 08:15AM UTC coverage: 67.729%. First build
407

Pull #188

github

web-flow
Merge b3bce0374 into 313713eff
Pull Request #188: feat: add content-sensitive proxy behaviour for UDP

7 of 108 new or added lines in 4 files covered. (6.48%)

871 of 1286 relevant lines covered (67.73%)

57.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/udp_proxy/esockd_udp_proxy.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_udp_proxy).
18

19
-behaviour(gen_server).
20

21
-include("include/esockd_proxy.hrl").
22

23
%% API
24
-export([start_link/3, send/2, close/1]).
25

26
%% gen_server callbacks
27
-export([
28
    init/1,
29
    handle_call/3,
30
    handle_cast/2,
31
    handle_info/2,
32
    terminate/2
33
]).
34

35
-export_type([connection_options/0]).
36

37
-define(NOW, erlang:system_time(second)).
38
-define(ERROR_MSG(Format, Args),
39
    error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])
40
).
41
-define(DEF_HEARTBEAT, 60).
42

43
-type timespan() :: non_neg_integer().
44

45
%%--------------------------------------------------------------------
46
%%  Definitions
47
%%--------------------------------------------------------------------
48

49
-type state() :: #{
50
    connection_mod := connection_module(),
51
    connection_id := connection_id() | undefined,
52
    connection_state := connection_state(),
53
    connection_options := connection_options(),
54
    %% last source's connection active time
55
    last_time := pos_integer(),
56
    transport := proxy_transport(),
57
    peer := peer()
58
}.
59

60
%%--------------------------------------------------------------------
61
%%- API
62
%%--------------------------------------------------------------------
63

64
start_link(Transport, Peer, Opts) ->
NEW
65
    gen_server:start_link(?MODULE, [Transport, Peer, Opts], []).
×
66

67
-spec send(proxy_id(), binary()) -> ok.
68
send(ProxyId, Data) ->
NEW
69
    gen_server:cast(ProxyId, {send, Data}).
×
70

71
close(ProxyId) ->
NEW
72
    case erlang:is_process_alive(ProxyId) of
×
73
        true ->
NEW
74
            gen_server:call(ProxyId, close);
×
75
        _ ->
NEW
76
            ok
×
77
    end.
78

79
%%--------------------------------------------------------------------
80
%%- gen_server callbacks
81
%%--------------------------------------------------------------------
82

83
init([Transport, Peer, #{esockd_proxy_opts := Opts} = COpts]) ->
NEW
84
    #{connection_mod := Mod} = Opts,
×
NEW
85
    heartbeat(maps:get(heartbeat, Opts, ?DEF_HEARTBEAT)),
×
NEW
86
    init_transport(Transport, Peer, #{
×
87
        last_time => ?NOW,
88
        connection_mod => Mod,
89
        connection_options => COpts,
90
        connection_state => esockd_udp_proxy_connection:initialize(Mod, COpts),
91
        connection_id => undefined
92
    }).
93

94
handle_call(close, _From, State) ->
NEW
95
    {stop, {shutdown, close_transport}, ok, State};
×
96
handle_call(Request, _From, State) ->
NEW
97
    ?ERROR_MSG("Unexpected call: ~p", [Request]),
×
NEW
98
    {reply, ok, State}.
×
99

100
handle_cast({send, Data}, #{transport := Transport, peer := Peer} = State) ->
NEW
101
    send(Transport, Peer, Data),
×
NEW
102
    {noreply, State};
×
103
handle_cast(Request, State) ->
NEW
104
    ?ERROR_MSG("Unexpected cast: ~p", [Request]),
×
NEW
105
    {noreply, State}.
×
106

107
handle_info({datagram, _SockPid, Data}, State) ->
NEW
108
    {noreply, handle_incoming(Data, State)};
×
109
handle_info({ssl, _Socket, Data}, State) ->
NEW
110
    {noreply, handle_incoming(Data, State)};
×
111
handle_info({heartbeat, Span}, #{last_time := LastTime} = State) ->
NEW
112
    Now = ?NOW,
×
NEW
113
    case Now - LastTime > Span of
×
114
        true ->
NEW
115
            {stop, normal, State};
×
116
        _ ->
NEW
117
            heartbeat(Span),
×
NEW
118
            {noreply, State}
×
119
    end;
120
handle_info({ssl_error, _Sock, Reason}, State) ->
NEW
121
    {stop, Reason, socket_exit(State)};
×
122
handle_info({ssl_closed, _Sock}, State) ->
NEW
123
    {stop, ssl_closed, socket_exit(State)};
×
124
handle_info(
125
    {'DOWN', _, process, _, _Reason},
126
    State
127
) ->
NEW
128
    {stop, {shutdown, connection_closed}, State};
×
129
handle_info(Info, State) ->
NEW
130
    ?ERROR_MSG("Unexpected info: ~p", [Info]),
×
NEW
131
    {noreply, State}.
×
132

133
terminate(Reason, #{transport := Transport} = State) ->
NEW
134
    close_transport(Transport),
×
NEW
135
    Clear =
×
136
        case Reason of
137
            close_transport ->
NEW
138
                false;
×
139
            connection_closed ->
NEW
140
                false;
×
141
            _ ->
NEW
142
                true
×
143
        end,
NEW
144
    detach(State, Clear).
×
145

146
%%--------------------------------------------------------------------
147
%%- Internal functions
148
%%--------------------------------------------------------------------
149
-spec handle_incoming(socket_packet(), state()) -> state().
150
handle_incoming(
151
    Data,
152
    #{transport := Transport, peer := Peer, connection_mod := Mod, connection_state := CState} =
153
        State
154
) ->
NEW
155
    State2 = State#{last_time := ?NOW},
×
NEW
156
    case esockd_udp_proxy_connection:get_connection_id(Mod, Transport, Peer, CState, Data) of
×
157
        {ok, CId, Packet, CState2} ->
NEW
158
            dispatch(Mod, CId, Data, Packet, State2#{connection_state := CState2});
×
159
        invalid ->
NEW
160
            ?ERROR_MSG("Can't get connection id, Transport:~0p, Peer:~0p, Mod:~0p", [
×
161
                Transport, Peer, Mod
162
            ]),
NEW
163
            State2
×
164
    end.
165

166
-spec dispatch(
167
    connection_module(),
168
    esockd_transport:socket(),
169
    connection_id(),
170
    connection_packet(),
171
    state()
172
) ->
173
    state().
174
dispatch(
175
    Mod,
176
    CId,
177
    Data,
178
    Packet,
179
    #{
180
        transport := Transport,
181
        peer := Peer,
182
        connection_state := CState,
183
        connection_options := Opts
184
    } =
185
        State
186
) ->
NEW
187
    case lookup(Mod, Transport, Peer, CId, Opts) of
×
188
        {ok, Pid} ->
NEW
189
            esockd_udp_proxy_connection:dispatch(
×
190
                Mod, Pid, CState, {Transport, Data, Packet}
191
            ),
NEW
192
            attach(CId, State);
×
193
        {error, Reason} ->
NEW
194
            ?ERROR_MSG("Dispatch failed, Reason:~0p", [Reason]),
×
NEW
195
            State
×
196
    end.
197

198
-spec attach(connection_id(), state()) -> state().
199
attach(CId, #{connection_mod := Mod, connection_id := undefined} = State) ->
NEW
200
    esockd_udp_proxy_db:attach(Mod, CId),
×
NEW
201
    State#{connection_id := CId};
×
202
attach(CId, #{connection_id := OldId} = State) when CId =/= OldId ->
NEW
203
    State2 = detach(State),
×
NEW
204
    attach(CId, State2);
×
205
attach(_CId, State) ->
NEW
206
    State.
×
207

208
-spec detach(state()) -> state().
209
detach(State) ->
NEW
210
    detach(State, true).
×
211

212
-spec detach(connection_id(), state()) -> state().
213
detach(#{connection_id := undefined} = State, _Clear) ->
NEW
214
    State;
×
215
detach(#{connection_id := CId, connection_mod := Mod, connection_state := CState} = State, Clear) ->
NEW
216
    case esockd_udp_proxy_db:detach(Mod, CId) of
×
217
        {Clear, Pid} ->
NEW
218
            case erlang:is_process_alive(Pid) of
×
219
                true ->
NEW
220
                    esockd_udp_proxy_connection:close(Mod, Pid, CState);
×
221
                _ ->
NEW
222
                    ok
×
223
            end;
224
        _ ->
NEW
225
            ok
×
226
    end,
NEW
227
    State#{connection_id := undefined}.
×
228

229
-spec socket_exit(state()) -> state().
230
socket_exit(State) ->
NEW
231
    detach(State).
×
232

233
-spec heartbeat(timespan()) -> ok.
234
heartbeat(Span) ->
NEW
235
    erlang:send_after(timer:seconds(Span), self(), {?FUNCTION_NAME, Span}),
×
NEW
236
    ok.
×
237

238
-spec lookup(
239
    connection_module(),
240
    proxy_transport(),
241
    peer(),
242
    connection_id(),
243
    connection_options()
244
) -> {ok, pid()}.
245
lookup(Mod, Transport, Peer, CId, Opts) ->
NEW
246
    case esockd_udp_proxy_db:lookup(Mod, CId) of
×
247
        {ok, _} = Ok ->
NEW
248
            Ok;
×
249
        undefined ->
NEW
250
            case esockd_udp_proxy_connection:create(Mod, Transport, Peer, Opts) of
×
251
                {ok, Pid} ->
NEW
252
                    esockd_udp_proxy_db:insert(Mod, CId, Pid),
×
NEW
253
                    _ = erlang:monitor(process, Pid),
×
NEW
254
                    {ok, Pid};
×
255
                ignore ->
NEW
256
                    {error, ignore};
×
257
                Error ->
NEW
258
                    Error
×
259
            end
260
    end.
261

262
-spec send(transport(), peer(), binary()) -> _.
263
send({?PROXY_TRANSPORT, _, Socket}, {IP, Port}, Data) when is_port(Socket) ->
NEW
264
    gen_udp:send(Socket, IP, Port, Data);
×
265
send({?PROXY_TRANSPORT, _, Socket}, _Peer, Data) ->
NEW
266
    socket:send(Socket, Data).
×
267

268
init_transport({udp, _, Sock}, Peer, State) ->
NEW
269
    {ok, State#{
×
270
        transport => {?PROXY_TRANSPORT, self(), Sock},
271
        peer => Peer
272
    }};
273
init_transport(esockd_transport, Sock, State) ->
NEW
274
    case esockd_transport:wait(Sock) of
×
275
        {ok, NSock} ->
NEW
276
            {ok, State#{
×
277
                transport => {?PROXY_TRANSPORT, self(), NSock},
278
                peer => esockd_transport:peername(NSock)
279
            }};
280
        Error ->
NEW
281
            Error
×
282
    end.
283

284
close_transport({?PROXY_TRANSPORT, _, Sock}) when is_port(Sock) ->
NEW
285
    ok;
×
286
close_transport({?PROXY_TRANSPORT, _, Sock}) ->
NEW
287
    esockd_transport:fast_close(Sock).
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc