• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8684992748

15 Apr 2024 07:16AM UTC coverage: 67.831% (+5.4%) from 62.388%
8684992748

push

github

web-flow
Merge pull request #12877 from id/0415-sync-release-56

sync release 56

29 of 40 new or added lines in 7 files covered. (72.5%)

129 existing lines in 17 files now uncovered.

37939 of 55932 relevant lines covered (67.83%)

7734.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.07
/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% A hierarchical token bucket algorithm
18
%% Note: this is not the linux HTB algorithm(http://luxik.cdi.cz/~devik/qos/htb/manual/theory.htm)
19
%% Algorithm:
20
%% 1. the root node periodically generates tokens and then distributes them
21
%% just like the oscillation of water waves
22
%% 2. the leaf node has a counter, which is the place where the token is actually held.
23
%% 3. other nodes only play the role of transmission, and the rate of the node is like a valve,
24
%% limiting the oscillation transmitted from the parent node
25

26
-module(emqx_limiter_server).
27

28
-behaviour(gen_server).
29

30
-include_lib("emqx/include/logger.hrl").
31

32
%% gen_server callbacks
33
-export([
34
    init/1,
35
    handle_call/3,
36
    handle_cast/2,
37
    handle_info/2,
38
    terminate/2,
39
    code_change/3,
40
    format_status/2
41
]).
42

43
-export([
44
    start_link/2,
45
    connect/3,
46
    add_bucket/3,
47
    del_bucket/2,
48
    get_initial_val/1,
49
    whereis/1,
50
    info/1,
51
    name/1,
52
    restart/1,
53
    update_config/2
54
]).
55

56
%% number of tokens generated per period
57
-type root() :: #{
58
    rate := rate(),
59
    burst := rate(),
60
    %% token generation interval(second)
61
    period := pos_integer(),
62
    produced := float(),
63
    correction := emqx_limiter_decimal:zero_or_float()
64
}.
65

66
-type bucket() :: #{
67
    name := bucket_name(),
68
    rate := rate(),
69
    obtained := float(),
70
    %% token correction value
71
    correction := emqx_limiter_decimal:zero_or_float(),
72
    capacity := capacity(),
73
    counter := undefined | counters:counters_ref(),
74
    index := undefined | index()
75
}.
76

77
-type state() :: #{
78
    type := limiter_type(),
79
    root := root(),
80
    buckets := buckets(),
81
    %% current counter to alloc
82
    counter := counters:counters_ref(),
83
    index := 0 | index()
84
}.
85

86
-type buckets() :: #{bucket_name() => bucket()}.
87
-type limiter_type() :: emqx_limiter_schema:limiter_type().
88
-type bucket_name() :: emqx_limiter_schema:bucket_name().
89
-type limiter_id() :: emqx_limiter_schema:limiter_id().
90
-type rate() :: decimal().
91
-type flow() :: decimal().
92
-type capacity() :: decimal().
93
-type decimal() :: emqx_limiter_decimal:decimal().
94
-type index() :: pos_integer().
95

96
-define(CALL(Type, Msg), call(Type, Msg)).
97
-define(CALL(Type), ?CALL(Type, ?FUNCTION_NAME)).
98

99
%% minimum coefficient for overloaded limiter
100
-define(OVERLOAD_MIN_ALLOC, 0.3).
101
-define(COUNTER_SIZE, 8).
102
-define(ROOT_COUNTER_IDX, 1).
103

104
-export_type([index/0]).
105
-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
106

107
-elvis([{elvis_style, no_if_expression, disable}]).
108

