• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8684992748

15 Apr 2024 07:16AM UTC coverage: 67.831% (+5.4%) from 62.388%
8684992748

push

github

web-flow
Merge pull request #12877 from id/0415-sync-release-56

sync release 56

29 of 40 new or added lines in 7 files covered. (72.5%)

129 existing lines in 17 files now uncovered.

37939 of 55932 relevant lines covered (67.83%)

7734.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.03
/apps/emqx/src/emqx_quic_data_stream.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%%
18
%% @doc QUIC data stream
19
%% Following the behaviour of emqx_connection:
20
%%  The MQTT packets and their side effects are handled *atomically*.
21
%%
22

23
-module(emqx_quic_data_stream).
24

25
-ifndef(BUILD_WITHOUT_QUIC).
26
-behaviour(quicer_remote_stream).
27

28
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
29
-include_lib("quicer/include/quicer.hrl").
30
-include("emqx_mqtt.hrl").
31
-include("logger.hrl").
32

33
%% Connection Callbacks
34
-export([
35
    init_handoff/4,
36
    post_handoff/3,
37
    send_complete/3,
38
    peer_send_shutdown/3,
39
    peer_send_aborted/3,
40
    peer_receive_aborted/3,
41
    send_shutdown_complete/3,
42
    stream_closed/3,
43
    passive/3
44
]).
45

46
-export([handle_stream_data/4]).
47

48
%% gen_server API
49
-export([activate_data/2]).
50

51
-export([
52
    handle_call/3,
53
    handle_info/2,
54
    handle_continue/2
55
]).
56

57
-type cb_ret() :: quicer_stream:cb_ret().
58
-type cb_state() :: quicer_stream:cb_state().
59
-type error_code() :: quicer:error_code().
60
-type connection_handle() :: quicer:connection_handle().
61
-type stream_handle() :: quicer:stream_handle().
62
-type handoff_data() :: {
63
    emqx_frame:parse_state() | undefined,
64
    emqx_frame:serialize_opts() | undefined,
65
    emqx_channel:channel() | undefined
66
}.
67
%%
68
%% @doc Activate data handling on a data stream process by handing it the
%%      parse state, serialize options and channel.
%%      Note, data handling is disabled until the validation over the control
%%      stream has finished.
-spec activate_data(pid(), {
    emqx_frame:parse_state(), emqx_frame:serialize_opts(), emqx_channel:channel()
}) -> ok.
activate_data(StreamPid, {_PS, _Serialize, _Channel} = Handoff) ->
    %% synchronous call without timeout
    gen_server:call(StreamPid, {activate, Handoff}, infinity).
54✔
75

76
%%
77
%% @doc Handoff from the previous owner, i.e. the connection owner.
%%      Note, unlike the control stream, there is no acceptor for data streams.
%%            The connection owner gets the new stream, spawns a new process
%%            and then hands the stream over to it.
%%
-spec init_handoff(stream_handle(), map(), connection_handle(), quicer:new_stream_props()) ->
    {ok, cb_state()}.
init_handoff(Stream, _StreamOpts, Connection, #{is_orphan := true, flags := OpenFlags}) ->
    %% only orphan streams are accepted here (see the map pattern above)
    State = init_state(Stream, Connection, OpenFlags),
    {ok, State}.
925✔
90

91
%%
92
%% @doc Post handoff of a data stream.
%%      With an all-`undefined' handoff tuple the state is returned untouched;
%%      otherwise the stream is set to `active = 10' and the channel,
%%      serialize options and parse state are stored in the state.
%%
-spec post_handoff(stream_handle(), handoff_data(), cb_state()) -> cb_ret().
post_handoff(_Stream, {undefined, undefined, undefined}, State) ->
    %% The channel isn't ready yet.
    %% The data stream waits for an activate call via ?MODULE:activate_data/2
    {ok, State};
post_handoff(Stream, {ParseState, SerializeOpts, Chan}, State) ->
    ?tp(debug, ?FUNCTION_NAME, #{channel => Chan, serialize => SerializeOpts}),
    _ = quicer:setopt(Stream, active, 10),
    NewState = State#{channel := Chan, serialize := SerializeOpts, parse_state := ParseState},
    {ok, NewState}.
871✔
103

104
%% @doc The peer aborted its receive direction.
-spec peer_receive_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret().
peer_receive_aborted(Stream, ErrorCode, State = #{is_unidir := _}) ->
    %% shut the stream down with the ABORT flag, reusing the peer's error code;
    %% the result of the async shutdown request is deliberately ignored
    _ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
    {ok, State}.
6✔
109

110
%% @doc The peer aborted its send direction.
-spec peer_send_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret().
peer_send_aborted(Stream, ErrorCode, State = #{is_unidir := _}) ->
    %% abort our receive direction with the same error code;
    %% the result of the async shutdown request is deliberately ignored
    _ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode),
    {ok, State}.
2✔
115

116
%% @doc The peer shut down its send direction: request a graceful shutdown
%%      of the stream with error code 0. The request must succeed (`ok').
-spec peer_send_shutdown(stream_handle(), undefined, cb_state()) -> cb_ret().
peer_send_shutdown(Stream, undefined, State) ->
    ok = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0),
    {ok, State}.
19✔
120

121
%% @doc A send has completed. The state is returned unchanged whether or not
%%      the send was canceled.
-spec send_complete(stream_handle(), IsCanceled :: boolean(), cb_state()) -> cb_ret().
send_complete(_Stream, IsCanceled, State) when is_boolean(IsCanceled) ->
    {ok, State}.
×
126

127
%% @doc The send direction has finished shutting down; nothing to do, the
%%      state is returned unchanged.
-spec send_shutdown_complete(stream_handle(), IsGraceful :: boolean(), cb_state()) -> cb_ret().
send_shutdown_complete(_Stream, _IsGraceful, State) ->
    {ok, State}.
828✔
130

131
%% @doc Handle a chunk of data received on a bidirectional data stream.
%%      Bytes buffered in `data_queue' (kept in reversed order) are put in
%%      front of `Bin', the result is parsed into MQTT packets and every
%%      parsed item is appended to the task queue as `{incoming, Item}'.
%%      Processing then continues with `handle_appl_msg'.
-spec handle_stream_data(stream_handle(), binary(), quicer:recv_data_props(), cb_state()) ->
    cb_ret().
handle_stream_data(
    _Stream,
    Bin,
    _Flags,
    #{
        is_unidir := false,
        channel := Channel,
        parse_state := PS,
        data_queue := QueuedData,
        task_queue := TQ
    } = State
) when
    %% assert get stream data only after channel is created
    Channel =/= undefined
->
    Data = list_to_binary(lists:reverse([Bin | QueuedData])),
    %% parse_incoming/2 returns the packets in reversed order
    {MQTTPackets, NewPS} = parse_incoming(Data, PS),
    NewTQ = lists:foldl(
        fun queue:in/2,
        TQ,
        [{incoming, P} || P <- lists:reverse(MQTTPackets)]
    ),
    %% The queued bytes have just been handed to the parser: clear the queue
    %% so they are not parsed again together with the next chunk.
    {{continue, handle_appl_msg}, State#{
        parse_state := NewPS,
        data_queue := [],
        task_queue := NewTQ
    }}.
17,624✔
157

158
%% @doc The stream went passive: re-arm it with `active = 10'.
%%      The result of setting the option is ignored.
-spec passive(stream_handle(), undefined, cb_state()) -> cb_ret().
passive(Stream, undefined, State) ->
    _ = quicer:setopt(Stream, active, 10),
    {ok, State}.
1,617✔
162

163
%% @doc The stream is closed: stop the process with reason `normal'.
%%      The close properties are only checked for presence and type.
-spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_state()) -> cb_ret().
stream_closed(
    _Stream,
    #{
        is_conn_shutdown := ConnShutdown,
        is_app_closing := AppClosing,
        is_shutdown_by_app := ShutdownByApp,
        is_closed_remotely := ClosedRemotely,
        status := Status,
        error := ErrCode
    },
    State
) when
    is_boolean(ConnShutdown),
    is_boolean(AppClosing),
    is_boolean(ShutdownByApp),
    is_boolean(ClosedRemotely),
    is_atom(Status),
    is_integer(ErrCode)
->
    {stop, normal, State}.
828✔
184

185
%% @doc Synchronous call entry point; the caller identity is not used and the
%%      request is passed on to do_handle_call/2.
-spec handle_call(Request :: term(), From :: {pid(), term()}, cb_state()) -> cb_ret().
handle_call(Request, _From, State) ->
    do_handle_call(Request, State).
54✔
188

189
%% @doc Work through the task queue one item at a time.
%%      Returns `{ok, State}' once the queue is empty; otherwise the head item
%%      is removed from the queue and given to do_handle_appl_msg/2.
-spec handle_continue(Continue :: term(), cb_state()) -> cb_ret().
handle_continue(handle_appl_msg, State = #{task_queue := Tasks}) ->
    case queue:out(Tasks) of
        {empty, _} ->
            {ok, State};
        {{value, Task}, Remaining} ->
            do_handle_appl_msg(Task, State#{task_queue := Remaining})
    end.
197

198
%%% Internals
199
%% Handle one item taken from the task queue:
%%   {outgoing, Packets}             - send packets to the peer
%%   {incoming, Packet | FrameError} - feed the channel with parsed input
%%   {close, Reason}                 - tell the channel the socket is closed
%%   {event, updated}                - ignored
do_handle_appl_msg(
    {outgoing, Pkts},
    State = #{channel := Chan, stream := _, serialize := _}
) when
    Chan =/= undefined
->
    case handle_outgoing(Pkts, State) of
        {ok, SentBytes} ->
            ok = emqx_metrics:inc('bytes.sent', SentBytes),
            {{continue, handle_appl_msg}, State};
        {error, Reason, Detail} ->
            {stop, {Reason, Detail}, State};
        {error, Reason} ->
            {stop, Reason, State}
    end;
do_handle_appl_msg({incoming, Pkt = #mqtt_packet{}}, State = #{channel := Chan}) when
    Chan =/= undefined
->
    ok = inc_incoming_stats(Pkt),
    with_channel(handle_in, [Pkt], State);
do_handle_appl_msg({incoming, FrameError = {frame_error, _}}, State = #{channel := Chan}) when
    Chan =/= undefined
->
    with_channel(handle_in, [FrameError], State);
do_handle_appl_msg({close, Why}, State) ->
    %% @TODO shall we abort shutdown or graceful shutdown here?
    with_channel(handle_info, [{sock_closed, Why}], State);
do_handle_appl_msg({event, updated}, State) ->
    %% Data streams don't care about connection state changes.
    {{continue, handle_appl_msg}, State}.
407✔
233

234
%% Route process messages to the channel:
%%   {deliver, _, _}     -> emqx_channel:handle_deliver/2 (as a one-element list)
%%   {timeout, Ref, Msg} -> emqx_channel:handle_timeout/3
%%   anything else       -> emqx_channel:handle_info/2
handle_info({deliver, _, _} = Deliver, State) ->
    with_channel(handle_deliver, [[Deliver]], State);
handle_info({timeout, TRef, TMsg}, State) ->
    with_channel(handle_timeout, [TRef, TMsg], State);
handle_info(Other, State) ->
    with_channel(handle_info, [Other], State).
×
241

242
%% Call `emqx_channel:Fun(Args..., Channel)' and translate its result into a
%% callback return: new work is appended to the task queue, the updated
%% channel is stored, and `shutdown' results stop the process.
with_channel(Fun, Args, #{channel := Channel, task_queue := Q} = S) when
    Channel =/= undefined
->
    Next = {continue, handle_appl_msg},
    case erlang:apply(emqx_channel, Fun, Args ++ [Channel]) of
        ok ->
            {Next, S};
        {ok, Tasks, Chan1} when is_list(Tasks) ->
            Q1 = queue:join(Q, queue:from_list(Tasks)),
            {Next, S#{task_queue := Q1, channel := Chan1}};
        {ok, Pkt, Chan1} when is_record(Pkt, mqtt_packet) ->
            Q1 = queue:in({outgoing, Pkt}, Q),
            {Next, S#{task_queue := Q1, channel := Chan1}};
        %% @FIXME WTH?
        {ok, {outgoing, _} = Task, Chan1} ->
            Q1 = queue:in(Task, Q),
            {Next, S#{task_queue := Q1, channel := Chan1}};
        {ok, Chan1} ->
            {Next, S#{channel := Chan1}};
        %% @TODO optimisation for shutdown wrap
        {shutdown, Why, Chan1} ->
            {stop, {shutdown, Why}, S#{channel := Chan1}};
        {shutdown, Why, Tasks, Chan1} when is_list(Tasks) ->
            %% @TODO handle outgoing?
            Q1 = queue:join(Q, queue:from_list(Tasks)),
            {stop, {shutdown, Why}, S#{channel := Chan1, task_queue := Q1}};
        {shutdown, Why, Task, Chan1} ->
            Q1 = queue:in(Task, Q),
            {stop, {shutdown, Why}, S#{channel := Chan1, task_queue := Q1}}
    end.
277

278
%% Serialize the packets allowed on a data stream and send them on the
%% stream. Returns the result of quicer:async_send/3.
handle_outgoing(#mqtt_packet{} = Single, State) ->
    handle_outgoing([Single], State);
handle_outgoing(Packets, #{serialize := Serialize, stream := Stream, is_unidir := false}) when
    is_list(Packets)
->
    Allowed = filter_disallowed_out(Packets),
    IoData = lists:map(fun(Pkt) -> serialize_packet(Pkt, Serialize) end, Allowed),
    %% Send data async but still want send feedback via {quic, send_complete, ...}
    SendResult = quicer:async_send(Stream, IoData, ?QUICER_SEND_FLAG_SYNC),
    ?TRACE("MQTT", "mqtt_packet_sent", #{packets => Packets}),
    %% NOTE(review): stats are incremented for every packet in `Packets',
    %% including the ones removed by filter_disallowed_out/1 - confirm intended.
    lists:foreach(fun(Pkt) -> ok = inc_outgoing_stats(Pkt) end, Packets),
    SendResult.
22,367✔
289

290
%% Serialize one packet with the given serialize options.
%% An empty result means the frame is too large: the packet is logged, counted
%% as dropped and `<<>>' is returned. Serialization failures are logged and
%% re-raised as errors.
serialize_packet(Packet, Serialize) ->
    try emqx_frame:serialize_pkt(Packet, Serialize) of
        <<>> ->
            discard_too_large(Packet);
        Bin ->
            Bin
    catch
        %% Maybe Never happen.
        throw:{?FRAME_SERIALIZE_ERROR, Why} ->
            ?SLOG(info, #{
                reason => Why,
                input_packet => Packet
            }),
            erlang:error({?FRAME_SERIALIZE_ERROR, Why});
        error:Why:Stack ->
            ?SLOG(error, #{
                input_packet => Packet,
                exception => Why,
                stacktrace => Stack
            }),
            erlang:error(?FRAME_SERIALIZE_ERROR)
    end.

%% Log and count a packet whose frame is too large to send; returns `<<>>'.
discard_too_large(Packet) ->
    ?SLOG(warning, #{
        msg => "packet_is_discarded",
        reason => "frame_is_too_large",
        packet => emqx_packet:format(Packet, hidden)
    }),
    ok = emqx_metrics:inc('delivery.dropped.too_large'),
    ok = emqx_metrics:inc('delivery.dropped'),
    ok = inc_outgoing_stats({error, message_too_large}),
    <<>>.
320

321
%% Build the initial callback state of a data stream, without a parse state.
-spec init_state(quicer:stream_handle(), quicer:connection_handle(), non_neg_integer()) ->
    % @TODO
    map().
init_state(Stream, Connection, OpenFlags) ->
    init_state(Stream, Connection, OpenFlags, undefined).

%% Same as init_state/3 but with an explicit frame parse state.
init_state(Stream, Connection, OpenFlags, PS) ->
    IsUnidir = quicer:is_unidirectional(OpenFlags),
    #{
        %% quic stream handle
        stream => Stream,
        %% quic connection handle
        conn => Connection,
        %% whether this is a QUIC unidirectional stream
        is_unidir => IsUnidir,
        %% frame parse state
        parse_state => PS,
        %% peer stream handle of the pair, for unidirectional streams only
        peer_stream => undefined,
        %% whether the stream is locally initiated
        is_local => false,
        %% binary data queued while NOT connected, in reversed order
        data_queue => [],
        %% channel from the connection;
        %% `undefined' means the connection is not connected
        channel => undefined,
        %% serialize opts of the connection
        serialize => undefined,
        %% current working queue
        task_queue => queue:new()
    }.
355

356
%% @doc Serve synchronous calls made to the data stream process.
%%      `{activate, {PS, Serialize, Channel}}' is accepted only while neither
%%      channel nor serialize options are set: it stores them together with
%%      the parse state and sets the stream to `active = true'.
%%      Any other request is answered with `{error, unimpl}'.
-spec do_handle_call(term(), cb_state()) -> cb_ret().
do_handle_call(
    {activate, {PS, Serialize, Channel}},
    #{
        channel := undefined,
        stream := Stream,
        serialize := undefined
    } = S
) ->
    NewS = S#{channel := Channel, serialize := Serialize, parse_state := PS},
    %% We use quic protocol for flow control.
    %% Failing to activate the stream stops the process with the error reason.
    case quicer:setopt(Stream, active, true) of
        ok ->
            {reply, ok, NewS};
        {error, E} ->
            ?SLOG(error, #{msg => "set_stream_active_failed", error => E}),
            {stop, E, NewS}
    end;
do_handle_call(_Call, S) ->
    %% Answer the caller and keep the state. A bare `{error, unimpl}' carries
    %% no state and is not a reply/stop tuple, so the caller would never get
    %% an answer.
    {reply, {error, unimpl}, S}.
×
376

377
%% @doc Parse `Data' into MQTT packets.
%%      Returns `{Packets, NewParseState}' with `Packets' in reversed order.
%%      When parsing fails, the failure is logged and a single
%%      `{frame_error, Reason}' item is returned with the given parse state.
parse_incoming(Data, PS) ->
    try
        do_parse_incoming(Data, [], PS)
    catch
        throw:{?FRAME_PARSE_ERROR, Why} ->
            ?SLOG(info, #{
                reason => Why,
                input_bytes => Data
            }),
            {[{frame_error, Why}], PS};
        error:Why:Stack ->
            ?SLOG(error, #{
                input_bytes => Data,
                reason => Why,
                stacktrace => Stack
            }),
            {[{frame_error, Why}], PS}
    end.
396

397
%% Parse packets off the front of the binary until it is exhausted or the
%% parser asks for more data. Parsed packets are accumulated newest-first.
do_parse_incoming(Bytes, Acc, PS) when Bytes =:= <<>> ->
    {Acc, PS};
do_parse_incoming(Bytes, Acc, PS) ->
    case emqx_frame:parse(Bytes, PS) of
        {ok, Pkt, Remaining, PS1} ->
            do_parse_incoming(Remaining, [Pkt | Acc], PS1);
        {more, PS1} ->
            {Acc, PS1}
    end.
406

407
%% followings are copied from emqx_connection
408
%% Count a received packet; PUBLISH packets additionally bump the message,
%% per-QoS and incoming-publish counters.
-compile({inline, [inc_incoming_stats/1]}).
inc_incoming_stats(?PACKET(Type) = Packet) ->
    inc_counter(recv_pkt, 1),
    case Type of
        ?PUBLISH ->
            inc_counter(recv_msg, 1),
            inc_qos_stats(recv_msg, Packet),
            inc_counter(incoming_pubs, 1);
        _ ->
            ok
    end,
    emqx_metrics:inc_recv(Packet).
19,323✔
420

421
%% Count an outgoing packet, or a drop when given `{error, message_too_large}'.
%% PUBLISH packets additionally bump the message, outgoing-publish and
%% per-QoS counters.
-compile({inline, [inc_outgoing_stats/1]}).
inc_outgoing_stats({error, message_too_large}) ->
    inc_counter('send_msg.dropped', 1),
    inc_counter('send_msg.dropped.too_large', 1);
inc_outgoing_stats(?PACKET(Type) = Packet) ->
    inc_counter(send_pkt, 1),
    case Type =:= ?PUBLISH of
        true ->
            inc_counter(send_msg, 1),
            inc_counter(outgoing_pubs, 1),
            inc_qos_stats(send_msg, Packet);
        false ->
            ok
    end,
    emqx_metrics:inc_sent(Packet).
22,367✔
436

437
%% Increment a counter through emqx_pd; always returns `ok'.
inc_counter(Name, Delta) ->
    _ = emqx_pd:inc_counter(Name, Delta),
    ok.
103,820✔
440

441
%% Bump the per-QoS counter for a sent/received message.
%% Returns `ignore' when the packet's QoS has no counter.
inc_qos_stats(Type, Packet) ->
    QoS = emqx_packet:qos(Packet),
    case inc_qos_stats_key(Type, QoS) of
        undefined -> ignore;
        CounterKey -> inc_counter(CounterKey, 1)
    end.
448

449
%% Map a direction (`send_msg' | `recv_msg') and a QoS level to the counter
%% name; `undefined' for anything else (e.g. a bad QoS).
inc_qos_stats_key(Type, QoS) ->
    case {Type, QoS} of
        {send_msg, ?QOS_0} -> 'send_msg.qos0';
        {send_msg, ?QOS_1} -> 'send_msg.qos1';
        {send_msg, ?QOS_2} -> 'send_msg.qos2';
        {recv_msg, ?QOS_0} -> 'recv_msg.qos0';
        {recv_msg, ?QOS_1} -> 'recv_msg.qos1';
        {recv_msg, ?QOS_2} -> 'recv_msg.qos2';
        %% for bad qos
        _ -> undefined
    end.
×
457

458
%% Keep only the packets that may be sent on a data stream.
filter_disallowed_out(Packets) ->
    lists:filter(fun(Pkt) -> is_datastream_out_pkt(Pkt) end, Packets).

%% True for an MQTT packet whose type is strictly between 2 and 12;
%% false for everything else.
is_datastream_out_pkt(#mqtt_packet{header = #mqtt_packet_header{type = PktType}}) when
    PktType > 2, PktType < 12
->
    true;
is_datastream_out_pkt(_Other) ->
    false.
54✔
467
%% BUILD_WITHOUT_QUIC
468
-else.
469
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc