• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 396

08 Jul 2024 10:56AM UTC coverage: 69.194%. First build
396

Pull #188

github

web-flow
Merge 9c2399c81 into 313713eff
Pull Request #188: feat: add content-sensitive proxy behaviour for UDP

0 of 72 new or added lines in 3 files covered. (0.0%)

867 of 1253 relevant lines covered (69.19%)

59.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/udp_proxy/esockd_udp_proxy.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(esockd_udp_proxy).
18

19
-behaviour(gen_server).
20

21
-include("include/esockd_proxy.hrl").
22

23
%% API
24
-export([start_link/3, send/2]).
25

26
%% gen_server callbacks
27
-export([
28
    init/1,
29
    handle_call/3,
30
    handle_cast/2,
31
    handle_info/2,
32
    terminate/2
33
]).
34

35
-export_type([connection_options/0]).
36

37
-define(NOW, erlang:system_time(second)).
38
-define(ERROR_MSG(Format, Args),
39
    error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])
40
).
41
-define(DEF_HEARTBEAT, 60).
42

43
-type timespan() :: non_neg_integer().
44

45
%%--------------------------------------------------------------------
46
%%  Definitions
47
%%--------------------------------------------------------------------
48

49
-type state() :: #{
50
    connection_mod := connection_module(),
51
    connection_id := connection_id() | undefined,
52
    connection_state := connection_state(),
53
    connection_options := connection_options(),
54
    %% last source's connection active time
55
    last_time := pos_integer(),
56
    transport := transport(),
57

58
    peer := peer()
59
}.
60

61
%%--------------------------------------------------------------------
62
%%- API
63
%%--------------------------------------------------------------------
64

65
%% Start a proxy gen_server linked to the calling process.
%% The three arguments are passed to init/1 as a single list.
start_link(Transport, Peer, Opts) ->
    InitArgs = [Transport, Peer, Opts],
    gen_server:start_link(?MODULE, InitArgs, []).
×
67

68
%% Ask the proxy process `ProxyId' to send `Data'. The request is delivered
%% as a gen_server cast, so the call returns `ok' without waiting for the
%% proxy to process it.
-spec send(proxy_id(), binary()) -> ok.
send(ProxyId, Data) ->
    Request = {send, Data},
    gen_server:cast(ProxyId, Request).
×
71

72
%%--------------------------------------------------------------------
73
%%- gen_server callbacks
74
%%--------------------------------------------------------------------
75

76
%% gen_server init callback.
%% Reads the connection module and the optional heartbeat span (default
%% ?DEF_HEARTBEAT) from the `esockd_proxy_opts' entry of the connection
%% options, schedules the first heartbeat and builds the initial state with
%% no connection id attached yet.
init([Transport, Peer, #{esockd_proxy_opts := ProxyOpts} = ConnOpts]) ->
    #{connection_mod := Mod} = ProxyOpts,
    HeartbeatSpan = maps:get(heartbeat, ProxyOpts, ?DEF_HEARTBEAT),
    heartbeat(HeartbeatSpan),
    StartedAt = ?NOW,
    ConnState = esockd_udp_proxy_connection:initialize(Mod, ConnOpts),
    InitialState = #{
        connection_id => undefined,
        connection_mod => Mod,
        connection_options => ConnOpts,
        connection_state => ConnState,
        last_time => StartedAt,
        peer => Peer,
        transport => Transport
    },
    {ok, InitialState}.
88

89
%% No synchronous requests are supported: log whatever arrives and answer
%% `ok' so the caller is not left blocked.
handle_call(Unexpected, _From, State) ->
    ?ERROR_MSG("Unexpected call: ~p", [Unexpected]),
    Reply = ok,
    {reply, Reply, State}.
×
92

93
%% Handle asynchronous requests.
%% `{send, Data}' writes Data to the peer currently stored in the state
%% using the stored transport; anything else is logged. The state is never
%% modified here.
handle_cast(Msg, State) ->
    case {Msg, State} of
        {{send, Data}, #{transport := Transport, peer := Peer}} ->
            send(Transport, Peer, Data);
        _ ->
            ?ERROR_MSG("Unexpected cast: ~p", [Msg])
    end,
    {noreply, State}.
×
99

100
%% Handle raw messages delivered to the proxy process.
%%
%% - `{udp, ...}' / `{ssl, ...}': incoming socket data, routed through
%%   handle_incoming/5. For SSL the socket itself is used as the peer.
%% - `{heartbeat, Span}': stop normally when more than Span seconds have
%%   passed since `last_time', otherwise schedule the next heartbeat.
%% - `ssl_error' / `ssl_closed': detach from the connection and stop.
%% - `DOWN': a monitored process exited; stop with `connection_closed'.
%% - anything else is logged and ignored.
handle_info({udp, Sock, Addr, PortNo, Payload}, State) ->
    NewState = handle_incoming({udp, self(), Sock}, {Addr, PortNo}, Sock, Payload, State),
    {noreply, NewState};
handle_info({ssl, Sock, Payload}, State) ->
    NewState = handle_incoming(?SSL_TRANSPORT, Sock, Sock, Payload, State),
    {noreply, NewState};
handle_info({heartbeat, Span}, #{last_time := LastSeen} = State) ->
    IdleFor = ?NOW - LastSeen,
    if
        IdleFor > Span ->
            {stop, normal, State};
        true ->
            heartbeat(Span),
            {noreply, State}
    end;
handle_info({ssl_error, _Sock, Reason}, State) ->
    Detached = socket_exit(State),
    {stop, Reason, Detached};
handle_info({ssl_closed, _Sock}, State) ->
    Detached = socket_exit(State),
    {stop, ssl_closed, Detached};
handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
    {stop, connection_closed, State};
handle_info(Unexpected, State) ->
    ?ERROR_MSG("Unexpected info: ~p", [Unexpected]),
    {noreply, State}.
×
127

128
%% gen_server terminate callback: detach from the current connection id.
%% The flag handed to detach/2 is `false' only when the stop reason is
%% `connection_closed' (the reason used when a monitored process went down).
terminate(Reason, State) ->
    Clear = Reason =/= connection_closed,
    detach(State, Clear).
×
130

131
%%--------------------------------------------------------------------
132
%%- Internal functions
133
%%--------------------------------------------------------------------
134
%% Process one packet received from the socket.
%% Records the sender as the current peer/transport and refreshes
%% `last_time', then asks the connection module to extract a connection id
%% from the data. A recognised packet is dispatched to its connection; an
%% `invalid' one only leaves the refreshed state behind.
-spec handle_incoming(transport(), peer(), socket(), socket_packet(), state()) -> state().
handle_incoming(
    Transport, Peer, Socket, Data, #{connection_mod := Mod, connection_state := CState0} = State0
) ->
    Refreshed = State0#{peer := Peer, transport := Transport, last_time := ?NOW},
    Parsed = esockd_udp_proxy_connection:get_connection_id(
        Mod, Transport, Peer, Socket, CState0, Data
    ),
    case Parsed of
        invalid ->
            Refreshed;
        {ok, CId, Packet, CState1} ->
            dispatch(Mod, CId, Data, Packet, Refreshed#{connection_state := CState1})
    end.
147

148
%% Deliver a parsed packet to the connection process that owns `CId'
%% (looked up, or created when none is registered), then make sure this
%% proxy is attached to that connection id.
-spec dispatch(
    connection_module(),
    connection_id(),
    socket_packet(),
    connection_packet(),
    state()
) ->
    state().
dispatch(
    Mod, CId, Data, Packet, #{connection_state := CState, connection_options := Opts} = State
) ->
    {ok, ConnPid} = lookup(Mod, CId, Opts),
    Message = {?PROXY_TRANSPORT, self(), Data, Packet},
    esockd_udp_proxy_connection:dispatch(Mod, ConnPid, CState, Message),
    attach(CId, State).
×
168

169
%% Bind this proxy to connection id `CId'.
%% - not attached yet: register in the proxy db and remember the id;
%% - attached to a different id: detach from the old one first, then attach;
%% - already attached to `CId': nothing to do.
-spec attach(connection_id(), state()) -> state().
attach(CId, State) ->
    case State of
        #{connection_id := undefined} ->
            esockd_udp_proxy_db:attach(CId),
            State#{connection_id := CId};
        #{connection_id := Current} when Current =/= CId ->
            attach(CId, detach(State));
        _ ->
            State
    end.
×
178

179
%% Detach from the current connection id using the default flag `true'.
-spec detach(state()) -> state().
detach(State) ->
    detach(State, true).

%% Detach this proxy from its current connection id, if any, and reset
%% `connection_id' to `undefined'.
%%
%% The second argument is a boolean flag, not a connection id: it is compared
%% with the first element of the tuple returned by
%% esockd_udp_proxy_db:detach/1, and the connection is closed through the
%% connection module only when the two are equal.
-spec detach(state(), boolean()) -> state().
detach(#{connection_id := undefined} = State, _Clear) ->
    State;
detach(#{connection_id := CId, connection_mod := Mod, connection_state := CState} = State, Clear) ->
    case esockd_udp_proxy_db:detach(CId) of
        %% Clear is already bound here, so this clause is an equality test
        %% against the db result rather than a fresh binding.
        %% NOTE(review): with Clear =:= false this also matches a
        %% {false, Pid} result and closes the connection - confirm that is
        %% the intended behaviour.
        {Clear, Pid} ->
            esockd_udp_proxy_connection:close(Mod, Pid, CState);
        _ ->
            ok
    end,
    State#{connection_id := undefined}.
×
194

195
%% Called when the SSL socket reports an error or is closed: detach from the
%% current connection id with the clear flag set to `true'.
-spec socket_exit(state()) -> state().
socket_exit(State) ->
    detach(State, true).
×
198

199
%% Schedule the next heartbeat check: after `Span' seconds this process
%% receives `{heartbeat, Span}' (?FUNCTION_NAME expands to `heartbeat'),
%% which is handled in handle_info/2.
%%
%% erlang:send_after/3 takes (Time, Dest, Msg); the timeout must come first
%% and the destination second, otherwise the call fails with badarg.
-spec heartbeat(timespan()) -> ok.
heartbeat(Span) ->
    _ = erlang:send_after(timer:seconds(Span), self(), {?FUNCTION_NAME, Span}),
    ok.
×
203

204
%% Find the connection process registered for `CId' in the proxy db.
%% When none is registered, create one through the connection module and
%% monitor it so this proxy receives a 'DOWN' message when it exits.
-spec lookup(
    connection_module(),
    connection_id(),
    connection_options()
) -> {ok, pid()}.
lookup(Mod, CId, Opts) ->
    case esockd_udp_proxy_db:lookup(CId) of
        undefined ->
            {ok, NewPid} =
                esockd_udp_proxy_connection:create(Mod, ?PROXY_TRANSPORT, self(), Opts),
            _ = erlang:monitor(process, NewPid),
            {ok, NewPid};
        {ok, _} = Found ->
            Found
    end.
218

219
%% Write `Data' to the peer over the given transport.
%% - UDP: the peer is an `{IP, Port}' pair and the data goes out through
%%   gen_udp on the socket carried in the transport tuple.
%% - SSL: the peer is the SSL socket itself. That socket is the one that
%%   delivers `{ssl, Socket, Data}' messages to this process, so it has to
%%   be written with ssl:send/2; the `socket' module works on a different
%%   kind of socket handle.
-spec send(transport(), peer(), binary()) -> _.
send({udp, _, Socket}, {IP, Port}, Data) ->
    gen_udp:send(Socket, IP, Port, Data);
send(?SSL_TRANSPORT, Socket, Data) ->
    ssl:send(Socket, Data).
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc