• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8702269235

16 Apr 2024 08:17AM UTC coverage: 67.82% (-0.01%) from 67.831%
8702269235

push

github

web-flow
Merge pull request #12881 from keynslug/fix/ds-repl-flaky

fix(dsrepl): make replication-related tests more stable

11 of 17 new or added lines in 1 file covered. (64.71%)

32 existing lines in 11 files now uncovered.

37936 of 55936 relevant lines covered (67.82%)

7895.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.47
/apps/emqx/src/emqx_persistent_session_ds.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_persistent_session_ds).
18

19
-behaviour(emqx_session).
20

21
-include("emqx.hrl").
22
-include_lib("emqx/include/logger.hrl").
23
-include_lib("snabbkaffe/include/trace.hrl").
24
-include_lib("stdlib/include/ms_transform.hrl").
25

26
-include("emqx_mqtt.hrl").
27

28
-include("emqx_persistent_session_ds.hrl").
29

30
-ifdef(TEST).
31
-include_lib("proper/include/proper.hrl").
32
-include_lib("eunit/include/eunit.hrl").
33
-endif.
34

35
%% Session API
36
-export([
37
    create/4,
38
    open/4,
39
    destroy/1,
40
    kick_offline_session/1
41
]).
42

43
-export([
44
    info/2,
45
    stats/1
46
]).
47

48
-export([
49
    subscribe/3,
50
    unsubscribe/2,
51
    get_subscription/2
52
]).
53

54
-export([
55
    publish/3,
56
    puback/3,
57
    pubrec/2,
58
    pubrel/2,
59
    pubcomp/3
60
]).
61

62
-export([
63
    deliver/3,
64
    replay/3,
65
    handle_timeout/3,
66
    disconnect/2,
67
    terminate/2
68
]).
69

70
%% Will message handling
71
-export([
72
    clear_will_message/1,
73
    publish_will_message_now/2
74
]).
75

76
%% Management APIs:
77
-export([
78
    list_client_subscriptions/1
79
]).
80

81
%% session table operations
82
-export([create_tables/0, sync/1]).
83

84
%% internal export used by session GC process
85
-export([destroy_session/1]).
86

87
%% Remove me later (satisfy checks for an unused BPAPI)
88
-export([
89
    do_open_iterator/3,
90
    do_ensure_iterator_closed/1,
91
    do_ensure_all_iterators_closed/1
92
]).
93

94
-export([print_session/1, seqno_diff/4]).
95

96
-ifdef(TEST).
97
-export([
98
    session_open/4,
99
    list_all_sessions/0
100
]).
101
-endif.
102

103
-export_type([
104
    id/0,
105
    seqno/0,
106
    timestamp/0,
107
    topic_filter/0,
108
    subscription_id/0,
109
    subscription/0,
110
    session/0,
111
    stream_state/0
112
]).
113

114
-type seqno() :: non_neg_integer().
115

116
%% Currently, this is the clientid.  We avoid `emqx_types:clientid()' because that can be
117
%% an atom, in theory (?).
118
-type id() :: binary().
119
-type topic_filter() :: emqx_types:topic().
120

121
-type subscription_id() :: integer().
122

123
-type subscription() :: #{
124
    id := subscription_id(),
125
    start_time := emqx_ds:time(),
126
    props := map(),
127
    deleted := boolean()
128
}.
129

130
-define(TIMER_PULL, timer_pull).
131
-define(TIMER_GET_STREAMS, timer_get_streams).
132
-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
133
-define(TIMER_RETRY_REPLAY, timer_retry_replay).
134

135
-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT | ?TIMER_RETRY_REPLAY.
136

137
%% TODO: Needs configuration?
138
-define(TIMEOUT_RETRY_REPLAY, 1000).
139

140
-type session() :: #{
141
    %% Client ID
142
    id := id(),
143
    %% Configuration:
144
    props := map(),
145
    %% Persistent state:
146
    s := emqx_persistent_session_ds_state:t(),
147
    %% Buffer:
148
    inflight := emqx_persistent_session_ds_inflight:t(),
149
    %% In-progress replay:
150
    %% List of stream replay states to be added to the inflight buffer.
151
    replay => [{_StreamKey, stream_state()}, ...],
152
    %% Timers:
153
    timer() => reference()
154
}.
155

156
-define(IS_REPLAY_ONGOING(SESS), is_map_key(replay, SESS)).
157

158
-record(req_sync, {
159
    from :: pid(),
160
    ref :: reference()
161
}).
162

163
-type stream_state() :: #srs{}.
164

165
-type message() :: emqx_types:message().
166
-type timestamp() :: emqx_utils_calendar:epoch_millisecond().
167
-type millisecond() :: non_neg_integer().
168
-type clientinfo() :: emqx_types:clientinfo().
169
-type conninfo() :: emqx_session:conninfo().
170
-type replies() :: emqx_session:replies().
171

172
-define(STATS_KEYS, [
173
    durable,
174
    subscriptions_cnt,
175
    subscriptions_max,
176
    inflight_cnt,
177
    inflight_max,
178
    mqueue_len,
179
    mqueue_dropped,
180
    seqno_q1_comm,
181
    seqno_q1_dup,
182
    seqno_q1_next,
183
    seqno_q2_comm,
184
    seqno_q2_dup,
185
    seqno_q2_rec,
186
    seqno_q2_next,
187
    n_streams
188
]).
189

190
%%
191

192
%% @doc Build a fresh session for the connecting client via
%% `session_ensure_new/5' and pass it through `ensure_timers/1' before
%% returning it.
-spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
    session().
create(ClientInfo = #{clientid := ClientID}, ConnInfo, MaybeWillMsg, Conf) ->
    NewSession = session_ensure_new(ClientID, ClientInfo, ConnInfo, MaybeWillMsg, Conf),
    ensure_timers(NewSession).
123✔
196

197
%% @doc Try to resume an existing session for the connecting client.
%% Returns `false' when `session_open/4' finds nothing to resume; otherwise
%% returns the session with `props' set to `Conf' and passed through
%% `ensure_timers/1'.
-spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
    {_IsPresent :: true, session(), []} | false.
open(ClientInfo = #{clientid := ClientID}, ConnInfo, MaybeWillMsg, Conf) ->
    %% NOTE
    %% The fact that we need to concern about discarding all live channels here
    %% is essentially a consequence of the in-memory session design, where we
    %% have disconnected channels holding onto session state. Ideally, we should
    %% somehow isolate those idling not-yet-expired sessions into a separate process
    %% space, and move this call back into `emqx_cm` where it belongs.
    ok = emqx_cm:takeover_kick(ClientID),
    case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of
        false ->
            false;
        Opened = #{} ->
            {true, ensure_timers(Opened#{props => Conf}), []}
    end.
214

215
%% @doc Drop the durable session identified either by a session map (`id'
%% key) or by a clientinfo map (`clientid' key). When a map carries both
%% keys, the `id' clause wins because it is tried first.
-spec destroy(session() | clientinfo()) -> ok.
destroy(#{id := SessionId}) ->
    destroy_session(SessionId);
destroy(#{clientid := ClientId}) ->
    destroy_session(ClientId).
208✔
220

221
%% @doc Drop the session stored under `ClientID', tagging the drop with
%% reason `destroy'. Exported for the session GC process.
-spec destroy_session(id()) -> ok.
destroy_session(ClientID) ->
    session_drop(ClientID, destroy).
274✔
223

224
%% @doc Drop the session of `ClientID' with reason `kicked'.
%% Does nothing (and still returns `ok') when message persistence is
%% disabled.
-spec kick_offline_session(emqx_types:clientid()) -> ok.
kick_offline_session(ClientID) ->
    case emqx_persistent_message:is_persistence_enabled() of
        false ->
            ok;
        true ->
            session_drop(ClientID, kicked)
    end.
232

233
%%--------------------------------------------------------------------
234
%% Info, Stats
235
%%--------------------------------------------------------------------
236

237
%% Look up one piece of session information by key, or several at once when
%% given a list of keys (the result is then a list of `{Key, Value}' pairs
%% in the same order).
%%
%% There is no catch-all clause: keys without a clause below (for instance
%% `mqueue', `mqueue_max', `next_pkt_id', `awaiting_rel' and
%% `awaiting_rel_cnt', which are not implemented for durable sessions) fail
%% with `function_clause'.
info(Keys, Session) when is_list(Keys) ->
    [{K, info(K, Session)} || K <- Keys];
%% Identity and static facts
info(id, #{id := Id}) ->
    Id;
info(clientid, #{id := Id}) ->
    Id;
info(durable, _) ->
    true;
info(created_at, #{s := State}) ->
    emqx_persistent_session_ds_state:get_created_at(State);
info(is_persistent, #{}) ->
    true;
%% Subscriptions
info(subscriptions, #{s := State}) ->
    emqx_persistent_session_ds_subs:to_map(State);
info(subscriptions_cnt, #{s := State}) ->
    Subs = emqx_persistent_session_ds_state:get_subscriptions(State),
    emqx_topic_gbt:size(Subs);
info(subscriptions_max, #{props := Props}) ->
    maps:get(max_subscriptions, Props);
info(upgrade_qos, #{props := Props}) ->
    maps:get(upgrade_qos, Props);
%% Inflight buffer
info(inflight, #{inflight := Buffer}) ->
    Buffer;
info(inflight_cnt, #{inflight := Buffer}) ->
    emqx_persistent_session_ds_inflight:n_inflight(Buffer);
info(inflight_max, #{inflight := Buffer}) ->
    emqx_persistent_session_ds_inflight:receive_maximum(Buffer);
info(retry_interval, #{props := Props}) ->
    maps:get(retry_interval, Props);
%% The "message queue" figures are derived from the inflight buffer
info(mqueue_len, #{inflight := Buffer}) ->
    emqx_persistent_session_ds_inflight:n_buffered(all, Buffer);
info(mqueue_dropped, _Session) ->
    0;
%% Sequence number tracks
info(seqno_q1_comm, #{s := State}) ->
    emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), State);
info(seqno_q1_dup, #{s := State}) ->
    emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), State);
info(seqno_q1_next, #{s := State}) ->
    emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), State);
info(seqno_q2_comm, #{s := State}) ->
    emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), State);
info(seqno_q2_dup, #{s := State}) ->
    emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), State);
info(seqno_q2_rec, #{s := State}) ->
    emqx_persistent_session_ds_state:get_seqno(?rec, State);
info(seqno_q2_next, #{s := State}) ->
    emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), State);
%% Number of streams, counted by folding over them
info(n_streams, #{s := State}) ->
    CountOne = fun(_Key, _Stream, Count) -> Count + 1 end,
    emqx_persistent_session_ds_state:fold_streams(CountOne, 0, State);
info(awaiting_rel_max, #{props := Props}) ->
    maps:get(max_awaiting_rel, Props);
info(await_rel_timeout, #{props := _Props}) ->
    %% TODO: currently this setting is ignored:
    %% maps:get(await_rel_timeout, Conf).
    0;
%% Paged message listings are not available for durable sessions
info({mqueue_msgs, _PagerParams}, _Session) ->
    {error, not_implemented};
info({inflight_msgs, _PagerParams}, _Session) ->
    {error, not_implemented}.
×
308

309
%% @doc Return `{Key, Value}' pairs for every key listed in `?STATS_KEYS',
%% each value obtained through `info/2'.
-spec stats(session()) -> emqx_types:stats().
stats(Session) ->
    [{StatKey, info(StatKey, Session)} || StatKey <- ?STATS_KEYS].
295✔
312

313
%% Used by management API.
%%
%% Returns a printable map describing the session of `ClientId':
%%  - for a live session, its state map with `s' formatted and
%%    `'_alive' => {true, Pid}' added;
%%  - for a session only found in storage, `#{s => ..., '_alive' => false}';
%%  - `undefined' when nothing is found or the live lookup reports
%%    `not_persistent'.
-spec print_session(emqx_types:clientid()) -> map() | undefined.
print_session(ClientId) ->
    case try_get_live_session(ClientId) of
        not_persistent ->
            undefined;
        not_found ->
            case emqx_persistent_session_ds_state:print_session(ClientId) of
                undefined ->
                    undefined;
                Stored ->
                    #{s => Stored, '_alive' => false}
            end;
        {Pid, SessionState} ->
            Live = SessionState#{'_alive' => {true, Pid}},
            maps:update_with(s, fun emqx_persistent_session_ds_state:format/1, Live)
    end.
333

334
%%--------------------------------------------------------------------
335
%% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE
336
%%--------------------------------------------------------------------
337

338
%% @doc Add a subscription for `TopicFilter', or refresh the options of an
%% existing one. A new subscription gets a route, a fresh id and the current
%% time as its start time; an existing one only has its `props' replaced.
-spec subscribe(topic_filter(), emqx_types:subopts(), session()) ->
    {ok, session()} | {error, emqx_types:reason_code()}.
subscribe(TopicFilter, SubOpts, Session = #{id := ID, s := S0}) ->
    {IsNew, Subscription, S1} =
        case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of
            undefined ->
                %% TODO: max subscriptions

                %% N.B.: we chose to update the router before adding the
                %% subscription to the session/iterator table. The
                %% reasoning for this is as follows:
                %%
                %% Messages matching this topic filter should start to be
                %% persisted as soon as possible to avoid missing
                %% messages. If this is the first such persistent session
                %% subscription, it's important to do so early on.
                %%
                %% This could, in turn, lead to some inconsistency: if
                %% such a route gets created but the session/iterator data
                %% fails to be updated accordingly, we have a dangling
                %% route. To remove such dangling routes, we may have a
                %% periodic GC process that removes routes that do not
                %% have a matching persistent subscription. Also, route
                %% operations use dirty mnesia operations, which
                %% inherently have room for inconsistencies.
                %%
                %% In practice, we use the iterator reference table as a
                %% source of truth, since it is guarded by a transaction
                %% context: we consider a subscription operation to be
                %% successful if it ended up changing this table. Both
                %% router and iterator information can be reconstructed
                %% from this table, if needed.
                ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID),
                {SubId, WithNewId} = emqx_persistent_session_ds_state:new_id(S0),
                Fresh = #{
                    start_time => now_ms(),
                    props => SubOpts,
                    id => SubId,
                    deleted => false
                },
                {true, Fresh, WithNewId};
            Existing = #{} ->
                {false, Existing#{props => SubOpts}, S0}
        end,
    S = emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, Subscription, S1),
    ?tp(persistent_session_ds_subscription_added, #{
        topic_filter => TopicFilter, sub => Subscription, is_new => IsNew
    }),
    {ok, Session#{s => S}}.
107✔
392

393
%% @doc Remove the subscription for `TopicFilter'. On success the options
%% of the removed subscription are returned alongside the updated session;
%% an unknown topic filter yields `?RC_NO_SUBSCRIPTION_EXISTED'.
-spec unsubscribe(topic_filter(), session()) ->
    {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}.
unsubscribe(TopicFilter, Session = #{id := ID, s := S0}) ->
    case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of
        Subscription = #{props := SubOpts} ->
            S = do_unsubscribe(ID, TopicFilter, Subscription, S0),
            {ok, Session#{s => S}, SubOpts};
        undefined ->
            {error, ?RC_NO_SUBSCRIPTION_EXISTED}
    end.
406

407
%% Carry out an unsubscribe in three steps, in this order: notify the
%% subscription bookkeeping, notify the stream scheduler, then delete the
%% route (asserting that the deletion returns `ok'). Trace points are
%% emitted around the steps. Returns the updated persistent state.
-spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) ->
    emqx_persistent_session_ds_state:t().
do_unsubscribe(SessionId, TopicFilter, Subscription = #{id := SubId}, S0) ->
    AfterSubs = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, Subscription, S0),
    ?tp(persistent_session_ds_subscription_delete, #{
        session_id => SessionId, topic_filter => TopicFilter
    }),
    AfterStreams = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, AfterSubs),
    ?tp_span(
        persistent_session_ds_subscription_route_delete,
        #{session_id => SessionId, topic_filter => TopicFilter},
        ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId)
    ),
    AfterStreams.
47✔
421

422
%% @doc Return the subscription options stored for `TopicFilter', or
%% `undefined' when the session has no such subscription.
-spec get_subscription(topic_filter(), session()) ->
    emqx_types:subopts() | undefined.
get_subscription(TopicFilter, #{s := State}) ->
    case emqx_persistent_session_ds_subs:lookup(TopicFilter, State) of
        undefined ->
            undefined;
        #{props := SubOpts} ->
            SubOpts
    end.
431

432
%%--------------------------------------------------------------------
433
%% Client -> Broker: PUBLISH
434
%%--------------------------------------------------------------------
435

436
%% @doc Hand the message straight to `emqx_broker:publish/1'. The packet id
%% is ignored and the session is returned unchanged.
-spec publish(emqx_types:packet_id(), emqx_types:message(), session()) ->
    {ok, emqx_types:publish_result(), session()}
    | {error, emqx_types:reason_code()}.
publish(_PacketId, Msg, Session) ->
    %% TODO: QoS2
    {ok, emqx_broker:publish(Msg), Session}.
1✔
443

444
%%--------------------------------------------------------------------
445
%% Client -> Broker: PUBACK
446
%%--------------------------------------------------------------------
447

448
%% @doc Handle a PUBACK from the client: advance the sequence number for
%% `PacketId' and, on success, request an immediate pull of further
%% messages via `pull_now/1'.
%%
%% Only `{error, _}' is passed through as a failure, in line with
%% `pubrec/2' and `pubcomp/3'. Any other result from `update_seqno/3' is a
%% contract violation and crashes with `case_clause' instead of being
%% returned to the caller as if it were an error.
-spec puback(clientinfo(), emqx_types:packet_id(), session()) ->
    {ok, emqx_types:message(), replies(), session()}
    | {error, emqx_types:reason_code()}.
puback(_ClientInfo, PacketId, Session0) ->
    case update_seqno(puback, PacketId, Session0) of
        {ok, Msg, Session} ->
            {ok, Msg, [], pull_now(Session)};
        Error = {error, _} ->
            Error
    end.
458

459
%%--------------------------------------------------------------------
460
%% Client -> Broker: PUBREC
461
%%--------------------------------------------------------------------
462

463
%% @doc Handle a PUBREC from the client by advancing the sequence number
%% for `PacketId'. Both the success triple and an `{error, _}' tuple from
%% `update_seqno/3' are returned unchanged; anything else crashes.
-spec pubrec(emqx_types:packet_id(), session()) ->
    {ok, emqx_types:message(), session()}
    | {error, emqx_types:reason_code()}.
pubrec(PacketId, Session0) ->
    case update_seqno(pubrec, PacketId, Session0) of
        Ok = {ok, _Msg, _Session} ->
            Ok;
        Error = {error, _} ->
            Error
    end.
473

474
%%--------------------------------------------------------------------
475
%% Client -> Broker: PUBREL
476
%%--------------------------------------------------------------------
477

478
%% @doc Handle a PUBREL from the client. Currently a stub: the packet id is
%% ignored and the session (which must be a map) is returned unchanged.
-spec pubrel(emqx_types:packet_id(), session()) ->
    {ok, session()} | {error, emqx_types:reason_code()}.
pubrel(_PacketId, #{} = Session) ->
    % TODO: stub
    {ok, Session}.
1✔
483

484
%%--------------------------------------------------------------------
485
%% Client -> Broker: PUBCOMP
486
%%--------------------------------------------------------------------
487

488
%% @doc Handle a PUBCOMP from the client: advance the sequence number for
%% `PacketId' and, on success, request an immediate pull of further
%% messages via `pull_now/1'. An `{error, _}' result is passed through.
-spec pubcomp(clientinfo(), emqx_types:packet_id(), session()) ->
    {ok, emqx_types:message(), replies(), session()}
    | {error, emqx_types:reason_code()}.
pubcomp(_ClientInfo, PacketId, Session0) ->
    case update_seqno(pubcomp, PacketId, Session0) of
        Error = {error, _} ->
            Error;
        {ok, Msg, Session} ->
            {ok, Msg, [], pull_now(Session)}
    end.
498

499
%%--------------------------------------------------------------------
500

501
%% @doc Accept transient deliveries: each one is enqueued with
%% `enqueue_transient/3', left to right, and an immediate pull is then
%% requested. No replies are produced directly.
-spec deliver(clientinfo(), [emqx_types:deliver()], session()) ->
    {ok, replies(), session()}.
deliver(ClientInfo, Delivers, Session0) ->
    %% Durable sessions still have to handle some transient messages.
    %% For example, retainer sends messages to the session directly.
    Enqueue = fun(Deliver, SessAcc) -> enqueue_transient(ClientInfo, Deliver, SessAcc) end,
    Session = lists:foldl(Enqueue, Session0, Delivers),
    {ok, [], pull_now(Session)}.
3✔
510

511
%% @doc Dispatch on the timer (or request) that fired:
%%  - `?TIMER_PULL': unless a replay is in progress, fetch new messages and
%%    drain the buffer; the pull timer is then re-armed with a zero timeout
%%    if anything was produced, or with the `idle_poll_interval' setting
%%    otherwise;
%%  - `?TIMER_RETRY_REPLAY': resume `replay_streams/2';
%%  - `?TIMER_GET_STREAMS': run subscription GC, renew the streams and
%%    re-arm the timer with the `renew_streams_interval' setting;
%%  - `?TIMER_BUMP_LAST_ALIVE_AT': bump and commit the last-alive timestamp,
%%    then re-arm the timer with `bump_interval/0';
%%  - `#req_sync{}': commit the state, then send `Ref' to the requester;
%%  - anything else: log a warning and leave the session untouched.
-spec handle_timeout(clientinfo(), _Timeout, session()) ->
    {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
    {Publishes, Session1} =
        case ?IS_REPLAY_ONGOING(Session0) of
            true ->
                {[], Session0};
            false ->
                drain_buffer(fetch_new_messages(Session0, ClientInfo))
        end,
    NextPullIn =
        case Publishes of
            [_ | _] ->
                0;
            [] ->
                get_config(ClientInfo, [idle_poll_interval])
        end,
    {ok, Publishes, emqx_session:ensure_timer(?TIMER_PULL, NextPullIn, Session1)};
handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
    {ok, [], replay_streams(Session0, ClientInfo)};
handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
    Collected = emqx_persistent_session_ds_subs:gc(S0),
    Renewed = emqx_persistent_session_ds_stream_scheduler:renew_streams(Collected),
    Interval = get_config(ClientInfo, [renew_streams_interval]),
    Session = emqx_session:ensure_timer(?TIMER_GET_STREAMS, Interval, Session0#{s => Renewed}),
    {ok, [], Session};
handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0 = #{s := S0}) ->
    Committed = emqx_persistent_session_ds_state:commit(bump_last_alive(S0)),
    Session = emqx_session:ensure_timer(
        ?TIMER_BUMP_LAST_ALIVE_AT,
        bump_interval(),
        Session0#{s => Committed}
    ),
    {ok, [], Session};
handle_timeout(_ClientInfo, #req_sync{from = Requester, ref = Ref}, Session = #{s := S0}) ->
    Committed = emqx_persistent_session_ds_state:commit(S0),
    %% Acknowledge only after the commit has returned.
    Requester ! Ref,
    {ok, [], Session#{s => Committed}};
handle_timeout(_ClientInfo, Timeout, Session) ->
    ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}),
    {ok, [], Session}.
×
558

559
%% Set the last-alive timestamp to "now plus one bump interval".
bump_last_alive(S0) ->
    %% Note: we take a pessimistic approach here and assume that the client will be alive
    %% until the next bump timeout.  With this, we avoid garbage collecting this session
    %% too early in case the session/connection/node crashes earlier without having time
    %% to commit the time.
    Now = now_ms(),
    Interval = bump_interval(),
    emqx_persistent_session_ds_state:set_last_alive_at(Now + Interval, S0).
1,578✔
566

567
%% @doc Start replaying: the streams found by the scheduler are stored
%% under the `replay' key and then processed by `replay_streams/2'.
%% No replies are produced directly.
-spec replay(clientinfo(), [], session()) ->
    {ok, replies(), session()}.
replay(ClientInfo, [], Session0 = #{s := State}) ->
    ReplayStreams = emqx_persistent_session_ds_stream_scheduler:find_replay_streams(State),
    Replaying = Session0#{replay => ReplayStreams},
    {ok, [], replay_streams(Replaying, ClientInfo)}.
54✔
573

574
%% Work through the pending `replay' list one stream at a time.
%%
%% When the list is exhausted the `replay' key is removed and an immediate
%% pull is requested. On a recoverable error the current stream stays at
%% the head of the list and a retry timer is armed. Any other error shape
%% from `replay_batch/3' is not matched here (see the TODO below).
replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
    Session = maps:remove(replay, Session0),
    %% Note: we filled the buffer with the historical messages, and
    %% from now on we'll rely on the normal inflight/flow control
    %% mechanisms to replay them:
    pull_now(Session);
replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) ->
    case replay_batch(Srs0, Session0, ClientInfo) of
        {error, recoverable, Reason} ->
            ?SLOG(warning, #{
                msg => "failed_to_fetch_replay_batch",
                stream => StreamKey,
                reason => Reason,
                class => recoverable,
                retry_in_ms => ?TIMEOUT_RETRY_REPLAY
            }),
            emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session0);
        %% TODO: Handle unrecoverable errors.
        Session = #{} ->
            replay_streams(Session#{replay := Rest}, ClientInfo)
    end.
54✔
596

597
%% Re-enqueue one batch for the given stream replay state, using the batch
%% size recorded in it. The stream state returned by `enqueue_batch/5' is
%% expected to equal the input; a mismatch only emits a warning trace
%% point. Errors are returned unchanged.
-spec replay_batch(stream_state(), session(), clientinfo()) -> session() | emqx_ds:error(_).
replay_batch(Srs0, Session0, ClientInfo) ->
    #srs{batch_size = BatchSize} = Srs0,
    case enqueue_batch(true, BatchSize, Srs0, Session0, ClientInfo) of
        {error, _, _} = Error ->
            Error;
        {ok, Srs, Session} ->
            %% Assert:
            case Srs of
                Srs0 ->
                    ok;
                _ ->
                    ?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{
                        expected => Srs0,
                        got => Srs
                    })
            end,
            Session
    end.
612

613
%%--------------------------------------------------------------------
614

615
%% @doc Record the disconnect: the last-alive timestamp is set to the
%% current time, the expiry interval is updated when `ConnInfo' carries a
%% numeric `expiry_interval', and the state is committed.
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
disconnect(Session = #{s := S0}, ConnInfo) ->
    Stamped = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S0),
    WithExpiry =
        case ConnInfo of
            #{expiry_interval := Interval} when is_number(Interval) ->
                emqx_persistent_session_ds_state:set_expiry_interval(Interval, Stamped);
            _ ->
                Stamped
        end,
    Committed = emqx_persistent_session_ds_state:commit(WithExpiry),
    {shutdown, Session#{s => Committed}}.
143✔
627

628
%% @doc Final cleanup: call `maybe_set_will_message_timer/1', commit the
%% persistent state (the result is discarded) and emit a debug trace point.
-spec terminate(Reason :: term(), session()) -> ok.
terminate(_Reason, Session = #{id := SessionId, s := State}) ->
    maybe_set_will_message_timer(Session),
    _ = emqx_persistent_session_ds_state:commit(State),
    ?tp(debug, persistent_session_ds_terminate, #{id => SessionId}),
    ok.
169✔
634

635
%%--------------------------------------------------------------------
636
%% Management APIs (dashboard)
637
%%--------------------------------------------------------------------
638

639
%% @doc List the subscriptions of `ClientId' as `{Topic, SubOpts}' pairs,
%% together with the node hosting the live session (`undefined' when the
%% session is not alive). Returns `{error, not_found}' when persistence is
%% disabled or no session is found.
-spec list_client_subscriptions(emqx_types:clientid()) ->
    {node() | undefined, [{emqx_types:topic() | emqx_types:share(), emqx_types:subopts()}]}
    | {error, not_found}.
list_client_subscriptions(ClientId) ->
    case emqx_persistent_message:is_persistence_enabled() of
        false ->
            {error, not_found};
        true ->
            %% TODO: this is not the most optimal implementation, since it
            %% should be possible to avoid reading extra data (streams, etc.)
            case print_session(ClientId) of
                undefined ->
                    {error, not_found};
                Sess = #{s := #{subscriptions := Subs}} ->
                    Node =
                        case Sess of
                            #{'_alive' := {true, Pid}} -> node(Pid);
                            _ -> undefined
                        end,
                    Collect = fun(Topic, #{props := SubProps}, Acc) ->
                        [{Topic, SubProps} | Acc]
                    end,
                    {Node, maps:fold(Collect, [], Subs)}
            end
    end.
672

673
%%--------------------------------------------------------------------
674
%% Session tables operations
675
%%--------------------------------------------------------------------
676

677
%% @doc Create the session tables by delegating to
%% `emqx_persistent_session_ds_state:create_tables/0'; its result is
%% returned as is.
create_tables() ->
    emqx_persistent_session_ds_state:create_tables().
28✔
679

680
%% @doc Force syncing of the transient state to persistent storage.
%%
%% Sends a `#req_sync{}' request to the single channel process registered
%% for `ClientId' and blocks until that process either answers with the
%% request reference (`ok') or goes down (`{error, Reason}'). Returns
%% `{error, noproc}' when no channel is registered.
%%
%% NOTE(review): a lookup returning more than one channel is not matched
%% and crashes with `case_clause'; the `receive' has no `after' clause, so
%% a live but unresponsive channel blocks the caller indefinitely.
sync(ClientId) ->
    case emqx_cm:lookup_channels(ClientId) of
        [] ->
            {error, noproc};
        [ChannelPid] ->
            Ref = monitor(process, ChannelPid),
            ChannelPid ! {emqx_session, #req_sync{from = self(), ref = Ref}},
            receive
                Ref ->
                    demonitor(Ref, [flush]),
                    ok;
                {'DOWN', Ref, process, _Pid, Reason} ->
                    {error, Reason}
            end
    end.
696

697
%% @doc Called when a client connects. This function looks up a
%% session or returns `false` if previous one couldn't be found.
%%
%% A stored session whose last-alive timestamp plus expiry interval is not
%% in the future is dropped with reason `expired' and also reported as
%% `false'. Otherwise the stored state is refreshed (expiry interval,
%% last-alive timestamp, peername, will message, clientinfo), committed,
%% and returned with a new inflight buffer and empty `props'.
%%
%% Note: session API doesn't handle session takeovers, it's the job of
%% the broker.
-spec session_open(id(), emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message())) ->
    session() | false.
session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) ->
    NowMS = now_ms(),
    case emqx_persistent_session_ds_state:open(SessionId) of
        undefined ->
            false;
        {ok, S0} ->
            EI = emqx_persistent_session_ds_state:get_expiry_interval(S0),
            LastAliveAt = emqx_persistent_session_ds_state:get_last_alive_at(S0),
            IsExpired = NowMS >= LastAliveAt + EI,
            case IsExpired of
                true ->
                    session_drop(SessionId, expired),
                    false;
                false ->
                    ?tp(open_session, #{ei => EI, now => NowMS, laa => LastAliveAt}),
                    %% New connection being established.
                    %% The updates are applied to the state in list order.
                    Updates = [
                        fun(St) ->
                            emqx_persistent_session_ds_state:set_expiry_interval(EI, St)
                        end,
                        fun(St) ->
                            emqx_persistent_session_ds_state:set_last_alive_at(NowMS, St)
                        end,
                        fun(St) ->
                            emqx_persistent_session_ds_state:set_peername(
                                maps:get(peername, NewConnInfo), St
                            )
                        end,
                        fun(St) ->
                            emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, St)
                        end,
                        fun(St) ->
                            emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, St)
                        end
                    ],
                    Updated = lists:foldl(fun(Update, St) -> Update(St) end, S0, Updates),
                    S = emqx_persistent_session_ds_state:commit(Updated),
                    Inflight = emqx_persistent_session_ds_inflight:new(
                        receive_maximum(NewConnInfo)
                    ),
                    #{
                        id => SessionId,
                        s => S,
                        inflight => Inflight,
                        props => #{}
                    }
            end
    end.
738

739
%% Create and commit the persistent state of a brand new session: expiry
%% interval from `ConnInfo', bumped last-alive timestamp, creation time,
%% all sequence number tracks set to zero, will message and clientinfo.
%% The returned session carries `Conf' as `props' and a new inflight
%% buffer.
-spec session_ensure_new(
    id(),
    emqx_types:clientinfo(),
    emqx_types:conninfo(),
    emqx_maybe:t(message()),
    emqx_session:conf()
) ->
    session().
session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
    ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}),
    CreatedAt = now_ms(),
    Blank = emqx_persistent_session_ds_state:create_new(Id),
    WithExpiry = emqx_persistent_session_ds_state:set_expiry_interval(
        expiry_interval(ConnInfo), Blank
    ),
    Alive = bump_last_alive(WithExpiry),
    Stamped = emqx_persistent_session_ds_state:set_created_at(CreatedAt, Alive),
    SeqnoTracks = [
        ?next(?QOS_1),
        ?dup(?QOS_1),
        ?committed(?QOS_1),
        ?next(?QOS_2),
        ?dup(?QOS_2),
        ?rec,
        ?committed(?QOS_2)
    ],
    ResetTrack = fun(Track, Acc) ->
        emqx_persistent_session_ds_state:put_seqno(Track, 0, Acc)
    end,
    Zeroed = lists:foldl(ResetTrack, Stamped, SeqnoTracks),
    WithWill = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, Zeroed),
    WithClientInfo = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, WithWill),
    Committed = emqx_persistent_session_ds_state:commit(WithClientInfo),
    Inflight = emqx_persistent_session_ds_inflight:new(receive_maximum(ConnInfo)),
    #{
        id => Id,
        props => Conf,
        s => Committed,
        inflight => Inflight
    }.
778

779
%% @doc Called when a client reconnects with `clean session=true' or
%% during session GC.
%%
%% Every subscription of the stored session is unsubscribed through
%% `do_unsubscribe/4' (the resulting state is discarded), after which the
%% stored session is deleted. Does nothing when no session is stored.
-spec session_drop(id(), _Reason) -> ok.
session_drop(ID, Reason) ->
    case emqx_persistent_session_ds_state:open(ID) of
        undefined ->
            ok;
        {ok, S0} ->
            ?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}),
            Unsubscribe = fun(TopicFilter, Subscription, Acc) ->
                do_unsubscribe(ID, TopicFilter, Subscription, Acc)
            end,
            _ = emqx_persistent_session_ds_subs:fold(Unsubscribe, S0, S0),
            emqx_persistent_session_ds_state:delete(ID)
    end.
797

798
%% Current Erlang system time, in milliseconds.
-spec now_ms() -> integer().
now_ms() ->
    erlang:system_time(millisecond).
2,039✔
800

801
%%--------------------------------------------------------------------
802
%% RPC targets (v1)
803
%%--------------------------------------------------------------------
804

805
%% RPC target.
%%
%% Stub: all arguments are ignored and the call always reports that the
%% operation is not implemented.
-spec do_open_iterator(emqx_types:words(), emqx_ds:time(), emqx_ds:iterator_id()) ->
    {ok, emqx_ds_storage_layer:iterator()} | {error, _Reason}.
do_open_iterator(_TopicFilter, _StartMS, _IteratorID) ->
    Reason = not_implemented,
    {error, Reason}.
×
810

811
%% RPC target.
%%
%% Stub: the iterator ID is ignored and `ok' is returned unconditionally.
-spec do_ensure_iterator_closed(emqx_ds:iterator_id()) -> ok.
do_ensure_iterator_closed(_IteratorID) ->
    ok.
×
815

816
%% RPC target.
%%
%% Stub: the session ID is ignored and `ok' is returned unconditionally.
-spec do_ensure_all_iterators_closed(id()) -> ok.
do_ensure_all_iterators_closed(_DSSessionID) ->
    ok.
×
820

821
%%--------------------------------------------------------------------
822
%% Normal replay:
823
%%--------------------------------------------------------------------
824

825
%% Ask the stream scheduler for new streams and enqueue one batch per
%% stream into the inflight buffer, stopping early once the buffer
%% holds at least `batch_size' messages.
fetch_new_messages(Session = #{s := S}, ClientInfo) ->
    NewStreams = emqx_persistent_session_ds_stream_scheduler:find_new_streams(S),
    fetch_new_messages(NewStreams, Session, ClientInfo).

fetch_new_messages([], Session, _ClientInfo) ->
    Session;
fetch_new_messages([Stream | Rest], Session0 = #{inflight := Inflight}, ClientInfo) ->
    BatchSize = get_config(ClientInfo, [batch_size]),
    case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) of
        NBuffered when NBuffered >= BatchSize ->
            %% Buffer is full:
            Session0;
        _NBuffered ->
            Session1 = new_batch(Stream, BatchSize, Session0, ClientInfo),
            fetch_new_messages(Rest, Session1, ClientInfo)
    end.
841

842
%% Fetch a fresh (non-replay) batch for the stream `StreamKey'.
%%
%% The stream replay state is reset to an empty batch that starts at the
%% session's current `next' sequence numbers. On success the `next'
%% sequence numbers and the stream state are stored in the session
%% state; on failure the error is logged and the session is returned
%% unchanged.
new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
    NextQos1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0),
    NextQos2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0),
    EmptyBatch = Srs0#srs{
        batch_size = 0,
        first_seqno_qos1 = NextQos1,
        last_seqno_qos1 = NextQos1,
        first_seqno_qos2 = NextQos2,
        last_seqno_qos2 = NextQos2
    },
    case enqueue_batch(false, BatchSize, EmptyBatch, Session0, ClientInfo) of
        {error, Class, Reason} ->
            %% TODO: Handle unrecoverable error.
            ?SLOG(info, #{
                msg => "failed_to_fetch_batch",
                stream => StreamKey,
                reason => Reason,
                class => Class
            }),
            Session0;
        {ok, Srs, Session} ->
            SeqNoUpdates = [
                {?next(?QOS_1), Srs#srs.last_seqno_qos1},
                {?next(?QOS_2), Srs#srs.last_seqno_qos2}
            ],
            S1 = lists:foldl(
                fun({Track, SeqNo}, Acc) ->
                    emqx_persistent_session_ds_state:put_seqno(Track, SeqNo, Acc)
                end,
                S0,
                SeqNoUpdates
            ),
            S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S1),
            Session#{s => S}
    end.
876

877
%% Read up to `BatchSize' messages from the durable storage and push
%% them into the inflight buffer.
%%
%% When `IsReplay' is `true' the batch is re-read from the stream's
%% `it_begin' iterator; otherwise reading continues from `it_end'.
%% Returns `{ok, Srs, Session}' with the updated stream state, or the
%% `{error, _, _}' tuple returned by `emqx_ds:next/3' as is.
enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, ClientInfo) ->
    FirstSeqnoQos1 = Srs0#srs.first_seqno_qos1,
    FirstSeqnoQos2 = Srs0#srs.first_seqno_qos2,
    ItBegin =
        case IsReplay of
            true -> Srs0#srs.it_begin;
            false -> Srs0#srs.it_end
        end,
    case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize) of
        {error, _, _} = Error ->
            Error;
        {ok, end_of_stream} ->
            %% No new messages; just update the end iterator. The
            %% inflight buffer is untouched, so the session is returned
            %% as is:
            Srs = Srs0#srs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0},
            {ok, Srs, Session};
        {ok, ItEnd, Messages} ->
            {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
                IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0
            ),
            %% TODO: it should be possible to avoid calling length
            %% here by diffing size of inflight before and after
            %% inserting messages:
            NMessages = length(Messages),
            Srs = Srs0#srs{
                it_begin = ItBegin,
                it_end = ItEnd,
                batch_size = NMessages,
                last_seqno_qos1 = LastSeqnoQos1,
                last_seqno_qos2 = LastSeqnoQos2
            },
            {ok, Srs, Session#{inflight := Inflight}}
    end.
912

913
%% key_of_iter(#{3 := #{3 := #{5 := K}}}) ->
914
%%     K.
915

916
%% Walk over a batch of `{DsMsgKey, Message}' pairs read from the
%% storage, assign sequence numbers to the resulting MQTT messages and
%% push them into the inflight buffer.
%%
%% Each stored message is enriched once per matching subscription, so a
%% single stored message may produce several MQTT messages. For every
%% QoS1/QoS2 message the respective sequence number is advanced, and
%% the number is compared against the session's `committed', `rec' and
%% `dup' tracks to decide whether the message is skipped, re-sent with
%% the `dup' flag, replaced by a PUBREL, or sent normally.
%%
%% Returns `{Inflight, LastSeqNoQos1, LastSeqNoQos2}'.
process_batch(_IsReplay, _Session, _ClientInfo, LastSeqNoQos1, LastSeqNoQos2, [], Inflight) ->
    {Inflight, LastSeqNoQos1, LastSeqNoQos2};
process_batch(
    IsReplay, Session, ClientInfo, FirstSeqNoQos1, FirstSeqNoQos2, [KV | Messages], Inflight0
) ->
    #{s := S, props := #{upgrade_qos := UpgradeQoS}} = Session,
    {_DsMsgKey, Msg0 = #message{topic = Topic}} = KV,
    GetSeqNo = fun(Track) -> emqx_persistent_session_ds_state:get_seqno(Track, S) end,
    Comm1 = GetSeqNo(?committed(?QOS_1)),
    Comm2 = GetSeqNo(?committed(?QOS_2)),
    Dup1 = GetSeqNo(?dup(?QOS_1)),
    Dup2 = GetSeqNo(?dup(?QOS_2)),
    Rec = GetSeqNo(?rec),
    Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
    %% One stored message fans out to every matching subscription:
    Enrich = fun(SubMatch) ->
        #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs),
        emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS)
    end,
    Msgs = lists:flatmap(Enrich, emqx_topic_gbt:matches(Topic, Subs, [])),
    Step = fun(Msg = #message{qos = Qos}, {Buf, PrevQos1, PrevQos2}) ->
        case Qos of
            ?QOS_0 when IsReplay ->
                %% We ignore QoS 0 messages during replay:
                {Buf, PrevQos1, PrevQos2};
            ?QOS_0 ->
                Buf0 = emqx_persistent_session_ds_inflight:push({undefined, Msg}, Buf),
                {Buf0, PrevQos1, PrevQos2};
            ?QOS_1 ->
                SeqNo1 = inc_seqno(?QOS_1, PrevQos1),
                Buf1 =
                    if
                        SeqNo1 =< Comm1 ->
                            %% QoS1 message has been acked by the client, ignore:
                            Buf;
                        SeqNo1 =< Dup1 ->
                            %% QoS1 message has been sent but not
                            %% acked. Retransmit:
                            DupMsg1 = emqx_message:set_flag(dup, true, Msg),
                            emqx_persistent_session_ds_inflight:push({SeqNo1, DupMsg1}, Buf);
                        true ->
                            emqx_persistent_session_ds_inflight:push({SeqNo1, Msg}, Buf)
                    end,
                {Buf1, SeqNo1, PrevQos2};
            ?QOS_2 ->
                SeqNo2 = inc_seqno(?QOS_2, PrevQos2),
                Buf2 =
                    if
                        SeqNo2 =< Comm2 ->
                            %% QoS2 message has been PUBCOMP'ed by the client, ignore:
                            Buf;
                        SeqNo2 =< Rec ->
                            %% QoS2 message has been PUBREC'ed by the client, resend PUBREL:
                            emqx_persistent_session_ds_inflight:push({pubrel, SeqNo2}, Buf);
                        SeqNo2 =< Dup2 ->
                            %% QoS2 message has been sent, but we haven't received PUBREC.
                            %%
                            %% TODO: According to the MQTT standard 4.3.3:
                            %% DUP flag is never set for QoS2 messages? We
                            %% do so for mem sessions, though.
                            DupMsg2 = emqx_message:set_flag(dup, true, Msg),
                            emqx_persistent_session_ds_inflight:push({SeqNo2, DupMsg2}, Buf);
                        true ->
                            emqx_persistent_session_ds_inflight:push({SeqNo2, Msg}, Buf)
                    end,
                {Buf2, PrevQos1, SeqNo2}
        end
    end,
    {Inflight, LastSeqNoQos1, LastSeqNoQos2} =
        lists:foldl(Step, {Inflight0, FirstSeqNoQos1, FirstSeqNoQos2}, Msgs),
    process_batch(
        IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight
    ).
994

995
%%--------------------------------------------------------------------
996
%% Transient messages
997
%%--------------------------------------------------------------------
998

999
%% Enqueue a message that does not come from the durable storage.
%%
%% The message is enriched once per subscription matching its topic and
%% every resulting message is pushed into the inflight buffer via
%% `do_enqueue_transient/2'.
enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos := UpgradeQoS}}) ->
    %% TODO: Such messages won't be retransmitted, should the session
    %% reconnect before transient messages are acked.
    %%
    %% Proper solution could look like this: session publishes
    %% transient messages to a separate DS DB that serves as a queue,
    %% then subscribes to a special system topic that contains the
    %% queued messages. Since streams in this DB are exclusive to the
    %% session, messages from the queue can be dropped as soon as they
    %% are acked.
    Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
    Matches = emqx_topic_gbt:matches(Msg0#message.topic, Subs, []),
    Enriched = lists:flatmap(
        fun(SubMatch) ->
            #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs),
            emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS)
        end,
        Matches
    ),
    lists:foldl(fun do_enqueue_transient/2, Session, Enriched).