109
%%--------------------------------------------------------------------
110
%% API
111
%%--------------------------------------------------------------------
112
%% Create a limiter handle for `Id' of the given limiter `Type'.
%% The third argument selects where the settings come from:
%%   * `undefined'        - no limiter setting (the default situation);
%%   * a map with `rate'  - a bucket configuration, whose optional `client'
%%                          entry holds the client-level settings;
%%   * any other map      - a configuration keyed by limiter type, with the
%%                          client settings under `[client, Type]'.
-spec connect(
    limiter_id(),
    limiter_type(),
    hocon:config() | undefined
) ->
    {ok, emqx_htb_limiter:limiter()} | {error, _}.
connect(Id, Type, undefined) ->
    create_limiter(Id, Type, undefined, undefined);
connect(Id, Type, #{rate := _} = BucketCfg) ->
    ClientCfg = maps:get(client, BucketCfg, undefined),
    create_limiter(Id, Type, ClientCfg, BucketCfg);
connect(Id, Type, Cfg) ->
    ClientCfg = emqx_utils_maps:deep_get([client, Type], Cfg, undefined),
    BucketCfg = maps:get(Type, Cfg, undefined),
    create_limiter(Id, Type, ClientCfg, BucketCfg).
130

131
%% Ask the limiter server of `Type' to add (or update) the bucket `Id'.
%% Nothing is sent to the server when there is no configuration, or when
%% the configured rate is `infinity': such a bucket is always full, so it
%% does not need to be refilled by this server.
%% NOTE(review): call/2 yields `{error, limiter_not_started}' when the
%% server is not registered, which the spec below does not mention.
-spec add_bucket(limiter_id(), limiter_type(), hocon:config() | undefined) -> ok.
add_bucket(Id, Type, Cfg) ->
    case Cfg of
        undefined ->
            ok;
        #{rate := infinity} ->
            ok;
        _ ->
            call(Type, {add_bucket, Id, Cfg})
    end.
14✔
139

140
%% Ask the limiter server of `Type' to remove the bucket `Id'.
%% The request is a synchronous `{del_bucket, Id}' call.
-spec del_bucket(limiter_id(), limiter_type()) -> ok.
del_bucket(Id, Type) ->
    Request = {del_bucket, Id},
    call(Type, Request).
213✔
143

144
%% Fetch the current state of the limiter server of `Type'.
%% Sends the `info' request, which the server answers with its state map.
-spec info(limiter_type()) -> state() | {error, _}.
info(Type) ->
    call(Type, info).
1✔
147

148
%% Registered name of the limiter server for `Type': the module name and
%% the type joined by an underscore, e.g. `emqx_limiter_server_bytes'.
%%
%% io_lib:format/2 is specified to return chars(), which may be a deep list,
%% while list_to_atom/1 is specified to take a flat string(); the result is
%% therefore flattened before the conversion.
-spec name(limiter_type()) -> atom().
name(Type) ->
    Chars = io_lib:format("~s_~s", [?MODULE, Type]),
    erlang:list_to_atom(lists:flatten(Chars)).
6,180✔
151

152
%% Ask the limiter server of `Type' to rebuild its state.
%% Sends the `restart' request to the server.
-spec restart(limiter_type()) -> ok | {error, _}.
restart(Type) ->
    call(Type, restart).
6✔
155

156
%% Push a new configuration to the limiter server of `Type'.
%% The type is repeated inside the request so the server can match it
%% against the type stored in its own state.
-spec update_config(limiter_type(), hocon:config()) -> ok | {error, _}.
update_config(Type, Config) ->
    Request = {update_config, Type, Config},
    call(Type, Request).
×
159

160
%% Look up the pid registered for the limiter server of `Type'.
%% Returns `undefined' when no process is registered under that name.
-spec whereis(limiter_type()) -> pid() | undefined.
whereis(Type) ->
    RegName = name(Type),
    erlang:whereis(RegName).
238✔
163

164
%%--------------------------------------------------------------------
165
%% @doc
166
%% Starts the server
167
%% @end
168
%%--------------------------------------------------------------------
169
%% The server is registered locally under name(Type) and receives
%% `[Type, Cfg]' as its init/1 argument.
-spec start_link(limiter_type(), hocon:config()) -> _.
start_link(Type, Cfg) ->
    ServerName = {local, name(Type)},
    InitArgs = [Type, Cfg],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
2,970✔
172

173
%%--------------------------------------------------------------------
174
%%% gen_server callbacks
175
%%--------------------------------------------------------------------
176

177
%%--------------------------------------------------------------------
178
%% @private
179
%% @doc
180
%% Initializes the server
181
%% @end
182
%%--------------------------------------------------------------------
183
%% Builds the initial state with init_tree/2 and schedules the first
%% `oscillate' message after the root's period.
-spec init(Args :: term()) ->
    {ok, State :: term()}
    | {ok, State :: term(), Timeout :: timeout()}
    | {ok, State :: term(), hibernate}
    | {stop, Reason :: term()}
    | ignore.
init([Type, Cfg]) ->
    #{root := #{period := Period}} = State = init_tree(Type, Cfg),
    _TimerRef = oscillate(Period),
    {ok, State}.
2,970✔
194

195
%%--------------------------------------------------------------------
196
%% @private
197
%% @doc
198
%% Handling call messages
199
%% @end
200
%%--------------------------------------------------------------------
201
%% Requests handled:
%%   info                          - reply with the whole state;
%%   restart                       - rebuild the state with init_tree/1;
%%   {update_config, Type, Config} - rebuild the state with init_tree/2
%%                                   (only when Type equals the state's type);
%%   {add_bucket, Id, Cfg}         - add or update a bucket;
%%   {del_bucket, Id}              - remove a bucket.
%% Anything else is logged and answered with `ignored'.
%% Note that the state produced by init_tree/2 starts with an empty
%% `buckets' map.
-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) ->
    {reply, Reply :: term(), NewState :: term()}
    | {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()}
    | {reply, Reply :: term(), NewState :: term(), hibernate}
    | {noreply, NewState :: term()}
    | {noreply, NewState :: term(), Timeout :: timeout()}
    | {noreply, NewState :: term(), hibernate}
    | {stop, Reason :: term(), Reply :: term(), NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_call(info, _From, State) ->
    {reply, State, State};
handle_call(restart, _From, #{type := Type}) ->
    {reply, ok, init_tree(Type)};
handle_call({update_config, Type, Config}, _From, #{type := Type}) ->
    {reply, ok, init_tree(Type, Config)};
handle_call({add_bucket, Id, Cfg}, _From, State) ->
    {reply, ok, do_add_bucket(Id, Cfg, State)};
handle_call({del_bucket, Id}, _From, State) ->
    {reply, ok, do_del_bucket(Id, State)};
handle_call(Unexpected, _From, State) ->
    ?SLOG(error, #{msg => "unexpected_call", call => Unexpected}),
    {reply, ignored, State}.
1✔
227

228
%%--------------------------------------------------------------------
229
%% @private
230
%% @doc
231
%% Handling cast messages
232
%% @end
233
%%--------------------------------------------------------------------
234
%% No cast is part of this server's protocol: every cast is logged as
%% unexpected and the state is left untouched.
-spec handle_cast(Request :: term(), State :: term()) ->
    {noreply, NewState :: term()}
    | {noreply, NewState :: term(), Timeout :: timeout()}
    | {noreply, NewState :: term(), hibernate}
    | {stop, Reason :: term(), NewState :: term()}.
handle_cast(Unexpected, State) ->
    ?SLOG(error, #{msg => "unexpected_cast", cast => Unexpected}),
    {noreply, State}.
1✔
242

243
%%--------------------------------------------------------------------
244
%% @private
245
%% @doc
246
%% Handling all non call/cast messages
247
%% @end
248
%%--------------------------------------------------------------------
249
%% The `oscillate' timer message triggers one round of token generation
%% and distribution; any other message is logged as unexpected.
-spec handle_info(Info :: timeout() | term(), State :: term()) ->
    {noreply, NewState :: term()}
    | {noreply, NewState :: term(), Timeout :: timeout()}
    | {noreply, NewState :: term(), hibernate}
    | {stop, Reason :: normal | term(), NewState :: term()}.
handle_info(oscillate, State) ->
    NewState = oscillation(State),
    {noreply, NewState};
handle_info(Unexpected, State) ->
    ?SLOG(error, #{msg => "unexpected_info", info => Unexpected}),
    {noreply, State}.
1✔
259

260
%%--------------------------------------------------------------------
261
%% @private
262
%% @doc
263
%% This function is called by a gen_server when it is about to
264
%% terminate. It should be the opposite of Module:init/1 and do any
265
%% necessary cleaning up. When it returns, the gen_server terminates
266
%% with Reason. The return value is ignored.
267
%% @end
268
%%--------------------------------------------------------------------
269
%% Removes the root entry that init_tree/2 registered for this limiter
%% type in emqx_limiter_manager.
-spec terminate(
    Reason :: normal | shutdown | {shutdown, term()} | term(),
    State :: term()
) -> any().
terminate(_Reason, #{type := LimiterType}) ->
    _ = emqx_limiter_manager:delete_root(LimiterType),
    ok.
×
276

277
%%--------------------------------------------------------------------
278
%% @private
279
%% @doc
280
%% Convert process state when code is changed
281
%% @end
282
%%--------------------------------------------------------------------
283
%% The state needs no conversion between versions: it is returned as is.
-spec code_change(
    OldVsn :: term() | {down, term()},
    State :: term(),
    Extra :: term()
) ->
    {ok, NewState :: term()}
    | {error, Reason :: term()}.
code_change(_OldVsn, Unchanged, _Extra) ->
    {ok, Unchanged}.
×
292

293
%%--------------------------------------------------------------------
294
%% @private
295
%% @doc
296
%% This function is called for changing the form and appearance
297
%% of gen_server status when it is returned from sys:get_status/1,2
298
%% or when it appears in termination error logs.
299
%% @end
300
%%--------------------------------------------------------------------
301
%% No reformatting or redaction is applied: the status is returned as given.
-spec format_status(
    Opt :: normal | terminate,
    Status :: list()
) -> Status :: term().
format_status(_Opt, Unchanged) ->
    Unchanged.
1✔
307

308
%%--------------------------------------------------------------------
309
%%% Internal functions
310
%%--------------------------------------------------------------------
311
%% Schedule the next `oscillate' message to this process after `Interval'
%% milliseconds; returns the timer reference from erlang:send_after/3.
oscillate(Interval) ->
    Self = self(),
    erlang:send_after(Interval, Self, oscillate).
316,042✔
313

314
%% @doc generate tokens, and then spread to leaf nodes
%% One round consists of:
%%   1. re-arming the timer for the next round;
%%   2. handing the root rate out to the buckets, least-served first;
%%   3. adding what was handed out to the root's `produced' total;
%%   4. topping up the root counter with the part that was not handed out;
%%   5. dispatching the burst, if one is configured.
-spec oscillation(state()) -> state().
oscillation(
    #{
        root := #{
            rate := Flow,
            period := Interval,
            produced := Produced
        } = Root,
        buckets := Buckets
    } = State
) ->
    _ = oscillate(Interval),
    Queue = get_ordered_buckets(Buckets),
    {Alloced, Buckets2} = transverse(Queue, Flow, 0.0, Buckets),
    Root2 = Root#{produced := Produced + Alloced},
    Adjusted = maybe_adjust_root_tokens(State#{buckets := Buckets2, root := Root2}, Alloced),
    maybe_burst(Adjusted).
313,072✔
337

338
%% @doc horizontal spread
%% Walk the ordered bucket list while there is still flow left. Each
%% bucket takes what longitudinal/3 grants it; that amount is removed
%% from the remaining flow and added to the running total.
%% Returns the total handed out together with the updated bucket map.
-spec transverse(
    list(bucket()),
    flow(),
    float(),
    buckets()
) -> {float(), buckets()}.
transverse([Bucket | Rest], InFlow, Alloced, Buckets) when InFlow > 0 ->
    {Granted, Buckets2} = longitudinal(Bucket, InFlow, Buckets),
    transverse(Rest, sub(InFlow, Granted), Alloced + Granted, Buckets2);
transverse(_, _, Alloced, Buckets) ->
    %% either no bucket is left or the flow is used up
    {Alloced, Buckets}.
313,072✔
352

353
%% @doc vertical spread
%% Refill one bucket. The amount granted is the smallest of
%%   * the bucket's shortfall (see longitudinal_shortfall/2),
%%   * the incoming flow capped by the bucket's rate, and
%%   * the bucket's capacity.
%% A positive amount is added to the bucket's counter slot and to its
%% `obtained' total. Buckets without a counter are skipped.
%% Returns the granted amount and the updated bucket map.
-spec longitudinal(bucket(), flow(), buckets()) ->
    {float(), buckets()}.
longitudinal(
    #{
        name := Name,
        rate := Rate,
        capacity := Capacity,
        counter := Counter,
        index := Index,
        obtained := Obtained
    } = Bucket,
    InFlow,
    Buckets
) when Counter =/= undefined ->
    Flow = erlang:min(InFlow, Rate),
    Shortfall = longitudinal_shortfall(Capacity, counters:get(Counter, Index)),
    case lists:min([Shortfall, Flow, Capacity]) of
        Available when Available > 0 ->
            %% only the amount returned by precisely_add/2 is added to the counter
            {Inc, Bucket2} = emqx_limiter_decimal:precisely_add(Available, Bucket),
            counters:add(Counter, Index, Inc),
            Bucket3 = Bucket2#{obtained := Obtained + Available},
            {Available, Buckets#{Name := Bucket3}};
        _ ->
            {0, Buckets}
    end;
longitudinal(_, _, Buckets) ->
    {0, Buckets}.

%% How many tokens the bucket is missing, given the current counter value.
longitudinal_shortfall(Capacity, Tokens) when Tokens < 0 ->
    %% The counter may be negative (tokens stolen from the future), and
    %% Capacity + Tokens may then be negative as well, so the result is
    %% floored at Capacity * ?OVERLOAD_MIN_ALLOC.
    erlang:max(
        add(Capacity, Tokens),
        mul(Capacity, ?OVERLOAD_MIN_ALLOC)
    );
longitudinal_shortfall(Capacity, Tokens) ->
    %% floored at 0 in case the counter holds more than the capacity
    erlang:max(sub(Capacity, Tokens), 0).
×
395

396
%% Return the buckets sorted by ascending `obtained', so that the buckets
%% which have received the least so far are served first and do not starve.
%% Accepts either the bucket map or a plain list of buckets.
%%
%% lists:sort/2 requires an ordering fun that returns `true' when the first
%% argument is less than OR EQUAL to the second, hence `=<': buckets with
%% the same `obtained' value then keep their relative order.
-spec get_ordered_buckets(list(bucket()) | buckets()) -> list(bucket()).
get_ordered_buckets(Buckets) when is_map(Buckets) ->
    get_ordered_buckets(maps:values(Buckets));
get_ordered_buckets(Buckets) ->
    lists:sort(
        fun(#{obtained := A}, #{obtained := B}) ->
            A =< B
        end,
        Buckets
    ).
408

409
%% Top up the root counter slot with the part of the root rate that was
%% not handed out to buckets in this round.
%% Nothing is done when the root rate is `infinity', when the whole rate
%% was already handed out, or when the root slot already holds at least
%% `Rate' tokens. Otherwise the slot is raised by the unallocated amount,
%% without going above `Rate'.
-spec maybe_adjust_root_tokens(state(), float()) -> state().
maybe_adjust_root_tokens(#{root := #{rate := infinity}} = State, _Alloced) ->
    State;
maybe_adjust_root_tokens(#{root := #{rate := Rate}} = State, Alloced) when Alloced >= Rate ->
    State;
maybe_adjust_root_tokens(#{root := #{rate := Rate} = Root, counter := Counter} = State, Alloced) ->
    Unallocated = Rate - Alloced,
    case counters:get(Counter, ?ROOT_COUNTER_IDX) of
        Token when Token >= Rate ->
            State;
        Token ->
            Available = erlang:min(Rate - Token, Unallocated),
            {Inc, Root2} = emqx_limiter_decimal:precisely_add(Available, Root),
            counters:add(Counter, ?ROOT_COUNTER_IDX, Inc),
            State#{root := Root2}
    end.
426

427
%% When the root has a positive burst, collect every bucket whose counter
%% slot is not above zero and share the burst among them.
%% Without a positive burst the state is returned unchanged.
-spec maybe_burst(state()) -> state().
maybe_burst(
    #{
        buckets := Buckets,
        root := #{burst := Burst}
    } = State
) when Burst > 0 ->
    Empties = maps:fold(fun collect_empty_bucket/3, [], Buckets),
    dispatch_burst(Empties, Burst, State);
maybe_burst(State) ->
    State.

%% Fold step for maybe_burst/1: prepend the bucket to the accumulator when
%% it has a counter and that counter slot is not above zero.
collect_empty_bucket(_Name, #{counter := Cnt, index := Idx} = Bucket, Acc) when
    Cnt =/= undefined
->
    case counters:get(Cnt, Idx) of
        Tokens when Tokens > 0 ->
            Acc;
        _ ->
            [Bucket | Acc]
    end;
collect_empty_bucket(_Name, _Bucket, Acc) ->
    Acc.
313,046✔
450

451
%% Split the burst `InFlow' evenly among the empty buckets and add the
%% total that was actually dispatched to the root's `produced' value.
%% With no empty bucket the state is returned unchanged.
-spec dispatch_burst(list(bucket()), non_neg_integer(), state()) -> state().
dispatch_burst([], _, State) ->
    State;
dispatch_burst(
    Empties,
    InFlow,
    #{root := #{produced := Produced} = Root, buckets := Buckets} = State
) ->
    Share = InFlow / length(Empties),
    {Dispatched, Buckets2} = dispatch_burst_to_buckets(Empties, Share, 0, Buckets),
    Root2 = Root#{produced := Produced + Dispatched},
    State#{root := Root2, buckets := Buckets2}.
20✔
462

463
%% Give `InFlow' tokens to every bucket in the list, in list order.
%% For each bucket the amount returned by precisely_add/2 is added to its
%% counter slot and to its `obtained' total, and accumulated into the
%% returned sum. Returns that sum together with the updated bucket map.
-spec dispatch_burst_to_buckets(
    list(bucket()),
    float(),
    non_neg_integer(),
    buckets()
) -> {non_neg_integer(), buckets()}.
dispatch_burst_to_buckets(Empties, InFlow, Alloced, Buckets) ->
    Dispatch = fun(Bucket, {AllocedAcc, BucketsAcc}) ->
        #{
            name := Name,
            counter := Counter,
            index := Index,
            obtained := Obtained
        } = Bucket,
        {Inc, Bucket2} = emqx_limiter_decimal:precisely_add(InFlow, Bucket),
        counters:add(Counter, Index, Inc),
        BucketsAcc2 = BucketsAcc#{Name := Bucket2#{obtained := Obtained + Inc}},
        {AllocedAcc + Inc, BucketsAcc2}
    end,
    lists:foldl(Dispatch, {Alloced, Buckets}, Empties).
20✔
484

485
%% Build a fresh state for `Type' from the node options returned by
%% emqx_limiter_utils:get_node_opts/1.
-spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
init_tree(Type) when is_atom(Type) ->
    init_tree(Type, emqx_limiter_utils:get_node_opts(Type)).
6✔
489

490
%% Build a fresh state for `Type' from `Cfg'.
%% Allocates a new counter array, registers slot ?ROOT_COUNTER_IDX of it as
%% the root bucket in emqx_limiter_manager, and starts with no buckets.
init_tree(Type, #{rate := Rate} = Cfg) ->
    Counter = counters:new(?COUNTER_SIZE, [write_concurrency]),
    RootRef = emqx_limiter_bucket_ref:new(Counter, ?ROOT_COUNTER_IDX, Rate),
    emqx_limiter_manager:insert_root(Type, RootRef),
    Root = make_root(Cfg),
    #{
        type => Type,
        root => Root,
        counter => Counter,
        %% the first slot is reserved for the root, so the next bucket
        %% is placed right after it
        index => ?ROOT_COUNTER_IDX,
        buckets => #{}
    }.
502

503
%% Build the root node from the configured `rate' and `burst'.
%% The period comes from emqx_limiter_schema:default_period/0; the
%% `produced' total and the correction value both start from zero.
-spec make_root(hocon:config()) -> root().
make_root(#{rate := Rate, burst := Burst}) ->
    Period = emqx_limiter_schema:default_period(),
    #{
        rate => Rate,
        burst => Burst,
        period => Period,
        produced => 0.0,
        correction => 0
    }.
512

513
%% Add the bucket `Id' to the state, or - when it is already present -
%% overwrite its `rate' and `capacity' from the new configuration while
%% keeping its counter slot and the rest of its fields.
do_add_bucket(Id, #{rate := Rate} = Cfg, #{buckets := Buckets} = State) ->
    case maps:get(Id, Buckets, undefined) of
        undefined ->
            make_bucket(Id, Cfg, State);
        Existing ->
            Capacity = emqx_limiter_utils:calc_capacity(Cfg),
            Updated = Existing#{rate := Rate, capacity := Capacity},
            State#{buckets := Buckets#{Id := Updated}}
    end.
521

522
%% Create a new bucket in the next free slot of the current counter array,
%% register a reference to that slot in emqx_limiter_manager and store the
%% bucket in the state. The slot starts with the value of get_initial_val/1.
%%
%% When the current array is full (index equals ?COUNTER_SIZE) a new array
%% is allocated first and the index restarts, so the bucket lands in slot 1
%% of the new array.
%% NOTE(review): after such a rollover the state's `counter' is the new
%% array, whereas init_tree/2 registered the root on the first array and
%% maybe_adjust_root_tokens/2 refills ?ROOT_COUNTER_IDX of the state's
%% `counter' - confirm that the root refill still reaches the intended
%% counter once the first array has been filled.
make_bucket(Id, Cfg, #{index := ?COUNTER_SIZE} = State) ->
    FreshCounter = counters:new(?COUNTER_SIZE, [write_concurrency]),
    make_bucket(Id, Cfg, State#{counter => FreshCounter, index => 0});
make_bucket(
    Id,
    #{rate := Rate} = Cfg,
    #{type := Type, counter := Counter, index := Index, buckets := Buckets} = State
) ->
    Slot = Index + 1,
    Initial = get_initial_val(Cfg),
    Capacity = emqx_limiter_utils:calc_capacity(Cfg),
    Bucket = #{
        name => Id,
        rate => Rate,
        obtained => Initial,
        correction => 0,
        capacity => Capacity,
        counter => Counter,
        index => Slot
    },
    _ = put_to_counter(Counter, Slot, Initial),
    BucketRef = emqx_limiter_bucket_ref:new(Counter, Slot, Rate),
    emqx_limiter_manager:insert_bucket(Id, Type, BucketRef),
    State#{buckets := Buckets#{Id => Bucket}, index := Slot}.
14✔
547

548
%% Remove the bucket `Id' from the state and from emqx_limiter_manager.
%% An unknown id leaves the state unchanged. The counter slot used by the
%% removed bucket is not handed out again by this function.
do_del_bucket(Id, #{type := Type, buckets := Buckets} = State) ->
    case maps:get(Id, Buckets, undefined) of
        undefined ->
            State;
        _Existing ->
            emqx_limiter_manager:delete_bucket(Id, Type),
            Remaining = maps:remove(Id, Buckets),
            State#{buckets := Remaining}
    end.
556

557
%% Number of tokens a new bucket starts with:
%%   * the configured `initial' value when it is greater than zero;
%%   * otherwise the `rate', unless the rate is `infinity';
%%   * otherwise 0.
%% The configuration must contain both `initial' and `rate'.
-spec get_initial_val(hocon:config()) -> decimal().
get_initial_val(#{initial := Initial, rate := _}) when Initial > 0 ->
    Initial;
get_initial_val(#{initial := _, rate := Rate}) when Rate =/= infinity ->
    Rate;
get_initial_val(#{initial := _, rate := _}) ->
    0.
572

573
%% Send a synchronous request to the limiter server of `Type'.
%% Returns `{error, limiter_not_started}' when no server is registered for
%% that type; otherwise returns whatever the server replies.
-spec call(limiter_type(), any()) -> {error, _} | _.
call(Type, Msg) ->
    case ?MODULE:whereis(Type) of
        Pid when Pid =/= undefined ->
            gen_server:call(Pid, Msg);
        undefined ->
            {error, limiter_not_started}
    end.
581

582
%% Choose how the limiter is built: with the client-level configuration
%% when it carries a finite rate, otherwise from the referenced bucket
%% configuration alone.
create_limiter(Id, Type, ClientCfg, BucketCfg) ->
    case ClientCfg of
        #{rate := Rate} when Rate =/= infinity ->
            create_limiter_with_client(Id, Type, ClientCfg, BucketCfg);
        _ ->
            create_limiter_without_client(Id, Type, BucketCfg)
    end.
26,864✔
586

587
%% create a limiter with the client-level configuration
%% When a referenced bucket exists, the limiter is combined with it;
%% when there is none, a purely local limiter is made from the client
%% configuration. A lookup error is passed through unchanged.
create_limiter_with_client(Id, Type, ClientCfg, BucketCfg) ->
    case find_referenced_bucket(Id, Type, BucketCfg) of
        {ok, Bucket, RefCfg} ->
            create_limiter_with_ref(Bucket, ClientCfg, RefCfg);
        false ->
            {ok, emqx_htb_limiter:make_local_limiter(ClientCfg, infinity)};
        Error ->
            Error
    end.
597

598
%% create a limiter only with the referenced configuration
%% When a referenced bucket exists, it is combined with the default client
%% configuration; when there is none, an infinity limiter is returned.
%% A lookup error is passed through unchanged.
create_limiter_without_client(Id, Type, BucketCfg) ->
    case find_referenced_bucket(Id, Type, BucketCfg) of
        {ok, Bucket, RefCfg} ->
            DefaultClientCfg = emqx_limiter_utils:default_client_config(),
            create_limiter_with_ref(Bucket, DefaultClientCfg, RefCfg);
        false ->
            {ok, emqx_htb_limiter:make_infinity_limiter()};
        Error ->
            Error
    end.
609

610
%% Combine a client configuration with a referenced bucket.
%% When the client rate is lower than the referenced rate a local limiter
%% (backed by the bucket) is made; in every other case a plain reference
%% limiter is made.
create_limiter_with_ref(Bucket, ClientCfg, RefCfg) ->
    case {ClientCfg, RefCfg} of
        {#{rate := CliRate}, #{rate := RefRate}} when CliRate < RefRate ->
            {ok, emqx_htb_limiter:make_local_limiter(ClientCfg, Bucket)};
        _ ->
            {ok, emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket)}
    end.
29✔
618

619
%% Find the bucket a limiter should draw from.
%% Returns `{ok, Bucket, Cfg}' with the configuration the bucket belongs
%% to, `false' when there is nothing to reference, or
%% `{error, invalid_bucket}' when a configured bucket is not registered.
%%
%% this is a listener(server)-level reference
find_referenced_bucket(Id, Type, #{rate := Rate} = Cfg) when Rate =/= infinity ->
    case emqx_limiter_manager:find_bucket(Id, Type) of
        {ok, Bucket} ->
            {ok, Bucket, Cfg};
        _NotFound ->
            ?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}),
            {error, invalid_bucket}
    end;
%% this is a node-level reference
find_referenced_bucket(_Id, Type, _) ->
    case emqx_limiter_utils:get_node_opts(Type) of
        #{rate := infinity} ->
            %% the node level is unlimited: nothing to reference
            false;
        NodeCfg ->
            %% the root is expected to be registered; crash otherwise
            {ok, RootBucket} = emqx_limiter_manager:find_root(Type),
            {ok, RootBucket, NodeCfg}
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc