emqx / emqx · build 8684992748 · push · github · web-flow
15 Apr 2024 07:16AM UTC · coverage: 67.831% (+5.4%) from 62.388%

Merge pull request #12877 from id/0415-sync-release-56
sync release 56

29 of 40 new or added lines in 7 files covered (72.5%).
129 existing lines in 17 files are now uncovered.
37939 of 55932 relevant lines covered (67.83%).
7734.92 hits per line.

Source File: /apps/emqx/src/emqx_router_syncer.erl · 94.29% covered
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_router_syncer).

-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").

-behaviour(gen_server).

-export([start_link/2]).

-export([push/4]).
-export([wait/1]).

-export([stats/0]).

-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2
]).

-type action() :: add | delete.

-define(POOL, router_syncer_pool).

-define(MAX_BATCH_SIZE, 1000).

%% How long to idle (ms) after receiving a new operation before triggering batch sync?
%% Zero effectively just schedules out the process, so that it has a chance to receive
%% more operations, and introduces no minimum delay.
-define(MIN_SYNC_INTERVAL, 0).

%% How long (ms) to idle after observing a batch sync error?
%% Should help to avoid excessive retries in situations when errors are caused by
%% conditions that take some time to resolve (e.g. restarting an upstream core node).
-define(ERROR_DELAY, 10).

%% How soon (ms) to retry the last failed batch sync attempt?
%% Only matters in the absence of new operations; otherwise batch sync is triggered
%% as soon as `?ERROR_DELAY` is over.
-define(ERROR_RETRY_INTERVAL, 500).

-define(PRIO_HI, 1).
-define(PRIO_LO, 2).
-define(PRIO_BG, 3).

-define(PUSH(PRIO, OP), {PRIO, OP}).
-define(OP(ACT, TOPIC, DEST, CTX), {ACT, TOPIC, DEST, CTX}).

-define(ROUTEOP(ACT), {ACT, _, _}).
-define(ROUTEOP(ACT, PRIO), {ACT, PRIO, _}).
-define(ROUTEOP(ACT, PRIO, CTX), {ACT, PRIO, CTX}).
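
%% Illustration (hypothetical node name): with the macros above, the message
%% sent by `push(add, <<"t/1">>, 'node@host', #{})` is the plain tuple
%%   {2, {add, <<"t/1">>, 'node@host', []}}
%% i.e. ?PUSH(?PRIO_LO, ?OP(add, <<"t/1">>, 'node@host', [])).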

-ifdef(TEST).
-undef(MAX_BATCH_SIZE).
-undef(MIN_SYNC_INTERVAL).
-define(MAX_BATCH_SIZE, 40).
-define(MIN_SYNC_INTERVAL, 10).
-endif.

%%

-spec start_link(atom(), pos_integer()) ->
    {ok, pid()}.
start_link(Pool, Id) ->
    gen_server:start_link(
        {local, emqx_utils:proc_name(?MODULE, Id)},
        ?MODULE,
        [Pool, Id],
        []
    ).

-spec push(action(), emqx_types:topic(), emqx_router:dest(), Opts) ->
    ok | _WaitRef :: reference()
when
    Opts :: #{reply => pid()}.
push(Action, Topic, Dest, Opts) ->
    Worker = gproc_pool:pick_worker(?POOL, Topic),
    Prio = designate_prio(Action, Opts),
    Context = mk_push_context(Opts),
    _ = erlang:send(Worker, ?PUSH(Prio, {Action, Topic, Dest, Context})),
    case Context of
        [{MRef, _}] ->
            MRef;
        [] ->
            ok
    end.
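
%% Usage sketch (illustrative): a caller that needs confirmation passes
%% `reply => self()` and then blocks on wait/1:
%%   WaitRef = emqx_router_syncer:push(add, <<"t/1">>, node(), #{reply => self()}),
%%   ok = emqx_router_syncer:wait(WaitRef).
%% Without the `reply` option, push/4 returns `ok` and the sync is fire-and-forget.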

-spec wait(_WaitRef :: reference()) ->
    ok | {error, _Reason}.
wait(MRef) ->
    %% NOTE
    %% No timeouts here because (as in the `emqx_broker:call/2` case) callers do not
    %% really expect this to fail with a timeout exception. However, waiting
    %% indefinitely is not the best option since it blocks the caller from receiving
    %% other messages, so for instance a channel (connection) process may not be able
    %% to react to a socket close event in time. A better option would probably be to
    %% introduce a cancellable operation, which would be able to check whether the
    %% caller is still interested in the result.
    receive
        {MRef, Result} ->
            Result
    end.

designate_prio(_, #{reply := _To}) ->
    ?PRIO_HI;
designate_prio(add, #{}) ->
    ?PRIO_LO;
designate_prio(delete, #{}) ->
    ?PRIO_BG.

mk_push_context(#{reply := To}) ->
    MRef = erlang:make_ref(),
    [{MRef, To}];
mk_push_context(_) ->
    [].

%%

-type stats() :: #{
    size := non_neg_integer(),
    n_add := non_neg_integer(),
    n_delete := non_neg_integer(),
    prio_highest := non_neg_integer() | undefined,
    prio_lowest := non_neg_integer() | undefined
}.

-spec stats() -> [stats()].
stats() ->
    Workers = gproc_pool:active_workers(?POOL),
    [gen_server:call(Pid, stats, infinity) || {_Name, Pid} <- Workers].
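
%% Illustration (hypothetical values): with two pool workers, stats/0 might
%% return
%%   [#{size => 3, n_add => 2, n_delete => 1, prio_highest => 1, prio_lowest => 3},
%%    #{size => 0, n_add => 0, n_delete => 0, prio_highest => none, prio_lowest => 0}].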

%%

init([Pool, Id]) ->
    true = gproc_pool:connect_worker(Pool, {Pool, Id}),
    {ok, #{stash => stash_new()}}.

handle_call(stats, _From, State = #{stash := Stash}) ->
    {reply, stash_stats(Stash), State};
handle_call(_Call, _From, State) ->
    {reply, ignored, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({timeout, _TRef, retry}, State) ->
    NState = run_batch_loop([], maps:remove(retry_timer, State)),
    {noreply, NState};
handle_info(Push = ?PUSH(_, _), State) ->
    %% NOTE: Wait a bit to collect potentially overlapping operations.
    ok = timer:sleep(?MIN_SYNC_INTERVAL),
    NState = run_batch_loop([Push], State),
    {noreply, NState}.

terminate(_Reason, _State) ->
    ok.

%%
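
%% Informal sketch of the control flow below: a ?PUSH lands in the worker's
%% mailbox; handle_info/2 sleeps for ?MIN_SYNC_INTERVAL to let overlapping
%% operations accumulate; run_batch_loop/2 then drains the mailbox into the
%% stash, carves out a batch of at most ?MAX_BATCH_SIZE operations, and syncs
%% it via emqx_router:do_batch/1.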

run_batch_loop(Incoming, State = #{stash := Stash0}) ->
    Stash1 = stash_add(Incoming, Stash0),
    Stash2 = stash_drain(Stash1),
    {Batch, Stash3} = mk_batch(Stash2),
    ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, batch_stats(Batch, Stash3)),
    case run_batch(Batch) of
        Status = #{} ->
            ok = send_replies(Status, Batch),
            NState = cancel_retry_timer(State#{stash := Stash3}),
            %% NOTE
            %% We could postpone batches where only `?PRIO_BG` operations are left, which
            %% would allow us to do less work in situations when there are intermittently
            %% reconnecting clients with moderately unique subscriptions. However, this
            %% would also require us to forego the idempotency of batch syncs (see
            %% `merge_route_op/2`).
            case is_stash_empty(Stash3) of
                true ->
                    NState;
                false ->
                    run_batch_loop([], NState)
            end;
        BatchError ->
            ?SLOG(warning, #{
                msg => "router_batch_sync_failed",
                reason => BatchError,
                batch => batch_stats(Batch, Stash3)
            }),
            NState = State#{stash := Stash2},
            ok = timer:sleep(?ERROR_DELAY),
            ensure_retry_timer(NState)
    end.

ensure_retry_timer(State = #{retry_timer := _TRef}) ->
    State;
ensure_retry_timer(State) ->
    TRef = emqx_utils:start_timer(?ERROR_RETRY_INTERVAL, retry),
    State#{retry_timer => TRef}.

cancel_retry_timer(State = #{retry_timer := TRef}) ->
    ok = emqx_utils:cancel_timer(TRef),
    maps:remove(retry_timer, State);
cancel_retry_timer(State) ->
    State.
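
%% Retry policy illustration: after a failed sync the worker sleeps for
%% ?ERROR_DELAY ms and arms a one-shot ?ERROR_RETRY_INTERVAL ms timer. A new
%% push arriving earlier triggers the next attempt sooner; the timer is
%% cancelled once a batch sync succeeds.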

%%

mk_batch(Stash) when map_size(Stash) =< ?MAX_BATCH_SIZE ->
    %% This is the perfect situation: we can just use the stash as the batch,
    %% without extra reallocations.
    {Stash, stash_new()};
mk_batch(Stash) ->
    %% Take a subset of stashed operations to form a batch.
    %% Note that the stash is an unordered map, not a queue. The order of operations
    %% is preserved only loosely, since we start from high-priority operations and go
    %% down to low-priority ones. This might cause some operations to stay in the
    %% stash for an unfairly long time when there are many high-priority operations.
    %% However, it's unclear how likely this is to happen in practice.
    mk_batch(Stash, ?MAX_BATCH_SIZE).

mk_batch(Stash, BatchSize) ->
    mk_batch(?PRIO_HI, #{}, BatchSize, Stash).

mk_batch(Prio, Batch, SizeLeft, Stash) ->
    mk_batch(Prio, Batch, SizeLeft, Stash, maps:iterator(Stash)).

mk_batch(Prio, Batch, SizeLeft, Stash, It) when SizeLeft > 0 ->
    %% Iterate over the stash, taking only operations with priority equal to `Prio`.
    case maps:next(It) of
        {Route, Op = ?ROUTEOP(_Action, Prio), NIt} ->
            NBatch = Batch#{Route => Op},
            NStash = maps:remove(Route, Stash),
            mk_batch(Prio, NBatch, SizeLeft - 1, NStash, NIt);
        {_Route, _Op, NIt} ->
            %% This is a lower-priority operation, skip it.
            mk_batch(Prio, Batch, SizeLeft, Stash, NIt);
        none ->
            %% No more operations with priority `Prio`, go to the next priority level.
            true = Prio < ?PRIO_BG,
            mk_batch(Prio + 1, Batch, SizeLeft, Stash)
    end;
mk_batch(_Prio, Batch, _, Stash, _It) ->
    {Batch, Stash}.
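
%% Illustration (hypothetical routes): with a batch size limit of 2 and a stash
%%   #{RouteA => {add, ?PRIO_HI, []},
%%     RouteB => {add, ?PRIO_LO, []},
%%     RouteC => {delete, ?PRIO_BG, []}},
%% mk_batch/2 takes RouteA first (?PRIO_HI), then RouteB (?PRIO_LO), and leaves
%% RouteC (?PRIO_BG) stashed for a later batch.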

send_replies(Errors, Batch) ->
    maps:foreach(
        fun(Route, {_Action, _Prio, Ctx}) ->
            case Ctx of
                [] ->
                    ok;
                _ ->
                    replyctx_send(maps:get(Route, Errors, ok), Ctx)
            end
        end,
        Batch
    ).

replyctx_send(_Result, []) ->
    noreply;
replyctx_send(Result, RefsPids) ->
    _ = lists:foreach(fun({MRef, Pid}) -> erlang:send(Pid, {MRef, Result}) end, RefsPids),
    ok.

%%

run_batch(Batch) when map_size(Batch) > 0 ->
    catch emqx_router:do_batch(Batch);
run_batch(_Empty) ->
    #{}.

%%

stash_new() ->
    #{}.

is_stash_empty(Stash) ->
    maps:size(Stash) =:= 0.

stash_drain(Stash) ->
    receive
        ?PUSH(Prio, Op) ->
            stash_drain(stash_add(Prio, Op, Stash))
    after 0 ->
        Stash
    end.

stash_add(Pushes, Stash) ->
    lists:foldl(
        fun(?PUSH(Prio, Op), QAcc) -> stash_add(Prio, Op, QAcc) end,
        Stash,
        Pushes
    ).

stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) ->
    Route = {Topic, Dest},
    case maps:get(Route, Stash, undefined) of
        undefined ->
            Stash#{Route => {Action, Prio, Ctx}};
        RouteOp ->
            RouteOpMerged = merge_route_op(RouteOp, ?ROUTEOP(Action, Prio, Ctx)),
            Stash#{Route := RouteOpMerged}
    end.
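
%% Illustration (hypothetical route): after stash_add/3 of an `add` for
%% <<"t/1">> at ?PRIO_LO, the stash maps the route to the operation triple:
%%   #{{<<"t/1">>, 'node@host'} => {add, 2, []}}.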

merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), ?ROUTEOP(Action, Prio2, Ctx2)) ->
    %% NOTE: This can happen as a topic shard can be processed concurrently
    %% by different broker workers, see emqx_broker for more details.
    MergedCtx = Ctx1 ++ Ctx2,
    ?ROUTEOP(Action, Prio2, MergedCtx);
merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), DestOp = ?ROUTEOP(_Action2, _Prio2, _Ctx2)) ->
    %% NOTE: The latter cancels the former.
    %% Strictly speaking, in ideal conditions we could just cancel both, because they
    %% essentially do not change the global state. However, we decided to stay on the
    %% safe side and cancel only the former, making batch syncs idempotent.
    _ = replyctx_send(ok, Ctx1),
    DestOp.
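
%% Illustration: an `add` stashed for a route and then superseded by a `delete`
%% for the same route collapses to just the `delete`; the `add`'s reply context
%% is answered with `ok` right away.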

%%

batch_stats(Batch, Stash) ->
    BatchStats = stash_stats(Batch),
    BatchStats#{
        stashed => maps:size(Stash)
    }.

stash_stats(Stash) ->
    #{
        size => maps:size(Stash),
        n_add => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == add end, Stash)),
        n_delete => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == delete end, Stash)),
        prio_highest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> min(P, M) end, none, Stash),
        prio_lowest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> max(P, M) end, 0, Stash)
    }.

%%

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

batch_test() ->
    Dest = node(),
    Ctx = fun(N) -> [{N, self()}] end,
    Stash = stash_add(
        [
            ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(1))),
            ?PUSH(?PRIO_HI, ?OP(add, <<"t/1">>, Dest, Ctx(2))),
            ?PUSH(?PRIO_LO, ?OP(add, <<"t/1">>, Dest, Ctx(3))),
            ?PUSH(?PRIO_HI, ?OP(add, <<"t/2">>, Dest, Ctx(4))),
            ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(5))),
            ?PUSH(?PRIO_HI, ?OP(add, <<"t/4">>, Dest, Ctx(6))),
            ?PUSH(?PRIO_LO, ?OP(delete, <<"t/3">>, Dest, Ctx(7))),
            ?PUSH(?PRIO_BG, ?OP(delete, <<"t/3">>, Dest, Ctx(8))),
            ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(9))),
            ?PUSH(?PRIO_BG, ?OP(delete, <<"old/1">>, Dest, Ctx(10))),
            ?PUSH(?PRIO_HI, ?OP(add, <<"t/2">>, Dest, Ctx(11))),
            ?PUSH(?PRIO_BG, ?OP(delete, <<"old/2">>, Dest, Ctx(12))),
            ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(13))),
            ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(14))),
            ?PUSH(?PRIO_LO, ?OP(delete, <<"old/3">>, Dest, Ctx(15))),
            ?PUSH(?PRIO_LO, ?OP(delete, <<"t/2">>, Dest, Ctx(16)))
        ],
        stash_new()
    ),
    {Batch, StashLeft} = mk_batch(Stash, 5),

    ?assertMatch(
        #{
            {<<"t/1">>, Dest} := {add, ?PRIO_LO, _},
            {<<"t/3">>, Dest} := {add, ?PRIO_HI, _},
            {<<"t/2">>, Dest} := {delete, ?PRIO_LO, _},
            {<<"t/4">>, Dest} := {add, ?PRIO_HI, _},
            {<<"old/3">>, Dest} := {delete, ?PRIO_LO, _}
        },
        Batch
    ),
    ?assertMatch(
        #{
            {<<"old/1">>, Dest} := {delete, ?PRIO_BG, _},
            {<<"old/2">>, Dest} := {delete, ?PRIO_BG, _}
        },
        StashLeft
    ),

    %% Replies are only sent to superseded ops:
    ?assertEqual(
        [
            {1, ok},
            {5, ok},
            {4, ok},
            {9, ok},
            {7, ok},
            {8, ok},
            {11, ok}
        ],
        emqx_utils_stream:consume(emqx_utils_stream:mqueue(0))
    ).

-endif.