3✔
1019

1020
%% Push a single transient message into the inflight buffer.
%%
%% QoS0 messages are pushed without a sequence number (`undefined').
%% For QoS1/QoS2 the session's `next' sequence number of that QoS is
%% advanced, stored back into the state, and attached to the message.
do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) ->
    {SeqNo, S} =
        case Qos of
            ?QOS_0 ->
                {undefined, S0};
            QoS when QoS =:= ?QOS_1; QoS =:= ?QOS_2 ->
                Track = ?next(QoS),
                Current = emqx_persistent_session_ds_state:get_seqno(Track, S0),
                Next = inc_seqno(QoS, Current),
                {Next, emqx_persistent_session_ds_state:put_seqno(Track, Next, S0)}
        end,
    Inflight = emqx_persistent_session_ds_inflight:push({SeqNo, Msg}, Inflight0),
    Session#{
        inflight => Inflight,
        s => S
    }.
1036

1037
%%--------------------------------------------------------------------
1038
%% Buffer drain
1039
%%--------------------------------------------------------------------
1040

1041
%% Pop everything that can be popped from the inflight buffer.
%%
%% Returns `{Publishes, Session}' where `Publishes' is the list produced
%% by `do_drain_buffer/3' and `Session' carries the drained buffer and
%% the updated state.
drain_buffer(Session0 = #{inflight := Buffer, s := State0}) ->
    {Publishes, Drained, State} = do_drain_buffer(Buffer, State0, []),
    Session = maps:merge(Session0, #{inflight => Drained, s => State}),
    {Publishes, Session}.
1,957✔
1044

1045
%% Pop entries from the inflight buffer until `pop' returns `undefined',
%% accumulating the outgoing publishes in the order they were popped.
%%
%% - `{pubrel, SeqNo}' entries become `{pubrel, PacketId}';
%% - QoS0 messages become `{undefined, Msg}';
%% - QoS1/QoS2 messages become `{PacketId, Msg}', and the `dup' track
%%   of that QoS is set to the message's sequence number.
do_drain_buffer(Inflight0, S0, Acc) ->
    case emqx_persistent_session_ds_inflight:pop(Inflight0) of
        undefined ->
            {lists:reverse(Acc), Inflight0, S0};
        {{pubrel, SeqNo}, Rest} ->
            PubRel = {pubrel, seqno_to_packet_id(?QOS_2, SeqNo)},
            do_drain_buffer(Rest, S0, [PubRel | Acc]);
        {{SeqNo, Msg}, Rest} ->
            {Publish, S} =
                case Msg#message.qos of
                    ?QOS_0 ->
                        {{undefined, Msg}, S0};
                    Qos ->
                        S1 = emqx_persistent_session_ds_state:put_seqno(?dup(Qos), SeqNo, S0),
                        {{seqno_to_packet_id(Qos, SeqNo), Msg}, S1}
                end,
            do_drain_buffer(Rest, S, [Publish | Acc])
    end.
1062

1063
%%--------------------------------------------------------------------------------
1064

1065
%% TODO: find a more reliable way to perform actions that have side
%% effects. Add `CBM:init' callback to the session behavior?
%%
%% Make sure the pull, get-streams and bump-last-alive timers are set,
%% each with a timeout of 100 (presumably milliseconds; see
%% `emqx_session:ensure_timer/3').
-spec ensure_timers(session()) -> session().
ensure_timers(Session0) ->
    Timers = [?TIMER_PULL, ?TIMER_GET_STREAMS, ?TIMER_BUMP_LAST_ALIVE_AT],
    lists:foldl(
        fun(Timer, Session) -> emqx_session:ensure_timer(Timer, 100, Session) end,
        Session0,
        Timers
    ).
177✔
1072

1073
%% Reset the pull timer with a zero timeout.
-spec pull_now(session()) -> session().
pull_now(Session0) ->
    Timeout = 0,
    emqx_session:reset_timer(?TIMER_PULL, Timeout, Session0).
655✔
1076

1077
%% Receive maximum advertised in the connection info, falling back to
%% 65535 when the key is absent.
-spec receive_maximum(conninfo()) -> pos_integer().
receive_maximum(ConnInfo) ->
    %% Note: the default value should be always set by the channel
    %% with respect to the zone configuration, but the type spec
    %% indicates that it's optional.
    case maps:find(receive_maximum, ConnInfo) of
        {ok, ReceiveMaximum} -> ReceiveMaximum;
        error -> 65_535
    end.
179✔
1083

1084
%% Session expiry interval from the connection info, or 0 when the key
%% is absent.
-spec expiry_interval(conninfo()) -> millisecond().
expiry_interval(ConnInfo) ->
    case maps:find(expiry_interval, ConnInfo) of
        {ok, Interval} -> Interval;
        error -> 0
    end.
123✔
1087

1088
%% Read the global `last_alive_update_interval' setting.
%%
%% Note: we don't allow overriding `last_alive_update_interval' per
%% zone, since the GC process is responsible for all sessions
%% regardless of the zone.
bump_interval() ->
    ConfigPath = [session_persistence, last_alive_update_interval],
    emqx_config:get(ConfigPath).
3,033✔
1093

1094
%% Look up a `session_persistence' setting in the configuration of the
%% client's zone. `Key' is the path below the `session_persistence' root.
get_config(#{zone := Zone}, Key) ->
    ConfigPath = [session_persistence | Key],
    emqx_config:get_zone_conf(Zone, ConfigPath).
5,556✔
1096

1097
%% Try to obtain the in-memory session of a client connected to the
%% local node.
%%
%% Returns `{Pid, Session}' when exactly one local channel exists and
%% its session is implemented by this module, `not_persistent' when the
%% channel uses another session implementation, and `not_found' when
%% there isn't exactly one local channel or when inspecting the
%% connection raises any exception.
-spec try_get_live_session(emqx_types:clientid()) ->
    {pid(), session()} | not_found | not_persistent.
try_get_live_session(ClientId) ->
    case emqx_cm:lookup_channels(local, ClientId) of
        [ChanPid] ->
            try
                #{channel := Channel} = emqx_connection:get_state(ChanPid),
                Impl = emqx_channel:info(impl, Channel),
                if
                    Impl =:= ?MODULE ->
                        {ChanPid, emqx_channel:info(session_state, Channel)};
                    true ->
                        not_persistent
                end
            catch
                _:_ ->
                    not_found
            end;
        _ ->
            not_found
    end.
1117

1118
%%--------------------------------------------------------------------
1119
%% SeqNo tracking
1120
%% --------------------------------------------------------------------
1121

1122
%% Handle an acknowledgement (`puback', `pubrec' or `pubcomp') for the
%% given packet ID.
%%
%% The packet ID is translated back into a sequence number, which is
%% passed to the inflight buffer's respective ack function. On success
%% the matching track (`committed' for QoS1, `rec', or `committed' for
%% QoS2) is set to that sequence number. When the inflight buffer
%% rejects the ack, a warning is logged and
%% `{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}' is returned.
-spec update_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) ->
    {ok, emqx_types:message(), session()} | {error, _}.
update_seqno(Track, PacketId, Session = #{id := SessionId, s := S, inflight := Inflight0}) ->
    SeqNo = packet_id_to_seqno(PacketId, S),
    {SeqNoKey, Result} =
        case Track of
            puback ->
                {
                    ?committed(?QOS_1),
                    emqx_persistent_session_ds_inflight:puback(SeqNo, Inflight0)
                };
            pubrec ->
                {
                    ?rec,
                    emqx_persistent_session_ds_inflight:pubrec(SeqNo, Inflight0)
                };
            pubcomp ->
                {
                    ?committed(?QOS_2),
                    emqx_persistent_session_ds_inflight:pubcomp(SeqNo, Inflight0)
                }
        end,
    case Result of
        {error, Expected} ->
            ?SLOG(warning, #{
                msg => "out-of-order_commit",
                track => Track,
                packet_id => PacketId,
                seqno => SeqNo,
                expected => Expected
            }),
            {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND};
        {ok, Inflight} ->
            %% TODO: we pass a bogus message into the hook:
            Msg = emqx_message:make(SessionId, <<>>, <<>>),
            NewS = emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S),
            {ok, Msg, Session#{s => NewS, inflight => Inflight}}
    end.
1155

1156
%%--------------------------------------------------------------------
1157
%% Functions for dealing with the sequence number and packet ID
1158
%% generation
1159
%% --------------------------------------------------------------------
1160

1161
%% Number of low-order bits of a sequence number that end up in the
%% packet ID:
-define(EPOCH_BITS, 15).
%% Mask selecting those 15 low-order bits:
-define(PACKET_ID_MASK, 16#7FFF).

%% Epoch size = `16#10000 div 2' since we generate different sets of
%% packet IDs for QoS1 and QoS2:
-define(EPOCH_SIZE, 16#8000).
1167

1168
%% Reconstruct session counter by adding most significant bits from
%% the current counter to the packet id.
%%
%% The epoch is taken from the session's `next' sequence number of the
%% QoS encoded in the packet ID. If the reconstructed number turns out
%% to be greater than `next', one epoch is subtracted from it.
-spec packet_id_to_seqno(emqx_types:packet_id(), emqx_persistent_session_ds_state:t()) ->
    seqno().
packet_id_to_seqno(PacketId, S) ->
    Track = ?next(packet_id_to_qos(PacketId)),
    NextSeqNo = emqx_persistent_session_ds_state:get_seqno(Track, S),
    EpochBase = (NextSeqNo bsr ?EPOCH_BITS) bsl ?EPOCH_BITS,
    case EpochBase + (PacketId band ?PACKET_ID_MASK) of
        SeqNo when SeqNo =< NextSeqNo ->
            SeqNo;
        SeqNo ->
            SeqNo - ?EPOCH_SIZE
    end.
1182

1183
%% Advance a sequence number to the next usable value for the given QoS.
%%
%% The result is a sequence number, not a packet ID: callers store it
%% in the session state and convert it with `seqno_to_packet_id/2' when
%% a packet ID is needed. Values that would map to packet ID 0 are
%% skipped.
-spec inc_seqno(?QOS_1 | ?QOS_2, seqno()) -> seqno().
inc_seqno(Qos, SeqNo) ->
    NextSeqno = SeqNo + 1,
    case seqno_to_packet_id(Qos, NextSeqno) of
        0 ->
            %% We skip sequence numbers that lead to PacketId = 0 to
            %% simplify math. Note: it leads to occasional gaps in the
            %% sequence numbers.
            NextSeqno + 1;
        _ ->
            NextSeqno
    end.
1195

1196
%% Map a sequence number to a packet ID: the low `?EPOCH_BITS' bits of
%% the sequence number are kept as is.
%%
%% Note: we use the most significant bit to store the QoS: it is left
%% clear for QoS1 and set for QoS2.
seqno_to_packet_id(?QOS_1, SeqNo) ->
    SeqNo band ?PACKET_ID_MASK;
seqno_to_packet_id(?QOS_2, SeqNo) ->
    (SeqNo band ?PACKET_ID_MASK) bor ?EPOCH_SIZE.
26,923✔
1201

1202
%% Derive the QoS from a packet ID: the bits above `?EPOCH_BITS' plus
%% one. The shift binds tighter than the addition; the parentheses only
%% make that explicit.
packet_id_to_qos(PacketId) ->
    (PacketId bsr ?EPOCH_BITS) + 1.
1,658✔
1204

1205
%% Difference between the sequence numbers stored under tracks `A' and
%% `B' of the session state `S'; see `seqno_diff/3'.
seqno_diff(Qos, A, B, S) ->
    SeqNoA = emqx_persistent_session_ds_state:get_seqno(A, S),
    SeqNoB = emqx_persistent_session_ds_state:get_seqno(B, S),
    seqno_diff(Qos, SeqNoA, SeqNoB).
1211

1212
%% Dialyzer complains about the second clause, since it's currently
%% unused, shut it up:
-dialyzer({nowarn_function, seqno_diff/3}).
%% Number of sequence numbers between `B' and `A'.
seqno_diff(?QOS_1, A, B) ->
    %% For QoS1 messages we skip a seqno every time the epoch changes,
    %% so the epoch number is subtracted from each operand before the
    %% two are compared:
    (A - (A bsr ?EPOCH_BITS)) - (B - (B bsr ?EPOCH_BITS));
seqno_diff(?QOS_2, A, B) ->
    A - B.
508✔
1223

1224
%%--------------------------------------------------------------------
1225
%% Will message handling
1226
%%--------------------------------------------------------------------
1227

1228
%% Remove the will message from the session state.
-spec clear_will_message(session()) -> session().
clear_will_message(Session = #{s := State}) ->
    Session#{s := emqx_persistent_session_ds_state:clear_will_message(State)}.
105✔
1232

1233
%% Publish the will message through the broker, then clear it from the
%% session state. The result of the publish call is ignored.
-spec publish_will_message_now(session(), message()) -> session().
publish_will_message_now(Session = #{}, WillMsg = #message{}) ->
    _PublishResult = emqx_broker:publish(WillMsg),
    clear_will_message(Session).
14✔
1237

1238
%% If the session has a will message with a positive will delay
%% interval, ask the GC worker to check the session after that delay
%% (converted from seconds via `timer:seconds/1'). Always returns `ok'.
maybe_set_will_message_timer(#{id := SessionId, s := S}) ->
    case emqx_persistent_session_ds_state:get_will_message(S) of
        #message{} = WillMsg ->
            case emqx_channel:will_delay_interval(WillMsg) of
                Delay when Delay > 0 ->
                    _ = emqx_persistent_session_ds_gc_worker:check_session_after(
                        SessionId,
                        timer:seconds(Delay)
                    ),
                    ok;
                _NoDelay ->
                    ok
            end;
        _ ->
            ok
    end.
1251

1252
%%--------------------------------------------------------------------
1253
%% Tests
1254
%%--------------------------------------------------------------------
1255

1256
-ifdef(TEST).
1257

1258
%% Warning: the below functions may return out-of-date results because
1259
%% the sessions commit data to mria asynchronously.
1260

1261
%% Map of every known session ID to the output of `print_session/1'.
list_all_sessions() ->
    SessionIds = emqx_persistent_session_ds_state:list_sessions(),
    Printed = lists:map(fun(Id) -> {Id, print_session(Id)} end, SessionIds),
    maps:from_list(Printed).
1268

1269
%%%% Proper generators:
1270

1271
%% Generate a sequence number that is smaller than the given
%% `NextSeqNo' by at most `?EPOCH_SIZE - 1' (both bounds are clamped at
%% zero):
seqno_gen(NextSeqNo) ->
    Lowest = erlang:max(0, NextSeqNo - (?EPOCH_SIZE - 1)),
    Highest = erlang:max(0, NextSeqNo - 1),
    range(Lowest, Highest).
1,000✔
1278

1279
%% Generate a sequence number composed of a random epoch and a random
%% offset in `0..?EPOCH_SIZE':
next_seqno_gen() ->
    ?LET(
        {Epoch, Offset},
        {non_neg_integer(), range(0, ?EPOCH_SIZE)},
        (Epoch bsl ?EPOCH_BITS) + Offset
    ).
1286

1287
%%%% Property-based tests:
1288

1289
%% Property: converting a sequence number from the window generated by
%% `seqno_gen/1' to a packet ID and back yields the same sequence
%% number, and the packet ID fits in 16 bits.
%% erlfmt-ignore
packet_id_to_seqno_prop() ->
    ?FORALL(
        {QoS, NextSeqNo}, {oneof([?QOS_1, ?QOS_2]), next_seqno_gen()},
        ?FORALL(
            Original, seqno_gen(NextSeqNo),
            begin
                PacketId = seqno_to_packet_id(QoS, Original),
                Restored = packet_id_to_seqno(PacketId, NextSeqNo),
                RoundTrips = PacketId < 16#10000 andalso Restored =:= Original,
                ?WHENFAIL(
                    begin
                        io:format(user, " *** PacketID = ~p~n", [PacketId]),
                        io:format(user, " *** SeqNo = ~p -> ~p~n", [Original, Restored]),
                        io:format(user, " *** NextSeqNo = ~p~n", [NextSeqNo])
                    end,
                    RoundTrips
                )
            end)).
1307

1308
%% Property: the sequence number produced by `inc_seqno/2' always maps
%% to a packet ID in the range `1..16#FFFF'.
inc_seqno_prop() ->
    ?FORALL(
        {QoS, Before},
        {oneof([?QOS_1, ?QOS_2]), next_seqno_gen()},
        begin
            After = inc_seqno(QoS, Before),
            PacketId = seqno_to_packet_id(QoS, After),
            IsValidId = PacketId > 0 andalso PacketId < 16#10000,
            ?WHENFAIL(
                begin
                    io:format(user, " *** QoS = ~p~n", [QoS]),
                    io:format(user, " *** SeqNo = ~p -> ~p~n", [Before, After]),
                    io:format(user, " *** PacketId = ~p~n", [PacketId])
                end,
                IsValidId
            )
        end
    ).
1325

1326
%% Property: after applying `inc_seqno/2' `N' times to a sequence number
%% that maps to a non-zero packet ID, `seqno_diff/3' between the result
%% and the starting point equals `N'.
seqno_diff_prop() ->
    ?FORALL(
        {QoS, Start, N},
        {oneof([?QOS_1, ?QOS_2]), next_seqno_gen(), range(0, 100)},
        ?IMPLIES(
            seqno_to_packet_id(QoS, Start) > 0,
            begin
                Increment = fun(SeqNo) -> inc_seqno(QoS, SeqNo) end,
                Finish = apply_n_times(N, Increment, Start),
                Diff = seqno_diff(QoS, Finish, Start),
                ?WHENFAIL(
                    begin
                        io:format(user, " *** QoS = ~p~n", [QoS]),
                        io:format(user, " *** SeqNo = ~p -> ~p~n", [Start, Finish]),
                        io:format(user, " *** N : ~p == ~p~n", [N, Diff])
                    end,
                    N =:= Diff
                )
            end
        )
    ).
1346

1347
%% EUnit generator running the sequence number properties.
%%
%% `emqx_persistent_session_ds_state:get_seqno/2' is mocked to return
%% its second argument, which lets the properties pass a bare sequence
%% number where the session state is normally expected.
seqno_proper_test_() ->
    Props = [packet_id_to_seqno_prop(), inc_seqno_prop(), seqno_diff_prop()],
    Opts = [{numtests, 1000}, {to_file, user}],
    Setup = fun() ->
        meck:new(emqx_persistent_session_ds_state, [no_history]),
        ok = meck:expect(
            emqx_persistent_session_ds_state,
            get_seqno,
            fun(_Track, Seqno) -> Seqno end
        )
    end,
    Cleanup = fun(_) ->
        meck:unload(emqx_persistent_session_ds_state)
    end,
    Tests = [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props],
    {timeout, 30, {setup, Setup, Cleanup, Tests}}.
3✔
1362

1363
%% Apply `Fun' to `A' `N' times in a row, feeding each result into the
%% next application. `N' must be a non-negative integer.
apply_n_times(N, Fun, A) when is_integer(N), N >= 0 ->
    lists:foldl(fun(_Round, Acc) -> Fun(Acc) end, A, lists:seq(1, N)).
49,216✔
1367

1368
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc