
emqx / emqx / build 8702269235 (push, github, web-flow)

16 Apr 2024 08:17AM UTC coverage: 67.82% (-0.01%) from 67.831%

Merge pull request #12881 from keynslug/fix/ds-repl-flaky

fix(dsrepl): make replication-related tests more stable

11 of 17 new or added lines in 1 file covered. (64.71%)

32 existing lines in 11 files now uncovered.

37936 of 55936 relevant lines covered (67.82%)

7895.62 hits per line
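
The headline percentages follow from the line counts above; as a quick sanity check, the ratios can be recomputed in an Erlang shell (illustrative only, not part of the report):

    1> 37936 / 55936 * 100.   % ≈ 67.82, overall coverage
    2> 11 / 17 * 100.         % ≈ 64.71, coverage of new or added lines in this build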

Source File

/apps/emqx_resource/src/emqx_resource_buffer_worker.erl (84.24% covered)
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% This module implements async message sending, disk message queuing,
18
%%  and message batching using ReplayQ.
19

20
-module(emqx_resource_buffer_worker).
21

22
-include("emqx_resource.hrl").
23
-include("emqx_resource_errors.hrl").
24
-include_lib("emqx/include/logger.hrl").
25
-include_lib("stdlib/include/ms_transform.hrl").
26
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
27

28
-behaviour(gen_statem).
29

30
-export([
31
    start_link/3,
32
    sync_query/3,
33
    async_query/3,
34
    block/1,
35
    resume/1,
36
    flush_worker/1
37
]).
38

39
-export([
40
    simple_sync_query/2,
41
    simple_sync_query/3,
42
    simple_async_query/3,
43
    simple_sync_internal_buffer_query/3
44
]).
45

46
-export([
47
    callback_mode/0,
48
    init/1,
49
    terminate/2,
50
    code_change/3
51
]).
52

53
-export([running/3, blocked/3]).
54

55
-export([queue_item_marshaller/1, estimate_size/1]).
56

57
-export([
58
    handle_async_reply/2, handle_async_batch_reply/2, reply_call/2, reply_call_internal_buffer/3
59
]).
60

61
-export([clear_disk_queue_dir/2]).
62

63
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
64

65
-define(COLLECT_REQ_LIMIT, 1000).
66
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
67
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
68
-define(SIMPLE_QUERY(FROM, REQUEST), ?QUERY(FROM, REQUEST, false, infinity)).
69
-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
70
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
71
    {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
72
).
73
-define(ITEM_IDX, 2).
74
-define(RETRY_IDX, 3).
75
-define(WORKER_MREF_IDX, 4).
76

77
-type id() :: binary().
78
-type index() :: pos_integer().
79
-type expire_at() :: infinity | integer().
80
-type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
81
-type request() :: term().
82
-type request_from() :: undefined | gen_statem:from().
83
-type timeout_ms() :: emqx_schema:timeout_duration_ms().
84
-type request_ttl() :: emqx_schema:timeout_duration_ms().
85
-type health_check_interval() :: pos_integer().
86
-type state() :: blocked | running.
87
-type inflight_key() :: integer().
88
-type counters() :: #{
89
    dropped_expired => non_neg_integer(),
90
    dropped_queue_full => non_neg_integer(),
91
    dropped_resource_not_found => non_neg_integer(),
92
    dropped_resource_stopped => non_neg_integer(),
93
    success => non_neg_integer(),
94
    failed => non_neg_integer(),
95
    retried_success => non_neg_integer(),
96
    retried_failed => non_neg_integer()
97
}.
98
-type inflight_table() :: ets:tid() | atom() | reference().
99
-type data() :: #{
100
    id := id(),
101
    index := index(),
102
    inflight_tid := inflight_table(),
103
    async_workers := #{pid() => reference()},
104
    batch_size := pos_integer(),
105
    batch_time := timeout_ms(),
106
    counters := counters(),
107
    metrics_flush_interval := timeout_ms(),
108
    queue := replayq:q(),
109
    resume_interval := timeout_ms(),
110
    tref := undefined | {reference(), reference()},
111
    metrics_tref := undefined | {reference(), reference()}
112
}.
113

114
callback_mode() -> [state_functions, state_enter].
13,296✔
115

116
start_link(Id, Index, Opts) ->
117
    gen_statem:start_link(?MODULE, {Id, Index, Opts}, []).
13,296✔
118

119
-spec sync_query(id(), request(), query_opts()) -> Result :: term().
120
sync_query(Id, Request, Opts0) ->
121
    ?tp(sync_query, #{id => Id, request => Request, query_opts => Opts0}),
3,702✔
122
    Opts1 = ensure_timeout_query_opts(Opts0, sync),
3,702✔
123
    Opts = ensure_expire_at(Opts1),
3,702✔
124
    PickKey = maps:get(pick_key, Opts, self()),
3,702✔
125
    Timeout = maps:get(timeout, Opts),
3,702✔
126
    emqx_resource_metrics:matched_inc(Id),
3,702✔
127
    pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
3,702✔
128

129
-spec async_query(id(), request(), query_opts()) -> Result :: term().
130
async_query(Id, Request, Opts0) ->
131
    ?tp(async_query, #{id => Id, request => Request, query_opts => Opts0}),
63,731✔
132
    Opts1 = ensure_timeout_query_opts(Opts0, async),
63,731✔
133
    Opts = ensure_expire_at(Opts1),
63,731✔
134
    PickKey = maps:get(pick_key, Opts, self()),
63,731✔
135
    emqx_resource_metrics:matched_inc(Id),
63,731✔
136
    pick_cast(Id, PickKey, {query, Request, Opts}).
63,731✔
137

138
%% Simple sync query: query the resource without batching and queuing.
139
-spec simple_sync_query(id(), request()) -> term().
140
simple_sync_query(Id, Request) ->
141
    simple_sync_query(Id, Request, #{}).
307✔
142

143
-spec simple_sync_query(id(), request(), query_opts()) -> term().
144
simple_sync_query(Id, Request, QueryOpts0) ->
145
    %% Note: since calling this function implies bypassing the
146
    %% buffer workers, and each buffer worker index is used when
147
    %% collecting gauge metrics, we use this dummy index.  If this
148
    %% call ends up calling buffering functions, that's a bug and
149
    %% would mess up the metrics anyway.  `undefined' is ignored by
150
    %% `emqx_resource_metrics:*_shift/3'.
151
    ?tp(simple_sync_query, #{id => Id, request => Request}),
309✔
152
    Index = undefined,
309✔
153
    QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
309✔
154
    emqx_resource_metrics:matched_inc(Id),
309✔
155
    Ref = make_request_ref(),
309✔
156
    ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
309✔
157
    Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts),
309✔
158
    _ = handle_query_result(Id, Result, _HasBeenSent = false),
309✔
159
    Result.
309✔
160

161
%% Simple async query: query the resource without batching and queuing.
162
-spec simple_async_query(id(), request(), query_opts()) -> term().
163
simple_async_query(Id, Request, QueryOpts0) ->
164
    ?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
201✔
165
    Index = undefined,
201✔
166
    QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
201✔
167
    emqx_resource_metrics:matched_inc(Id),
201✔
168
    Ref = make_request_ref(),
201✔
169
    ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
201✔
170
    Result = call_query(
201✔
171
        async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts
172
    ),
173
    _ = handle_query_result(Id, Result, _HasBeenSent = false),
201✔
174
    Result.
201✔
175

176
%% This is a hack to handle cases where the underlying connector has internal buffering
177
%% (e.g.: Kafka and Pulsar producers).  Since the message may be internally retried at a
178
%% later time, we can't bump metrics immediately if the return value is not a success
179
%% (e.g.: if the call timed out, but the message was enqueued nevertheless).
180
-spec simple_sync_internal_buffer_query(id(), request(), query_opts()) -> term().
181
simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
182
    ?tp(simple_sync_internal_buffer_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
119✔
183
    ReplyAlias = alias([reply]),
119✔
184
    try
119✔
185
        MaybeReplyTo = maps:get(reply_to, QueryOpts0, undefined),
119✔
186
        QueryOpts1 = QueryOpts0#{
119✔
187
            reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]}
188
        },
189
        QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
119✔
190
        case simple_async_query(Id, Request, QueryOpts) of
119✔
191
            {error, _} = Error ->
192
                Error;
3✔
193
            {async_return, {error, _} = Error} ->
194
                Error;
×
195
            {async_return, {ok, _Pid}} ->
196
                receive
116✔
197
                    {ReplyAlias, Response} ->
198
                        Response
116✔
199
                after Timeout ->
200
                    _ = unalias(ReplyAlias),
×
201
                    receive
×
202
                        {ReplyAlias, Response} ->
203
                            Response
×
204
                    after 0 -> {error, timeout}
×
205
                    end
206
                end
207
        end
208
    after
209
        _ = unalias(ReplyAlias)
119✔
210
    end.
211

212
simple_query_opts() ->
213
    ensure_expire_at(#{simple_query => true, timeout => infinity}).
629✔
214

215
-spec block(pid()) -> ok.
216
block(ServerRef) ->
217
    gen_statem:cast(ServerRef, block).
23✔
218

219
-spec resume(pid()) -> ok.
220
resume(ServerRef) ->
221
    gen_statem:cast(ServerRef, resume).
182✔
222

223
-spec flush_worker(pid()) -> ok.
224
flush_worker(ServerRef) ->
225
    gen_statem:cast(ServerRef, flush).
1,116✔
226

227
-spec init({id(), pos_integer(), map()}) -> gen_statem:init_result(state(), data()).
228
init({Id, Index, Opts}) ->
229
    process_flag(trap_exit, true),
13,296✔
230
    true = gproc_pool:connect_worker(Id, {Id, Index}),
13,296✔
231
    BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
13,296✔
232
    QueueOpts = replayq_opts(Id, Index, Opts),
13,296✔
233
    Queue = replayq:open(QueueOpts),
13,296✔
234
    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
13,296✔
235
    emqx_resource_metrics:inflight_set(Id, Index, 0),
13,296✔
236
    InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT),
13,296✔
237
    InflightTID = inflight_new(InflightWinSize),
13,296✔
238
    HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
13,296✔
239
    RequestTTL = maps:get(request_ttl, Opts, ?DEFAULT_REQUEST_TTL),
13,296✔
240
    BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
13,296✔
241
    BatchTime = adjust_batch_time(Id, RequestTTL, BatchTime0),
13,296✔
242
    DefaultResumeInterval = default_resume_interval(RequestTTL, HealthCheckInterval),
13,296✔
243
    ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval),
13,296✔
244
    MetricsFlushInterval = maps:get(metrics_flush_interval, Opts, ?DEFAULT_METRICS_FLUSH_INTERVAL),
13,296✔
245
    Data0 = #{
13,296✔
246
        id => Id,
247
        index => Index,
248
        inflight_tid => InflightTID,
249
        async_workers => #{},
250
        batch_size => BatchSize,
251
        batch_time => BatchTime,
252
        counters => #{},
253
        metrics_flush_interval => MetricsFlushInterval,
254
        queue => Queue,
255
        resume_interval => ResumeInterval,
256
        tref => undefined,
257
        metrics_tref => undefined
258
    },
259
    Data = ensure_metrics_flush_timer(Data0),
13,296✔
260
    ?tp(buffer_worker_init, #{id => Id, index => Index, queue_opts => QueueOpts}),
13,296✔
261
    {ok, running, Data}.
13,296✔
262

263
running(enter, _, #{tref := _Tref} = Data) ->
264
    ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data), tref => _Tref}),
13,325✔
265
    %% According to `gen_statem' laws, we mustn't call `maybe_flush'
266
    %% directly because it may decide to return `{next_state, blocked, _}',
267
    %% and that's an invalid response for a state enter call.
268
    %% Returning a next event from a state enter call is also
269
    %% prohibited.
270
    {keep_state, ensure_flush_timer(Data, 0)};
13,325✔
271
running(cast, resume, _St) ->
272
    keep_state_and_data;
179✔
273
running(cast, flush, Data) ->
274
    flush(Data);
1,102✔
275
running(cast, block, St) ->
276
    {next_state, blocked, St};
21✔
277
running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) ->
278
    handle_query_requests(Request0, Data);
22,116✔
279
running(info, {flush, Ref}, Data = #{tref := {_TRef, Ref}}) ->
280
    flush(Data#{tref := undefined});
14,509✔
281
running(info, {flush, _Ref}, _Data) ->
282
    ?tp(discarded_stale_flush, #{}),
3✔
283
    keep_state_and_data;
3✔
284
running(info, {flush_metrics, Ref}, Data0 = #{metrics_tref := {_TRef, Ref}}) ->
285
    Data = flush_metrics(Data0#{metrics_tref := undefined}),
3,930✔
286
    {keep_state, Data};
3,930✔
287
running(info, {flush_metrics, _Ref}, _Data) ->
288
    keep_state_and_data;
×
289
running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
290
    is_map_key(Pid, AsyncWorkers0)
291
->
292
    ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}),
5✔
293
    handle_async_worker_down(Data0, Pid);
5✔
294
running(info, Info, _St) ->
295
    ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}),
×
296
    keep_state_and_data.
×
297

298
blocked(enter, _, #{resume_interval := ResumeT} = St0) ->
299
    ?tp(buffer_worker_enter_blocked, #{buffer_worker => self()}),
68✔
300
    %% discard the old timer; a new timer will be started when entering the running state again
301
    St = cancel_flush_timer(St0),
68✔
302
    {keep_state, St, {state_timeout, ResumeT, unblock}};
68✔
303
blocked(cast, block, _St) ->
304
    keep_state_and_data;
2✔
305
blocked(cast, resume, St) ->
306
    resume_from_blocked(St);
3✔
307
blocked(cast, flush, St) ->
308
    resume_from_blocked(St);
×
309
blocked(state_timeout, unblock, St) ->
310
    resume_from_blocked(St);
182✔
311
blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) ->
312
    Data = collect_and_enqueue_query_requests(Request0, Data0),
5✔
313
    {keep_state, Data};
5✔
314
blocked(info, {flush, _Ref}, _Data) ->
315
    %% ignore stale timer
316
    keep_state_and_data;
×
317
blocked(info, {flush_metrics, Ref}, Data0 = #{metrics_tref := {_TRef, Ref}}) ->
318
    Data = flush_metrics(Data0#{metrics_tref := undefined}),
39✔
319
    {keep_state, Data};
39✔
320
blocked(info, {flush_metrics, _Ref}, _Data) ->
321
    keep_state_and_data;
×
322
blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
323
    is_map_key(Pid, AsyncWorkers0)
324
->
325
    ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}),
×
326
    handle_async_worker_down(Data0, Pid);
×
327
blocked(info, Info, _Data) ->
328
    ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}),
×
329
    keep_state_and_data.
×
330

331
terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
332
    _ = replayq:close(Q),
×
333
    emqx_resource_metrics:inflight_set(Id, Index, 0),
×
334
    %% since we want volatile queues, this will be 0 after
335
    %% termination.
336
    emqx_resource_metrics:queuing_set(Id, Index, 0),
×
337
    gproc_pool:disconnect_worker(Id, {Id, Index}),
×
338
    ok.
×
339

340
code_change(_OldVsn, State, _Extra) ->
341
    {ok, State}.
×
342

343
%%==============================================================================
344
-define(PICK(ID, KEY, PID, EXPR),
345
    try
346
        case gproc_pool:pick_worker(ID, KEY) of
347
            PID when is_pid(PID) ->
348
                EXPR;
349
            _ ->
350
                ?RESOURCE_ERROR(worker_not_created, "resource not created")
351
        end
352
    catch
353
        error:badarg ->
354
            ?RESOURCE_ERROR(worker_not_created, "resource not created");
355
        error:timeout ->
356
            ?RESOURCE_ERROR(timeout, "call resource timeout")
357
    end
358
).
359

360
pick_call(Id, Key, Query = {_, _, QueryOpts}, Timeout) ->
361
    ?PICK(Id, Key, Pid, begin
3,702✔
362
        MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
3,702✔
363
        ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
3,702✔
364
        erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
3,702✔
365
        receive
3,702✔
366
            {MRef, Response} ->
367
                erlang:demonitor(MRef, [flush]),
3,667✔
368
                maybe_reply_to(Response, QueryOpts);
3,667✔
369
            {'DOWN', MRef, process, Pid, Reason} ->
UNCOV
370
                error({worker_down, Reason})
×
371
        after Timeout ->
372
            erlang:demonitor(MRef, [flush]),
35✔
373
            receive
35✔
374
                {MRef, Response} ->
375
                    maybe_reply_to(Response, QueryOpts)
×
376
            after 0 ->
377
                error(timeout)
35✔
378
            end
379
        end
380
    end).
×
381

382
pick_cast(Id, Key, Query = {query, _Request, QueryOpts}) ->
383
    ?PICK(Id, Key, Pid, begin
63,731✔
384
        ReplyTo = maps:get(reply_to, QueryOpts, undefined),
63,731✔
385
        erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
63,731✔
386
        ok
63,731✔
387
    end).
×
388

389
resume_from_blocked(Data) ->
390
    ?tp(buffer_worker_resume_from_blocked_enter, #{}),
211✔
391
    #{inflight_tid := InflightTID} = Data,
211✔
392
    Now = now_(),
211✔
393
    case inflight_get_first_retriable(InflightTID, Now) of
211✔
394
        none ->
395
            case is_inflight_full(InflightTID) of
29✔
396
                true ->
397
                    {keep_state, Data};
×
398
                false ->
399
                    {next_state, running, Data}
29✔
400
            end;
401
        {expired, Ref, Batch} ->
402
            BufferWorkerPid = self(),
20✔
403
            IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
20✔
404
            Counters =
20✔
405
                case IsAcked of
406
                    true -> #{dropped_expired => length(Batch)};
20✔
407
                    false -> #{}
×
408
                end,
409
            batch_reply_dropped(Batch, {error, request_expired}),
20✔
410
            NData = aggregate_counters(Data, Counters),
20✔
411
            ?tp(buffer_worker_retry_expired, #{expired => Batch}),
20✔
412
            resume_from_blocked(NData);
20✔
413
        {single, Ref, Query} ->
414
            %% We retry msgs in the inflight window synchronously, because if we send them
415
            %% async, they would be appended to the end of the inflight window again.
416
            retry_inflight_sync(Ref, Query, Data);
127✔
417
        {batch, Ref, NotExpired, []} ->
418
            retry_inflight_sync(Ref, NotExpired, Data);
35✔
419
        {batch, Ref, NotExpired, Expired} ->
420
            NumExpired = length(Expired),
×
421
            ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
×
422
            batch_reply_dropped(Expired, {error, request_expired}),
×
423
            NData = aggregate_counters(Data, #{dropped_expired => NumExpired}),
×
424
            ?tp(buffer_worker_retry_expired, #{expired => Expired}),
×
425
            %% We retry msgs in the inflight window synchronously, because if we send them
426
            %% async, they would be appended to the end of the inflight window again.
427
            retry_inflight_sync(Ref, NotExpired, NData)
×
428
    end.
429

430
retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
431
    #{
162✔
432
        id := Id,
433
        inflight_tid := InflightTID,
434
        index := Index,
435
        resume_interval := ResumeT
436
    } = Data0,
437
    ?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
162✔
438
    QueryOpts = #{simple_query => false},
162✔
439
    Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
162✔
440
    {ShouldAck, PostFn, DeltaCounters} =
162✔
441
        case QueryOrBatch of
442
            ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) ->
443
                Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
127✔
444
                reply_caller_defer_metrics(Id, Reply, QueryOpts);
127✔
445
            [?QUERY(_, _, _, _) | _] = Batch ->
446
                batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
35✔
447
        end,
448
    Data1 = aggregate_counters(Data0, DeltaCounters),
162✔
449
    case ShouldAck of
162✔
450
        %% Send failed because resource is down
451
        nack ->
452
            PostFn(),
156✔
453
            ?tp(
156✔
454
                buffer_worker_retry_inflight_failed,
455
                #{
456
                    ref => Ref,
457
                    query_or_batch => QueryOrBatch,
458
                    result => Result
459
                }
460
            ),
461
            {keep_state, Data1, {state_timeout, ResumeT, unblock}};
156✔
462
        %% Send ok or failed but the resource is working
463
        ack ->
464
            BufferWorkerPid = self(),
6✔
465
            IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
6✔
466
            %% we need to defer bumping the counters after
467
            %% `inflight_drop' to avoid the race condition when an
468
            %% inflight request might get completed concurrently with
469
            %% the retry, bumping them twice.  Since both inflight
470
            %% requests (repeated and original) have the same `Ref',
471
            %% we bump the counter when removing it from the table.
472
            IsAcked andalso PostFn(),
6✔
473
            ?tp(
6✔
474
                buffer_worker_retry_inflight_succeeded,
475
                #{
476
                    ref => Ref,
477
                    query_or_batch => QueryOrBatch
478
                }
479
            ),
480
            resume_from_blocked(Data1)
6✔
481
    end.
482

483
%% Called during the `running' state only.
484
-spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) ->
485
    gen_statem:event_handler_result(state(), data()).
486
handle_query_requests(Request0, Data0) ->
487
    Data = collect_and_enqueue_query_requests(Request0, Data0),
22,116✔
488
    maybe_flush(Data).
22,116✔
489

490
collect_and_enqueue_query_requests(Request0, Data0) ->
491
    #{
22,121✔
492
        id := Id,
493
        index := Index,
494
        queue := Q
495
    } = Data0,
496
    Requests = collect_requests([Request0], ?COLLECT_REQ_LIMIT),
22,121✔
497
    Queries =
22,121✔
498
        lists:map(
499
            fun
500
                (?SEND_REQ(undefined = _ReplyTo, {query, Req, Opts})) ->
501
                    ReplyFun = maps:get(async_reply_fun, Opts, undefined),
63,565✔
502
                    HasBeenSent = false,
63,565✔
503
                    ExpireAt = maps:get(expire_at, Opts),
63,565✔
504
                    ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt);
63,565✔
505
                (?SEND_REQ(ReplyTo, {query, Req, Opts})) ->
506
                    HasBeenSent = false,
3,868✔
507
                    ExpireAt = maps:get(expire_at, Opts),
3,868✔
508
                    ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt)
3,868✔
509
            end,
510
            Requests
511
        ),
512
    {Overflown, NewQ, DeltaCounters} = append_queue(Id, Index, Q, Queries),
22,121✔
513
    ok = reply_overflown(Overflown),
22,121✔
514
    aggregate_counters(Data0#{queue := NewQ}, DeltaCounters).
22,121✔
515

516
reply_overflown([]) ->
517
    ok;
22,121✔
518
reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt) | More]) ->
519
    do_reply_caller(ReplyTo, {error, buffer_overflow}),
×
520
    reply_overflown(More).
×
521

522
do_reply_caller(undefined, _Result) ->
523
    ok;
119✔
524
do_reply_caller({F, Args}, {async_return, Result}) ->
525
    %% this is an early return to async caller, the retry
526
    %% decision has to be made by the caller
527
    do_reply_caller({F, Args}, Result);
35✔
528
do_reply_caller({F, Args}, Result) when is_function(F) ->
529
    _ = erlang:apply(F, Args ++ [Result]),
3,814✔
530
    ok;
3,814✔
531
do_reply_caller({F, Args, _Context}, Result) when is_function(F) ->
532
    _ = erlang:apply(F, Args ++ [Result]),
306✔
533
    ok.
306✔
534

535
maybe_flush(Data0) ->
536
    #{
22,116✔
537
        batch_size := BatchSize,
538
        queue := Q
539
    } = Data0,
540
    QueueCount = queue_count(Q),
22,116✔
541
    case QueueCount >= BatchSize of
22,116✔
542
        true ->
543
            flush(Data0);
1,172✔
544
        false ->
545
            {keep_state, ensure_flush_timer(Data0)}
20,944✔
546
    end.
547

548
%% Called during the `running' state only.
549
-spec flush(data()) -> gen_statem:event_handler_result(state(), data()).
550
flush(Data0) ->
551
    #{
16,783✔
552
        batch_size := BatchSize,
553
        inflight_tid := InflightTID,
554
        queue := Q0
555
    } = Data0,
556
    Data1 = cancel_flush_timer(Data0),
16,783✔
557
    CurrentCount = queue_count(Q0),
16,783✔
558
    IsFull = is_inflight_full(InflightTID),
16,783✔
559
    ?tp_ignore_side_effects_in_prod(buffer_worker_flush, #{
16,783✔
560
        queued => CurrentCount,
561
        is_inflight_full => IsFull,
562
        inflight => inflight_count(InflightTID)
563
    }),
564
    case {CurrentCount, IsFull} of
16,783✔
565
        {0, _} ->
566
            ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
13,340✔
567
                inflight => inflight_count(InflightTID)
568
            }),
569
            {keep_state, Data1};
13,340✔
570
        {_, true} ->
571
            ?tp(buffer_worker_flush_but_inflight_full, #{}),
×
572
            {keep_state, Data1};
×
573
        {_, false} ->
574
            ?tp(buffer_worker_flush_before_pop, #{}),
3,443✔
575
            {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
3,443✔
576
            Data2 = Data1#{queue := Q1},
3,443✔
577
            ?tp(buffer_worker_flush_before_sieve_expired, #{}),
3,443✔
578
            Now = now_(),
3,443✔
579
            %% if the request has expired, the caller is no longer
580
            %% waiting for a response.
581
            case sieve_expired_requests(Batch, Now) of
3,443✔
582
                {[], _AllExpired} ->
583
                    ok = replayq:ack(Q1, QAckRef),
×
584
                    NumExpired = length(Batch),
×
585
                    batch_reply_dropped(Batch, {error, request_expired}),
×
586
                    Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
×
587
                    ?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
×
588
                    flush(Data3);
×
589
                {NotExpired, Expired} ->
590
                    NumExpired = length(Expired),
3,443✔
591
                    batch_reply_dropped(Expired, {error, request_expired}),
3,443✔
592
                    Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
3,443✔
593
                    IsBatch = (BatchSize > 1),
3,443✔
594
                    %% We *must* use the new queue, because we currently can't
595
                    %% `nack' a `pop'.
596
                    %% Maybe we could re-open the queue?
597
                    ?tp(
3,443✔
598
                        buffer_worker_flush_potentially_partial,
599
                        #{expired => Expired, not_expired => NotExpired}
600
                    ),
601
                    Ref = make_request_ref(),
3,443✔
602
                    do_flush(Data3, #{
3,443✔
603
                        is_batch => IsBatch,
604
                        batch => NotExpired,
605
                        ref => Ref,
606
                        ack_ref => QAckRef
607
                    })
608
            end
609
    end.
610

611
-spec do_flush(data(), #{
612
    is_batch := boolean(),
613
    batch := [queue_query()],
614
    ack_ref := replayq:ack_ref(),
615
    ref := inflight_key()
616
}) ->
617
    gen_statem:event_handler_result(state(), data()).
618
do_flush(
619
    #{queue := Q1} = Data0,
620
    #{
621
        is_batch := false,
622
        batch := Batch,
623
        ref := Ref,
624
        ack_ref := QAckRef
625
    }
626
) ->
627
    #{
2,205✔
628
        id := Id,
629
        index := Index,
630
        inflight_tid := InflightTID
631
    } = Data0,
632
    %% unwrap when not batching (i.e., batch size == 1)
633
    [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch,
2,205✔
634
    QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
2,205✔
635
    Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts),
2,205✔
636
    Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
2,195✔
637
    {ShouldAck, DeltaCounters} = reply_caller(Id, Reply, QueryOpts),
2,195✔
638
    Data1 = aggregate_counters(Data0, DeltaCounters),
2,195✔
639
    case ShouldAck of
2,195✔
640
        %% Failed; remove the request from the queue, as we cannot pop
641
        %% from it again, but we'll retry it using the inflight table.
642
        nack ->
643
            ok = replayq:ack(Q1, QAckRef),
26✔
644
            %% we set it atomically just below; a limitation of having
645
            %% to use tuples for atomic ets updates
646
            IsRetriable = true,
26✔
647
            AsyncWorkerMRef0 = undefined,
26✔
648
            InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, AsyncWorkerMRef0),
26✔
649
            %% we must append again to the table to ensure that the
650
            %% request will be retried (i.e., it might not have been
651
            %% inserted during `call_query' if the resource was down
652
            %% and/or if it was a sync request).
653
            inflight_append(InflightTID, InflightItem),
26✔
654
            mark_inflight_as_retriable(InflightTID, Ref),
26✔
655
            {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
26✔
656
            store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
26✔
657
            ?tp(
26✔
658
                buffer_worker_flush_nack,
659
                #{
660
                    ref => Ref,
661
                    is_retriable => IsRetriable,
662
                    batch_or_query => Request,
663
                    result => Result
664
                }
665
            ),
666
            {next_state, blocked, Data2};
26✔
667
        %% Success; just ack.
668
        ack ->
669
            ok = replayq:ack(Q1, QAckRef),
2,169✔
670
            %% Async requests are acked later when the async worker
671
            %% calls the corresponding callback function.  Also, we
672
            %% must ensure the async worker is being monitored for
673
            %% such requests.
674
            IsUnrecoverableError = is_unrecoverable_error(Result),
2,169✔
675
            BufferWorkerPid = self(),
2,169✔
676
            case is_async_return(Result) of
2,169✔
677
                true when IsUnrecoverableError ->
678
                    ack_inflight(InflightTID, Ref, BufferWorkerPid);
36✔
679
                true ->
680
                    ok;
251✔
681
                false ->
682
                    ack_inflight(InflightTID, Ref, BufferWorkerPid)
1,882✔
683
            end,
684
            {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
2,169✔
685
            store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
2,169✔
686
            ?tp(
2,169✔
687
                buffer_worker_flush_ack,
688
                #{
689
                    batch_or_query => Request,
690
                    result => Result
691
                }
692
            ),
693
            CurrentCount = queue_count(Q1),
2,169✔
694
            case CurrentCount > 0 of
2,169✔
695
                true ->
696
                    ?tp(buffer_worker_flush_ack_reflush, #{
1,111✔
697
                        batch_or_query => Request, result => Result, queue_count => CurrentCount
698
                    }),
699
                    flush_worker(self());
1,111✔
700
                false ->
701
                    ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
1,058✔
702
                        inflight => inflight_count(InflightTID)
703
                    }),
704
                    ok
1,058✔
705
            end,
706
            {keep_state, Data2}
2,169✔
707
    end;
708
do_flush(#{queue := Q1} = Data0, #{
709
    is_batch := true,
710
    batch := Batch,
711
    ref := Ref,
712
    ack_ref := QAckRef
713
}) ->
714
    #{
1,238✔
715
        id := Id,
716
        index := Index,
717
        batch_size := BatchSize,
718
        inflight_tid := InflightTID
719
    } = Data0,
720
    QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
1,238✔
721
    Result = call_query(async_if_possible, Id, Index, Ref, Batch, QueryOpts),
1,238✔
722
    {ShouldAck, DeltaCounters} = batch_reply_caller(Id, Result, Batch, QueryOpts),
1,237✔
723
    Data1 = aggregate_counters(Data0, DeltaCounters),
1,237✔
724
    case ShouldAck of
1,237✔
725
        %% Failed; remove the request from the queue, as we cannot pop
726
        %% from it again, but we'll retry it using the inflight table.
727
        nack ->
728
            ok = replayq:ack(Q1, QAckRef),
16✔
729
            %% we set it atomically just below; a limitation of having
730
            %% to use tuples for atomic ets updates
731
            IsRetriable = true,
16✔
732
            AsyncWorkerMRef0 = undefined,
16✔
733
            InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef0),
16✔
734
            %% we must append again to the table to ensure that the
735
            %% request will be retried (i.e., it might not have been
736
            %% inserted during `call_query' if the resource was down
737
            %% and/or if it was a sync request).
738
            inflight_append(InflightTID, InflightItem),
16✔
739
            mark_inflight_as_retriable(InflightTID, Ref),
16✔
740
            {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
16✔
741
            store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
16✔
742
            ?tp(
16✔
743
                buffer_worker_flush_nack,
744
                #{
745
                    ref => Ref,
746
                    is_retriable => IsRetriable,
747
                    batch_or_query => Batch,
748
                    result => Result
749
                }
750
            ),
751
            {next_state, blocked, Data2};
16✔
752
        %% Success; just ack.
753
        ack ->
754
            ok = replayq:ack(Q1, QAckRef),
1,221✔
755
            %% Async requests are acked later when the async worker
756
            %% calls the corresponding callback function.  Also, we
757
            %% must ensure the async worker is being monitored for
758
            %% such requests.
759
            IsUnrecoverableError = is_unrecoverable_error(Result),
1,221✔
760
            BufferWorkerPid = self(),
1,221✔
761
            case is_async_return(Result) of
1,221✔
762
                true when IsUnrecoverableError ->
763
                    ack_inflight(InflightTID, Ref, BufferWorkerPid);
29✔
764
                true ->
765
                    ok;
199✔
766
                false ->
767
                    ack_inflight(InflightTID, Ref, BufferWorkerPid)
993✔
768
            end,
769
            {Data2, AsyncWorkerMRef} = ensure_async_worker_monitored(Data1, Result),
1,221✔
770
            store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef),
1,221✔
771
            CurrentCount = queue_count(Q1),
1,221✔
772
            ?tp(
1,221✔
773
                buffer_worker_flush_ack,
774
                #{
775
                    batch_or_query => Batch,
776
                    result => Result,
777
                    queue_count => CurrentCount
778
                }
779
            ),
780
            Data3 =
1,220✔
781
                case {CurrentCount > 0, CurrentCount >= BatchSize} of
782
                    {false, _} ->
783
                        ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{
1,205✔
784
                            inflight => inflight_count(InflightTID)
785
                        }),
786
                        Data2;
1,205✔
787
                    {true, true} ->
788
                        ?tp(buffer_worker_flush_ack_reflush, #{
3✔
789
                            batch_or_query => Batch,
790
                            result => Result,
791
                            queue_count => CurrentCount,
792
                            batch_size => BatchSize
793
                        }),
794
                        flush_worker(self()),
3✔
795
                        Data2;
3✔
796
                    {true, false} ->
797
                        ensure_flush_timer(Data2)
12✔
798
                end,
799
            {keep_state, Data3}
1,220✔
800
    end.
801

802
batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
803
    {ShouldBlock, PostFn, DeltaCounters} =
1,237✔
804
        batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts),
805
    PostFn(),
1,237✔
806
    {ShouldBlock, DeltaCounters}.
1,237✔
807

808
batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
809
    Replies = expand_batch_reply(BatchResult, Batch),
1,469✔
810
    {ShouldAck, PostFns, Counters} =
1,469✔
811
        lists:foldl(
812
            fun(Reply, {_ShouldAck, PostFns, OldCounters}) ->
813
                %% _ShouldAck should be the same as ShouldAck starting from the second reply
814
                {ShouldAck, PostFn, DeltaCounters} = reply_caller_defer_metrics(
63,986✔
815
                    Id, Reply, QueryOpts
816
                ),
817
                {ShouldAck, [PostFn | PostFns], merge_counters(OldCounters, DeltaCounters)}
63,986✔
818
            end,
819
            {ack, [], #{}},
820
            Replies
821
        ),
822
    PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end,
1,469✔
823
    {ShouldAck, PostFn, Counters}.
1,469✔
824

825
expand_batch_reply(BatchResults, Batch) when is_list(BatchResults) ->
826
    lists:map(
×
827
        fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT), Result}) ->
828
            ?REPLY(FROM, SENT, Result)
×
829
        end,
830
        lists:zip(Batch, BatchResults)
831
    );
832
expand_batch_reply(BatchResult, Batch) ->
833
    lists:map(
1,469✔
834
        fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) ->
835
            ?REPLY(FROM, SENT, BatchResult)
63,986✔
836
        end,
837
        Batch
838
    ).
839

840
reply_caller(Id, Reply, QueryOpts) ->
841
    {ShouldAck, PostFn, DeltaCounters} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
2,195✔
842
    PostFn(),
2,195✔
843
    {ShouldAck, DeltaCounters}.
2,195✔
844

845
%% Should only reply to the caller when the decision is final (not
846
%% retriable).  See comment on `handle_query_result_pure'.
847
reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) ->
848
    handle_query_result_pure(Id, Result, HasBeenSent);
62,236✔
849
reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) ->
850
    IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
4,506✔
851
    IsUnrecoverableError = is_unrecoverable_error(Result),
4,506✔
852
    {ShouldAck, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent),
4,506✔
853
    case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
4,506✔
854
        {ack, {async_return, _}, true, _} ->
855
            ok = do_reply_caller(ReplyTo, Result);
56✔
856
        {ack, {async_return, _}, false, _} ->
857
            ok;
287✔
858
        {_, _, _, true} ->
859
            ok = do_reply_caller(ReplyTo, Result);
186✔
860
        {nack, _, _, _} ->
861
            ok;
190✔
862
        {ack, _, _, _} ->
863
            ok = do_reply_caller(ReplyTo, Result)
3,787✔
864
    end,
865
    {ShouldAck, PostFn, DeltaCounters}.
4,506✔
866

867
%% This is basically used only by rule actions.  To keep rule action metrics from
868
%% becoming inconsistent when we drop messages, we need a way to signal the rule engine that
869
%% this action has reached a conclusion.
870
-spec reply_dropped(reply_fun(), {error, late_reply | request_expired}) -> ok.
871
reply_dropped(_ReplyTo = {Fn, Args, #{reply_dropped := true}}, Result) when
872
    is_function(Fn), is_list(Args)
873
->
874
    %% We want to avoid bumping metrics inside the buffer worker, since it's costly.
875
    emqx_pool:async_submit(Fn, Args ++ [Result]),
10✔
876
    ok;
10✔
877
reply_dropped(_ReplyTo, _Result) ->
878
    ok.
18✔
879

880
-spec batch_reply_dropped([queue_query()], {error, late_reply | request_expired}) -> ok.
881
batch_reply_dropped(Batch, Result) ->
882
    lists:foreach(
3,463✔
883
        fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt)) ->
884
            reply_dropped(ReplyTo, Result)
20✔
885
        end,
886
        Batch
887
    ).
888

889
%% This is only called by `simple_{,a}sync_query', so we can bump the
890
%% counters here.
891
handle_query_result(Id, Result, HasBeenSent) ->
892
    {ShouldBlock, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent),
510✔
893
    PostFn(),
510✔
894
    bump_counters(Id, DeltaCounters),
510✔
895
    ShouldBlock.
510✔
896

897
%% We should always retry (nack), except when:
898
%%   * resource is not found
899
%%   * resource is stopped
900
%%   * the result is a success (or at least a delayed result)
901
%% We also retry even sync requests.  In that case, we shouldn't reply
902
%% to the caller until one of those final results above happens.
903
-spec handle_query_result_pure(id(), term(), HasBeenSent :: boolean()) ->
904
    {ack | nack, function(), counters()}.
905
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) ->
906
    PostFn = fun() ->
21✔
907
        ?SLOG(error, #{msg => "resource_exception", info => emqx_utils:redact(Msg)}),
21✔
908
        ok
21✔
909
    end,
910
    {nack, PostFn, #{}};
21✔
911
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when
912
    NotWorking == not_connected; NotWorking == blocked
913
->
914
    {nack, fun() -> ok end, #{}};
34✔
915
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent) ->
916
    PostFn = fun() ->
2✔
917
        ?SLOG(error, #{id => Id, msg => "resource_not_found", info => Msg}),
2✔
918
        ok
2✔
919
    end,
920
    {ack, PostFn, #{dropped_resource_not_found => 1}};
2✔
921
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) ->
922
    PostFn = fun() ->
×
923
        ?SLOG(error, #{id => Id, msg => "resource_stopped", info => Msg}),
×
924
        ok
×
925
    end,
926
    {ack, PostFn, #{dropped_resource_stopped => 1}};
×
927
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
928
    PostFn = fun() ->
×
929
        ?SLOG(error, #{id => Id, msg => "other_resource_error", reason => Reason}),
×
930
        ok
×
931
    end,
932
    {nack, PostFn, #{}};
×
933
handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
934
    case is_unrecoverable_error(Error) of
364✔
935
        true ->
936
            PostFn =
195✔
937
                fun() ->
938
                    ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}),
195✔
939
                    ok
195✔
940
                end,
941
            Counters =
195✔
942
                case HasBeenSent of
943
                    true -> #{retried_failed => 1};
×
944
                    false -> #{failed => 1}
195✔
945
                end,
946
            {ack, PostFn, Counters};
195✔
947
        false ->
948
            PostFn =
169✔
949
                fun() ->
950
                    ?SLOG(error, #{id => Id, msg => "send_error", reason => Reason}),
144✔
951
                    ok
144✔
952
                end,
953
            {nack, PostFn, #{}}
169✔
954
    end;
955
handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) ->
956
    handle_query_async_result_pure(Id, Result, HasBeenSent);
739✔
957
handle_query_result_pure(_Id, Result, HasBeenSent) ->
958
    PostFn = fun() ->
66,092✔
959
        assert_ok_result(Result),
66,092✔
960
        ok
66,092✔
961
    end,
962
    Counters =
66,092✔
963
        case HasBeenSent of
964
            true -> #{retried_success => 1};
8✔
965
            false -> #{success => 1}
66,084✔
966
        end,
967
    {ack, PostFn, Counters}.
66,092✔
968

969
-spec handle_query_async_result_pure(id(), term(), HasBeenSent :: boolean()) ->
970
    {ack | nack, function(), counters()}.
971
handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
972
    case is_unrecoverable_error(Error) of
72✔
973
        true ->
974
            PostFn =
67✔
975
                fun() ->
976
                    ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}),
67✔
977
                    ok
67✔
978
                end,
979
            Counters =
67✔
980
                case HasBeenSent of
981
                    true -> #{retried_failed => 1};
×
982
                    false -> #{failed => 1}
67✔
983
                end,
984
            {ack, PostFn, Counters};
67✔
985
        false ->
986
            PostFn = fun() ->
5✔
987
                ?SLOG(error, #{id => Id, msg => "async_send_error", reason => Reason}),
5✔
988
                ok
5✔
989
            end,
990
            {nack, PostFn, #{}}
5✔
991
    end;
992
handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) ->
993
    {ack, fun() -> ok end, #{}};
613✔
994
handle_query_async_result_pure(_Id, ok, _HasBeenSent) ->
995
    {ack, fun() -> ok end, #{}}.
54✔
996

997
-spec aggregate_counters(data(), counters()) -> data().
998
aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) ->
999
    Counters = merge_counters(OldCounters, DeltaCounters),
29,178✔
1000
    Data#{counters := Counters}.
29,178✔
1001

1002
-spec merge_counters(counters(), counters()) -> counters().
1003
merge_counters(OldCounters, DeltaCounters) ->
1004
    maps:fold(
93,164✔
1005
        fun(Metric, Val, Acc) ->
1006
            maps:update_with(Metric, fun(X) -> X + Val end, Val, Acc)
70,110✔
1007
        end,
1008
        OldCounters,
1009
        DeltaCounters
1010
    ).
1011

1012
-spec flush_metrics(data()) -> data().
1013
flush_metrics(Data = #{id := Id, counters := Counters}) ->
1014
    bump_counters(Id, Counters),
3,969✔
1015
    set_gauges(Data),
3,969✔
1016
    log_expired_message_count(Data),
3,969✔
1017
    ensure_metrics_flush_timer(Data#{counters := #{}}).
3,969✔
1018

1019
-spec ensure_metrics_flush_timer(data()) -> data().
1020
ensure_metrics_flush_timer(Data = #{metrics_tref := undefined, metrics_flush_interval := T}) ->
1021
    Ref = make_ref(),
17,265✔
1022
    TRef = erlang:send_after(T, self(), {flush_metrics, Ref}),
17,265✔
1023
    Data#{metrics_tref := {TRef, Ref}}.
17,265✔
1024

1025
-spec bump_counters(id(), counters()) -> ok.
1026
bump_counters(Id, Counters) ->
1027
    Iter = maps:iterator(Counters),
5,087✔
1028
    do_bump_counters(Iter, Id).
5,087✔
1029

1030
do_bump_counters(Iter, Id) ->
1031
    case maps:next(Iter) of
6,154✔
1032
        {Key, Val, NIter} ->
1033
            do_bump_counters1(Key, Val, Id),
1,067✔
1034
            do_bump_counters(NIter, Id);
1,067✔
1035
        none ->
1036
            ok
5,087✔
1037
    end.
1038

1039
do_bump_counters1(dropped_expired, Val, Id) ->
1040
    emqx_resource_metrics:dropped_expired_inc(Id, Val);
125✔
1041
do_bump_counters1(dropped_queue_full, Val, Id) ->
1042
    emqx_resource_metrics:dropped_queue_full_inc(Id, Val);
×
1043
do_bump_counters1(failed, Val, Id) ->
1044
    emqx_resource_metrics:failed_inc(Id, Val);
95✔
1045
do_bump_counters1(retried_failed, Val, Id) ->
1046
    emqx_resource_metrics:retried_failed_inc(Id, Val);
×
1047
do_bump_counters1(success, Val, Id) ->
1048
    emqx_resource_metrics:success_inc(Id, Val);
842✔
1049
do_bump_counters1(retried_success, Val, Id) ->
1050
    emqx_resource_metrics:retried_success_inc(Id, Val);
3✔
1051
do_bump_counters1(dropped_resource_not_found, Val, Id) ->
1052
    emqx_resource_metrics:dropped_resource_not_found_inc(Id, Val);
2✔
1053
do_bump_counters1(dropped_resource_stopped, Val, Id) ->
1054
    emqx_resource_metrics:dropped_resource_stopped_inc(Id, Val).
×
1055

1056
-spec log_expired_message_count(data()) -> ok.
1057
log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counters}) ->
1058
    ExpiredCount = maps:get(dropped_expired, Counters, 0),
3,969✔
1059
    case ExpiredCount > 0 of
3,969✔
1060
        false ->
1061
            ok;
3,967✔
1062
        true ->
1063
            ?SLOG(info, #{
2✔
1064
                msg => "buffer_worker_dropped_expired_messages",
1065
                resource_id => Id,
1066
                worker_index => Index,
1067
                expired_count => ExpiredCount
1068
            }),
2✔
1069
            ok
2✔
1070
    end.
1071

1072
-spec set_gauges(data()) -> ok.
1073
set_gauges(_Data = #{id := Id, index := Index, queue := Q, inflight_tid := InflightTID}) ->
1074
    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
3,969✔
1075
    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
3,969✔
1076
    ok.
3,969✔
1077

1078
handle_async_worker_down(Data0, Pid) ->
1079
    #{async_workers := AsyncWorkers0} = Data0,
5✔
1080
    {AsyncWorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
5✔
1081
    Data = Data0#{async_workers := AsyncWorkers},
5✔
1082
    mark_inflight_items_as_retriable(Data, AsyncWorkerMRef),
5✔
1083
    {next_state, blocked, Data}.
5✔
1084

1085
-spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _.
1086
call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
1087
    ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM}),
4,115✔
1088
    case emqx_resource_manager:lookup_cached(extract_connector_id(Id)) of
4,115✔
1089
        %% This seems to be the only place where the `rm_status_stopped' status matters,
1090
        %% to distinguish from the `disconnected' status.
1091
        {ok, _Group, #{status := ?rm_status_stopped}} ->
1092
            ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
×
1093
        {ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} ->
1094
            {error, {unrecoverable_error, unhealthy_target}};
×
1095
        {ok, _Group, Resource} ->
1096
            do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
4,113✔
1097
        {error, not_found} ->
1098
            ?RESOURCE_ERROR(not_found, "resource not found")
2✔
1099
    end.
1100

1101
%% action:kafka_producer:myproducer1:connector:kafka_producer:mykafkaclient1
1102
extract_connector_id(Id) when is_binary(Id) ->
1103
    case binary:split(Id, <<":">>, [global]) of
8,196✔
1104
        [
1105
            _ChannelGlobalType,
1106
            _ChannelSubType,
1107
            _ChannelName,
1108
            <<"connector">>,
1109
            ConnectorType,
1110
            ConnectorName
1111
        ] ->
1112
            <<"connector:", ConnectorType/binary, ":", ConnectorName/binary>>;
7,589✔
1113
        _ ->
1114
            Id
607✔
1115
    end;
1116
extract_connector_id(Id) ->
1117
    Id.
156✔
1118

1119
is_channel_id(Id) ->
1120
    extract_connector_id(Id) =/= Id.
191✔
1121

1122
%% Check if channel is installed in the connector state.
1123
%% There is no need to query the connector if the channel is not
1124
%% installed as the query will fail anyway.
1125
pre_query_channel_check({Id, _} = _Request, Channels, QueryOpts) when
1126
    is_map_key(Id, Channels)
1127
->
1128
    ChannelStatus = maps:get(Id, Channels),
3,782✔
1129
    case emqx_resource_manager:channel_status_is_channel_added(ChannelStatus) of
3,782✔
1130
        true ->
1131
            ok;
3,747✔
1132
        false ->
1133
            error_if_channel_is_not_installed(Id, QueryOpts)
35✔
1134
    end;
1135
pre_query_channel_check({Id, _} = _Request, _Channels, QueryOpts) ->
1136
    error_if_channel_is_not_installed(Id, QueryOpts);
156✔
1137
pre_query_channel_check(_Request, _Channels, _QueryOpts) ->
1138
    ok.
143✔
1139

1140
error_if_channel_is_not_installed(Id, QueryOpts) ->
1141
    %% Fail with a recoverable error if the channel is not installed and there are buffer
1142
    %% workers involved so that the operation can be retried.  Otherwise, this is
1143
    %% unrecoverable.  It is emqx_resource_manager's responsibility to ensure that the
1144
    %% channel installation is retried.
1145
    IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
191✔
1146
    case is_channel_id(Id) of
191✔
1147
        true when IsSimpleQuery ->
1148
            {error,
4✔
1149
                {unrecoverable_error,
1150
                    iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}};
1151
        true ->
1152
            {error,
31✔
1153
                {recoverable_error,
1154
                    iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}};
1155
        false ->
1156
            ok
156✔
1157
    end.
1158

1159
do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Resource) when
1160
    ReqQM =:= simple_sync_internal_buffer; ReqQM =:= simple_async_internal_buffer
1161
->
1162
    %% The query overrides the query mode of the resource, so we send even in the disconnected state
1163
    ?tp(simple_query_override, #{query_mode => ReqQM}),
199✔
1164
    #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
199✔
1165
    CallMode = call_mode(QM, CBM),
199✔
1166
    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
199✔
1167
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
1168
    ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
1169
->
1170
    %% The connector supports internal buffering, so we send even in the disconnected state
1171
    #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
11✔
1172
    CallMode = call_mode(QM, CBM),
11✔
1173
    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
11✔
1174
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) ->
1175
    %% when calling from the buffer worker or other simple queries,
1176
    %% only apply the query fun when the resource status is connected
1177
    #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
3,871✔
1178
    CallMode = call_mode(QM, CBM),
3,871✔
1179
    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
3,871✔
1180
do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
1181
    ?RESOURCE_ERROR(not_connected, "resource not connected").
32✔
1182

1183
-define(APPLY_RESOURCE(NAME, EXPR, REQ),
1184
    try
1185
        %% if the callback module (connector) wants to return an error that
1186
        %% makes the current resource go into the `blocked` state, it should
1187
        %% return `{error, {recoverable_error, Reason}}`
1188
        EXPR
1189
    catch
1190
        %% For convenience, and to make the code in the callbacks cleaner, an
1191
        %% error exception in either of the two following formats is translated to the
1192
        %% corresponding return value. The receiver of the return values
1193
        %% recognizes these special return formats and uses them to decide if a
1194
        %% request should be retried.
1195
        error:{unrecoverable_error, Msg} ->
1196
            {error, {unrecoverable_error, Msg}};
1197
        error:{recoverable_error, Msg} ->
1198
            {error, {recoverable_error, Msg}};
1199
        ERR:REASON:STACKTRACE ->
1200
            ?RESOURCE_ERROR(exception, #{
1201
                name => NAME,
1202
                id => Id,
1203
                request => REQ,
1204
                error => {ERR, REASON},
1205
                stacktrace => STACKTRACE
1206
            })
1207
    end
1208
).

apply_query_fun(
    sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, Channels, QueryOpts
) ->
    ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
    maybe_reply_to(
        ?APPLY_RESOURCE(
            call_query,
            begin
                case pre_query_channel_check(Request, Channels, QueryOpts) of
                    ok ->
                        Mod:on_query(extract_connector_id(Id), Request, ResSt);
                    Error ->
                        Error
                end
            end,
            Request
        ),
        QueryOpts
    );
apply_query_fun(
    async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, Channels, QueryOpts
) ->
    ?tp(call_query_async, #{
        id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
    }),
    InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
    ?APPLY_RESOURCE(
        call_query_async,
        begin
            ReplyFun = fun ?MODULE:handle_async_reply/2,
            ReplyContext = #{
                buffer_worker => self(),
                resource_id => Id,
                worker_index => Index,
                inflight_tid => InflightTID,
                request_ref => Ref,
                query_opts => QueryOpts,
                min_query => minimize(Query)
            },
            IsRetriable = false,
            AsyncWorkerMRef = undefined,
            InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
            ok = inflight_append(InflightTID, InflightItem),
            case pre_query_channel_check(Request, Channels, QueryOpts) of
                ok ->
                    Result = Mod:on_query_async(
                        extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt
                    ),
                    {async_return, Result};
                Error ->
                    maybe_reply_to(Error, QueryOpts)
            end
        end,
        Request
    );
apply_query_fun(
    sync,
    Mod,
    Id,
    _Index,
    _Ref,
    [?QUERY(_, FirstRequest, _, _) | _] = Batch,
    ResSt,
    Channels,
    QueryOpts
) ->
    ?tp(call_batch_query, #{
        id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
    }),
    Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
    maybe_reply_to(
        ?APPLY_RESOURCE(
            call_batch_query,
            begin
                case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
                    ok ->
                        Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt);
                    Error ->
                        Error
                end
            end,
            Batch
        ),
        QueryOpts
    );
apply_query_fun(
    async,
    Mod,
    Id,
    Index,
    Ref,
    [?QUERY(_, FirstRequest, _, _) | _] = Batch,
    ResSt,
    Channels,
    QueryOpts
) ->
    ?tp(call_batch_query_async, #{
        id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
    }),
    InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
    ?APPLY_RESOURCE(
        call_batch_query_async,
        begin
            ReplyFun = fun ?MODULE:handle_async_batch_reply/2,
            ReplyContext = #{
                buffer_worker => self(),
                resource_id => Id,
                worker_index => Index,
                inflight_tid => InflightTID,
                request_ref => Ref,
                query_opts => QueryOpts,
                min_batch => minimize(Batch)
            },
            Requests = lists:map(
                fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
            ),
            IsRetriable = false,
            AsyncWorkerMRef = undefined,
            InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
            ok = inflight_append(InflightTID, InflightItem),
            case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
                ok ->
                    Result = Mod:on_batch_query_async(
                        extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt
                    ),
                    {async_return, Result};
                Error ->
                    maybe_reply_to(Error, QueryOpts)
            end
        end,
        Batch
    ).

maybe_reply_to(Result, #{reply_to := ReplyTo}) ->
    do_reply_caller(ReplyTo, Result),
    Result;
maybe_reply_to(Result, _) ->
    Result.

handle_async_reply(
    #{
        request_ref := Ref,
        inflight_tid := InflightTID,
        query_opts := Opts
    } = ReplyContext,
    Result
) ->
    case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of
        discard ->
            ok;
        continue ->
            handle_async_reply1(ReplyContext, Result)
    end.

handle_async_reply1(
    #{
        request_ref := Ref,
        inflight_tid := InflightTID,
        resource_id := Id,
        buffer_worker := BufferWorkerPid,
        min_query := ?QUERY(ReplyTo, _, _, ExpireAt) = _Query
    } = ReplyContext,
    Result
) ->
    ?tp(
        handle_async_reply_enter,
        #{batch_or_query => [_Query], ref => Ref, result => Result}
    ),
    Now = now_(),
    case is_expired(ExpireAt, Now) of
        true ->
            IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
            %% evaluate the metrics call here since we're not inside the
            %% buffer worker
            IsAcked andalso
                begin
                    emqx_resource_metrics:late_reply_inc(Id),
                    reply_dropped(ReplyTo, {error, late_reply})
                end,
            ?tp(handle_async_reply_expired, #{expired => [_Query]}),
            ok;
        false ->
            do_handle_async_reply(ReplyContext, Result)
    end.

do_handle_async_reply(
    #{
        query_opts := QueryOpts,
        resource_id := Id,
        request_ref := Ref,
        buffer_worker := BufferWorkerPid,
        inflight_tid := InflightTID,
        min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
    },
    Result
) ->
    %% NOTE: 'inflight' is the count of messages that were sent async
    %% but received no ACK, NOT the number of messages queued in the
    %% inflight window.
    {Action, PostFn, DeltaCounters} = reply_caller_defer_metrics(
        Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts
    ),

    ?tp(handle_async_reply, #{
        action => Action,
        batch_or_query => [_Query],
        ref => Ref,
        result => Result
    }),
    case Action of
        nack ->
            %% Keep retrying.
            ok = mark_inflight_as_retriable(InflightTID, Ref),
            ok = ?MODULE:block(BufferWorkerPid),
            blocked;
        ack ->
            ok = do_async_ack(
                InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts
            )
    end.

handle_async_batch_reply(
    #{
        inflight_tid := InflightTID,
        request_ref := Ref,
        query_opts := Opts
    } = ReplyContext,
    Result
) ->
    case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of
        discard ->
            ok;
        continue ->
            handle_async_batch_reply1(ReplyContext, Result)
    end.

handle_async_batch_reply1(
    #{
        inflight_tid := InflightTID,
        request_ref := Ref,
        min_batch := Batch
    } = ReplyContext,
    Result
) ->
    ?tp(
        handle_async_reply_enter,
        #{batch_or_query => Batch, ref => Ref, result => Result}
    ),
    Now = now_(),
    case sieve_expired_requests(Batch, Now) of
        {_NotExpired, []} ->
            %% This is the critical code path; we avoid ets:lookup here
            %% because the batch can be quite big.
            do_handle_async_batch_reply(ReplyContext, Result);
        {_NotExpired, _Expired} ->
            %% At least one query expired. The batch from the reply context is
            %% minimized, so it cannot be used to update the inflight items;
            %% hence discard Batch and look up the RealBatch.
            ?tp(handle_async_reply_expired, #{expired => _Expired}),
            handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now)
    end.

handle_async_batch_reply2([], _, _, _) ->
    %% This should usually never happen, unless the async callback is being evaluated concurrently.
    ok;
handle_async_batch_reply2([Inflight], ReplyContext, Results0, Now) ->
    ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _AsyncWorkerMRef) = Inflight,
    #{
        resource_id := Id,
        buffer_worker := BufferWorkerPid,
        inflight_tid := InflightTID,
        request_ref := Ref,
        min_batch := Batch
    } = ReplyContext,
    %% All batch items share the same HasBeenSent flag
    %% So we just take the original flag from the ReplyContext batch
    %% and put it back to the batch found in inflight table
    %% which must have already been set to `false`
    [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch,
    {RealNotExpired0, RealExpired, Results} =
        sieve_expired_requests_with_results(RealBatch, Now, Results0),
    RealNotExpired =
        lists:map(
            fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) ->
                ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt)
            end,
            RealNotExpired0
        ),
    NumExpired = length(RealExpired),
    %% evaluate the metrics call here since we're not inside the buffer
    %% worker
    emqx_resource_metrics:late_reply_inc(Id, NumExpired),
    batch_reply_dropped(RealExpired, {error, late_reply}),
    case RealNotExpired of
        [] ->
            %% all expired, no need to update back the inflight batch
            _ = ack_inflight(InflightTID, Ref, BufferWorkerPid),
            ok;
        _ ->
            %% some queries are not expired, put them back to the inflight batch
            %% so they can be either acked now or retried later
            ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired),
            ?tp_ignore_side_effects_in_prod(
                handle_async_reply_partially_expired,
                #{
                    inflight_count => inflight_count(InflightTID),
                    num_inflight_messages => inflight_num_msgs(InflightTID)
                }
            ),
            do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Results)
    end.

do_handle_async_batch_reply(
    #{
        buffer_worker := BufferWorkerPid,
        resource_id := Id,
        inflight_tid := InflightTID,
        request_ref := Ref,
        min_batch := Batch,
        query_opts := QueryOpts
    },
    Result
) ->
    {Action, PostFn, DeltaCounters} = batch_reply_caller_defer_metrics(
        Id, Result, Batch, QueryOpts
    ),
    ?tp(handle_async_reply, #{
        action => Action,
        batch_or_query => Batch,
        ref => Ref,
        result => Result
    }),
    case Action of
        nack ->
            %% Keep retrying.
            ok = mark_inflight_as_retriable(InflightTID, Ref),
            ok = ?MODULE:block(BufferWorkerPid),
            blocked;
        ack ->
            ok = do_async_ack(
                InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts
            )
    end.

do_async_ack(InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, QueryOpts) ->
    IsKnownRef = ack_inflight(InflightTID, Ref, BufferWorkerPid),
    case maps:get(simple_query, QueryOpts, false) of
        true ->
            PostFn(),
            bump_counters(Id, DeltaCounters);
        false when IsKnownRef ->
            PostFn(),
            bump_counters(Id, DeltaCounters);
        false ->
            ok
    end,
    ok.

%% Check whether the async reply is valid, e.g. when a connector evaluates
%% the callback more than once:
%% 1. If the request was previously deleted from the inflight table because
%%    it either succeeded or expired, this function logs a warning message
%%    and returns the 'discard' instruction.
%% 2. If the request previously failed and is now pending a retry, this
%%    function returns 'continue', as there is no way to tell whether this
%%    reply is stale or not.
maybe_handle_unknown_async_reply(undefined, _Ref, #{simple_query := true}) ->
    continue;
maybe_handle_unknown_async_reply(InflightTID, Ref, #{}) ->
    try ets:member(InflightTID, Ref) of
        true ->
            continue;
        false ->
            ?tp(
                warning,
                unknown_async_reply_discarded,
                #{inflight_key => Ref}
            ),
            discard
    catch
        error:badarg ->
            %% shutdown ?
            discard
    end.
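
%% Illustrative scenario (editor's note): if a connector invokes the async
%% reply callback twice for the same Ref and the first invocation results in
%% an ack, the inflight entry is deleted, so the second invocation no longer
%% finds Ref in the table and is discarded with an
%% `unknown_async_reply_discarded' warning.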

%%==============================================================================
%% operations for queue
queue_item_marshaller(Bin) when is_binary(Bin) ->
    binary_to_term(Bin);
queue_item_marshaller(Item) ->
    term_to_binary(Item).

estimate_size(QItem) ->
    erlang:external_size(QItem).

-spec append_queue(id(), index(), replayq:q(), [queue_query()]) ->
    {[queue_query()], replayq:q(), counters()}.
append_queue(Id, Index, Q, Queries) ->
    %% this assertion ensures that we never append a raw binary, because the
    %% marshaller would not be able to tell it apart from an already-marshalled item.
    false = is_binary(hd(Queries)),
    Q0 = replayq:append(Q, Queries),
    {Overflown, Q2, DeltaCounters} =
        case replayq:overflow(Q0) of
            OverflownBytes when OverflownBytes =< 0 ->
                {[], Q0, #{}};
            OverflownBytes ->
                PopOpts = #{bytes_limit => OverflownBytes, count_limit => 999999999},
                {Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts),
                ok = replayq:ack(Q1, QAckRef),
                Dropped = length(Items2),
                Counters = #{dropped_queue_full => Dropped},
                ?SLOG(info, #{
                    msg => "buffer_worker_overflow",
                    resource_id => Id,
                    worker_index => Index,
                    dropped => Dropped
                }),
                {Items2, Q1, Counters}
        end,
    ?tp(
        buffer_worker_appended_to_queue,
        #{
            id => Id,
            items => Queries,
            queue_count => queue_count(Q2),
            overflown => length(Overflown)
        }
    ),
    {Overflown, Q2, DeltaCounters}.
1641

1642
%%==============================================================================
1643
%% the inflight queue for async query
1644
-define(MAX_SIZE_REF, max_size).
1645
-define(SIZE_REF, size).
1646
-define(BATCH_COUNT_REF, batch_count).
1647
-define(INITIAL_TIME_REF, initial_time).
1648
-define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time).
1649

1650
inflight_new(InfltWinSZ) ->
1651
    TableId = ets:new(
13,296✔
1652
        emqx_resource_buffer_worker_inflight_tab,
1653
        [ordered_set, public, {write_concurrency, true}]
1654
    ),
1655
    inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}),
13,296✔
1656
    %% we use this counter because we might deal with batches as
1657
    %% elements.
1658
    inflight_append(TableId, {?SIZE_REF, 0}),
13,296✔
1659
    inflight_append(TableId, {?BATCH_COUNT_REF, 0}),
13,296✔
1660
    inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}),
13,296✔
1661
    inflight_append(TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}),
13,296✔
1662
    TableId.
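
%% Editor's note on the resulting table layout, derived from the code above
%% (shown only as an illustration):
%%   {max_size, InfltWinSZ}        - inflight window size
%%   {size, M}                     - number of in-flight messages
%%   {batch_count, K}              - number of in-flight batches/requests
%%   {initial_time, T}             - system time at table creation
%%   {initial_monotonic_time, Ref} - monotonic timestamp used as first ref
%% plus one ?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef)
%% row per in-flight request.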

-spec inflight_get_first_retriable(ets:tid(), integer()) ->
    none
    | {expired, inflight_key(), [queue_query()]}
    | {single, inflight_key(), queue_query()}
    | {batch, inflight_key(), _NotExpired :: [queue_query()], _Expired :: [queue_query()]}.
inflight_get_first_retriable(InflightTID, Now) ->
    MatchSpec =
        ets:fun2ms(
            fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _AsyncWorkerMRef)) when
                IsRetriable =:= true
            ->
                {Ref, BatchOrQuery}
            end
        ),
    case ets:select(InflightTID, MatchSpec, _Limit = 1) of
        '$end_of_table' ->
            none;
        {[{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} ->
            case is_expired(ExpireAt, Now) of
                true ->
                    {expired, Ref, [Query]};
                false ->
                    {single, Ref, Query}
            end;
        {[{Ref, Batch = [_ | _]}], _Continuation} ->
            case sieve_expired_requests(Batch, Now) of
                {[], _AllExpired} ->
                    {expired, Ref, Batch};
                {NotExpired, Expired} ->
                    {batch, Ref, NotExpired, Expired}
            end
    end.

is_inflight_full(undefined) ->
    false;
is_inflight_full(InflightTID) ->
    [{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
    %% we consider number of batches rather than number of messages
    %% because one batch request may hold several messages.
    Size = inflight_count(InflightTID),
    Size >= MaxSize.

inflight_count(InflightTID) ->
    emqx_utils_ets:lookup_value(InflightTID, ?BATCH_COUNT_REF, 0).

inflight_num_msgs(InflightTID) ->
    [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF),
    Size.

inflight_append(undefined, _InflightItem) ->
    ok;
inflight_append(
    InflightTID,
    ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef)
) ->
    Batch = mark_as_sent(Batch0),
    InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
    IsNew = ets:insert_new(InflightTID, InflightItem),
    BatchSize = length(Batch),
    IsNew andalso inc_inflight(InflightTID, BatchSize),
    ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
    ok;
inflight_append(
    InflightTID,
    ?INFLIGHT_ITEM(
        Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, AsyncWorkerMRef
    )
) ->
    Query = mark_as_sent(Query0),
    InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
    IsNew = ets:insert_new(InflightTID, InflightItem),
    IsNew andalso inc_inflight(InflightTID, 1),
    ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
    ok;
inflight_append(InflightTID, {Ref, Data}) ->
    ets:insert(InflightTID, {Ref, Data}),
    %% this is a metadata row being inserted; therefore, we don't bump
    %% the inflight metric.
    ok.

%% a request was already appended and originally not retriable, but an
%% error occurred and it is now retriable.
mark_inflight_as_retriable(undefined, _Ref) ->
    ok;
mark_inflight_as_retriable(InflightTID, Ref) ->
    _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}),
    %% the old worker's DOWN should not affect this inflight any more
    _ = ets:update_element(InflightTID, Ref, {?WORKER_MREF_IDX, erased}),
    ok.

%% Track each worker pid only once.
ensure_async_worker_monitored(
    Data0 = #{async_workers := AsyncWorkers}, {async_return, {ok, AsyncWorkerPid}} = _Result
) when
    is_pid(AsyncWorkerPid), is_map_key(AsyncWorkerPid, AsyncWorkers)
->
    AsyncWorkerMRef = maps:get(AsyncWorkerPid, AsyncWorkers),
    {Data0, AsyncWorkerMRef};
ensure_async_worker_monitored(
    Data0 = #{async_workers := AsyncWorkers0}, {async_return, {ok, AsyncWorkerPid}}
) when
    is_pid(AsyncWorkerPid)
->
    AsyncWorkerMRef = monitor(process, AsyncWorkerPid),
    AsyncWorkers = AsyncWorkers0#{AsyncWorkerPid => AsyncWorkerMRef},
    Data = Data0#{async_workers := AsyncWorkers},
    {Data, AsyncWorkerMRef};
ensure_async_worker_monitored(Data0, _Result) ->
    {Data0, undefined}.

-spec store_async_worker_reference(undefined | ets:tid(), inflight_key(), undefined | reference()) ->
    ok.
store_async_worker_reference(undefined = _InflightTID, _Ref, _AsyncWorkerMRef) ->
    ok;
store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) ->
    ok;
store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef) when
    is_reference(AsyncWorkerMRef)
->
    _ = ets:update_element(
        InflightTID, Ref, {?WORKER_MREF_IDX, AsyncWorkerMRef}
    ),
    ok.

ack_inflight(undefined, _Ref, _BufferWorkerPid) ->
    false;
ack_inflight(InflightTID, Ref, BufferWorkerPid) ->
    {Count, Removed} =
        case ets:take(InflightTID, Ref) of
            [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _AsyncWorkerMRef)] ->
                {1, true};
            [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef)] ->
                {length(Batch), true};
            [] ->
                {0, false}
        end,
    FlushCheck = dec_inflight_remove(InflightTID, Count, Removed),
    case FlushCheck of
        no_flush -> ok;
        flush -> ?MODULE:flush_worker(BufferWorkerPid)
    end,
    IsKnownRef = (Count > 0),
    IsKnownRef.

mark_inflight_items_as_retriable(Data, AsyncWorkerMRef) ->
    #{inflight_tid := InflightTID} = Data,
    IsRetriable = true,
    MatchSpec =
        ets:fun2ms(
            fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, AsyncWorkerMRef0)) when
                AsyncWorkerMRef =:= AsyncWorkerMRef0
            ->
                ?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef0)
            end
        ),
    _NumAffected = ets:select_replace(InflightTID, MatchSpec),
    ?tp(buffer_worker_async_agent_down, #{num_affected => _NumAffected, buffer_worker => self()}),
    ok.

%% used to update a batch after dropping expired individual queries.
update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
    _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
    ok = dec_inflight_update(InflightTID, NumExpired).

inc_inflight(InflightTID, Count) ->
    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
    _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
    ok.

-spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) ->
    no_flush | flush.
dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
    no_flush;
dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
    NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
    MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
    %% if the new value is Max - 1, it means that we've just made room
    %% in the inflight table, so we should poke the buffer worker to
    %% make it continue flushing.
    case NewValue =:= MaxValue - 1 of
        true -> flush;
        false -> no_flush
    end;
dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
    %% If Count > 0, it must have been removed
    NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
    MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
    %% if the new value is Max - 1, it means that we've just made room
    %% in the inflight table, so we should poke the buffer worker to
    %% make it continue flushing.
    case NewValue =:= MaxValue - 1 of
        true -> flush;
        false -> no_flush
    end.
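
%% Worked example (editor's note) for the flush condition above: with an
%% inflight window of `max_size' = 10, acking one entry while the table holds
%% 10 batches drops `batch_count' from 10 to 9 (= Max - 1), so `flush' is
%% returned and the buffer worker is poked to continue flushing; any other
%% transition returns `no_flush'.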

dec_inflight_update(_InflightTID, _Count = 0) ->
    ok;
dec_inflight_update(InflightTID, Count) when Count > 0 ->
    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
    ok.

%%==============================================================================

call_mode(force_sync, _) -> sync;
call_mode(async_if_possible, always_sync) -> sync;
call_mode(async_if_possible, async_if_possible) -> async.

assert_ok_result(ok) ->
    true;
assert_ok_result({async_return, R}) ->
    assert_ok_result(R);
assert_ok_result(R) when is_tuple(R) ->
    try
        ok = erlang:element(1, R)
    catch
        error:{badmatch, _} ->
            error({not_ok_result, R})
    end;
assert_ok_result(R) ->
    error({not_ok_result, R}).

queue_count(Q) ->
    replayq:count(Q).

disk_queue_dir(Id, Index) ->
    QDir0 = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
    QDir = filename:join([emqx:data_dir(), "bufs", node(), QDir0]),
    emqx_utils:safe_filename(QDir).

clear_disk_queue_dir(Id, Index) ->
    ReplayQDir = disk_queue_dir(Id, Index),
    case file:del_dir_r(ReplayQDir) of
        {error, enoent} ->
            ok;
        Res ->
            Res
    end.

ensure_flush_timer(Data = #{batch_time := T}) ->
    ensure_flush_timer(Data, T).

ensure_flush_timer(Data = #{tref := undefined}, 0) ->
    %% if the batch_time is 0, we don't need to start a timer, which
    %% can be costly at high rates.
    Ref = make_ref(),
    self() ! {flush, Ref},
    Data#{tref => {Ref, Ref}};
ensure_flush_timer(Data = #{tref := undefined}, T) ->
    Ref = make_ref(),
    TRef = erlang:send_after(T, self(), {flush, Ref}),
    Data#{tref => {TRef, Ref}};
ensure_flush_timer(Data, _T) ->
    Data.

cancel_flush_timer(St = #{tref := undefined}) ->
    St;
cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
    _ = erlang:cancel_timer(TRef),
    St#{tref => undefined}.

-spec make_request_ref() -> inflight_key().
make_request_ref() ->
    now_().

collect_requests(Acc, Limit) ->
    Count = length(Acc),
    do_collect_requests(Acc, Count, Limit).

do_collect_requests(Acc, Count, Limit) when Count >= Limit ->
    lists:reverse(Acc);
do_collect_requests(Acc, Count, Limit) ->
    receive
        ?SEND_REQ(_ReplyTo, _Req) = Request ->
            do_collect_requests([Request | Acc], Count + 1, Limit)
    after 0 ->
        lists:reverse(Acc)
    end.

mark_as_sent(Batch) when is_list(Batch) ->
    lists:map(fun mark_as_sent/1, Batch);
mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt)) ->
    HasBeenSent = true,
    ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt).

is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
    true;
is_unrecoverable_error({error, {recoverable_error, _}}) ->
    false;
is_unrecoverable_error({async_return, Result}) ->
    is_unrecoverable_error(Result);
is_unrecoverable_error({error, _}) ->
    %% TODO: delete this clause.
    %% Ideally all errors except for 'unrecoverable_error' should be
    %% retried, including DB schema errors.
    true;
is_unrecoverable_error(_) ->
    false.

is_async_return({async_return, _}) ->
    true;
is_async_return(_) ->
    false.

sieve_expired_requests(Batch, Now) ->
    lists:partition(
        fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) ->
            not is_expired(ExpireAt, Now)
        end,
        Batch
    ).

sieve_expired_requests_with_results(Batch, Now, Results) when is_list(Results) ->
    %% individual results; we need to drop those that match expired queries
    {RevNotExpiredBatch, RevNotExpiredResults, ExpiredBatch} =
        lists:foldl(
            fun(
                {?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt) = Query, Result},
                {NotExpAcc, ResAcc, ExpAcc}
            ) ->
                case not is_expired(ExpireAt, Now) of
                    true ->
                        {[Query | NotExpAcc], [Result | ResAcc], ExpAcc};
                    false ->
                        {NotExpAcc, ResAcc, [Query | ExpAcc]}
                end
            end,
            {[], [], []},
            lists:zip(Batch, Results)
        ),
    {lists:reverse(RevNotExpiredBatch), lists:reverse(RevNotExpiredResults), ExpiredBatch};
sieve_expired_requests_with_results(Batch, Now, Result) ->
    %% one result for the whole batch, we just pass it along and
    %% `batch_reply_caller_defer_metrics' will expand it
    {NotExpiredBatch, ExpiredBatch} = sieve_expired_requests(Batch, Now),
    {NotExpiredBatch, ExpiredBatch, Result}.

-spec is_expired(infinity | integer(), integer()) -> boolean().
is_expired(infinity = _ExpireAt, _Now) ->
    false;
is_expired(ExpireAt, Now) ->
    Now > ExpireAt.

now_() ->
    erlang:monotonic_time(nanosecond).

-spec ensure_timeout_query_opts(query_opts(), sync | async) -> query_opts().
ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) ->
    Opts;
ensure_timeout_query_opts(#{} = Opts0, sync) ->
    Opts0#{timeout => ?DEFAULT_REQUEST_TTL};
ensure_timeout_query_opts(#{} = Opts0, async) ->
    Opts0#{timeout => infinity}.

-spec ensure_expire_at(query_opts()) -> query_opts().
ensure_expire_at(#{expire_at := _} = Opts) ->
    Opts;
ensure_expire_at(#{timeout := infinity} = Opts) ->
    Opts#{expire_at => infinity};
ensure_expire_at(#{timeout := TimeoutMS} = Opts) ->
    TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond),
    ExpireAt = now_() + TimeoutNS,
    Opts#{expire_at => ExpireAt}.

%% no need to keep the request for the async reply handler
minimize(?QUERY(_, _, _, _) = Q) ->
    do_minimize(Q);
minimize(L) when is_list(L) ->
    lists:map(fun do_minimize/1, L).

-ifdef(TEST).
do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
-else.
do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
-endif.

%% To avoid message loss due to misconfigurations, we adjust
%% `batch_time' based on `request_ttl'.  If `batch_time' >
%% `request_ttl', all requests will time out before being sent when
%% the message rate is low, and even more so when `pool_size' is high.
%% We cap `batch_time' at `request_ttl div 2' as a rule of thumb.
adjust_batch_time(_Id, _RequestTTL = infinity, BatchTime0) ->
    BatchTime0;
adjust_batch_time(Id, RequestTTL, BatchTime0) ->
    BatchTime = max(0, min(BatchTime0, RequestTTL div 2)),
    case BatchTime =:= BatchTime0 of
        false ->
            ?SLOG(info, #{
                id => Id,
                msg => "adjusting_buffer_worker_batch_time",
                new_batch_time => BatchTime
            });
        true ->
            ok
    end,
    BatchTime.

replayq_opts(Id, Index, Opts) ->
    BufferMode = maps:get(buffer_mode, Opts, memory_only),
    TotalBytes = maps:get(max_buffer_bytes, Opts, ?DEFAULT_BUFFER_BYTES),
    case BufferMode of
        memory_only ->
            #{
                mem_only => true,
                marshaller => fun ?MODULE:queue_item_marshaller/1,
                max_total_bytes => TotalBytes,
                sizer => fun ?MODULE:estimate_size/1
            };
        volatile_offload ->
            SegBytes0 = maps:get(buffer_seg_bytes, Opts, TotalBytes),
            SegBytes = min(SegBytes0, TotalBytes),
            #{
                dir => disk_queue_dir(Id, Index),
                marshaller => fun ?MODULE:queue_item_marshaller/1,
                max_total_bytes => TotalBytes,
                %% we don't want to retain the queue after
                %% resource restarts.
                offload => {true, volatile},
                seg_bytes => SegBytes,
                sizer => fun ?MODULE:estimate_size/1
            }
    end.

%% The request timeout should be greater than the resume interval, as
%% it defines how often the buffer worker tries to unblock. If the request
%% timeout is <= the resume interval and the buffer worker is ever
%% blocked, then all queued requests will basically fail without being
%% attempted.
-spec default_resume_interval(request_ttl(), health_check_interval()) -> timeout_ms().
default_resume_interval(_RequestTTL = infinity, HealthCheckInterval) ->
    max(1, HealthCheckInterval);
default_resume_interval(RequestTTL, HealthCheckInterval) ->
    max(1, min(HealthCheckInterval, RequestTTL div 3)).

-spec reply_call(reference(), term()) -> ok.
reply_call(Alias, Response) ->
    %% Since we use a reference created with `{alias,
    %% reply_demonitor}', after we `demonitor' it in case of a
    %% timeout, we won't send any more messages that the caller is no
    %% longer expecting.  Using `gen_statem:reply({pid(),
    %% reference()}, _)' would still send a late reply even after the
    %% demonitor.
    erlang:send(Alias, {Alias, Response}),
    ok.
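
%% Editor's sketch of the receiving side (the alias is created elsewhere,
%% presumably via `monitor(process, Pid, [{alias, reply_demonitor}])'): the
%% caller typically waits with something like
%%     receive
%%         {Alias, Response} -> Response
%%     after Timeout ->
%%         demonitor(Alias, [flush]),
%%         timeout
%%     end
%% so that, once the alias is demonitored, the `erlang:send/2' above is
%% silently dropped instead of arriving as a stray late message.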

%% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to'
%% callbacks.
reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) ->
    ?MODULE:reply_call(ReplyAlias, Response),
    do_reply_caller(MaybeReplyTo, Response).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
adjust_batch_time_test_() ->
    %% just for logging
    Id = some_id,
    [
        {"batch time smaller than request_time/2",
            ?_assertEqual(
                100,
                adjust_batch_time(Id, 500, 100)
            )},
        {"batch time equal to request_time/2",
            ?_assertEqual(
                100,
                adjust_batch_time(Id, 200, 100)
            )},
        {"batch time greater than request_time/2",
            ?_assertEqual(
                50,
                adjust_batch_time(Id, 100, 100)
            )},
        {"batch time smaller than request_time/2 (request_time = infinity)",
            ?_assertEqual(
                100,
                adjust_batch_time(Id, infinity, 100)
            )}
    ].
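
%% Editor's addition (illustrative sanity check, not part of the original
%% suite): exercises the arithmetic in `default_resume_interval/2', which caps
%% the resume interval at `request_ttl div 3' and never lets it drop below 1 ms.
default_resume_interval_test_() ->
    [
        {"infinite request TTL falls back to the health check interval",
            ?_assertEqual(30000, default_resume_interval(infinity, 30000))},
        {"finite request TTL is capped at request_ttl div 3",
            ?_assertEqual(5000, default_resume_interval(15000, 30000))},
        {"resume interval never drops below 1 ms",
            ?_assertEqual(1, default_resume_interval(1, 30000))}
    ].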
-endif.