• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 532

19 Sep 2025 07:28AM UTC coverage: 67.398%. First build
532

Pull #212

github

web-flow
Merge 30a32a7d2 into 45a2aab81
Pull Request #212: fix: shared limiter overrun

23 of 24 new or added lines in 1 file covered. (95.83%)

707 of 1049 relevant lines covered (67.4%)

158.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.59
/src/esockd_limiter.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% @doc A simple ets-based rate limit server.
18
-module(esockd_limiter).
19

20
-behaviour(gen_server).
21

22
-export([ start_link/0
23
        , get_all/0
24
        , stop/0
25
        ]).
26

27
-export([ create/2
28
        , create/3
29
        , update/2
30
        , update/3
31
        , lookup/1
32
        , consume/1
33
        , consume/2
34
        , delete/1
35
        ]).
36

37
%% gen_server callbacks
38
-export([ init/1
39
        , handle_call/3
40
        , handle_cast/2
41
        , handle_info/2
42
        , terminate/2
43
        , code_change/3
44
        ]).
45

46
-type(bucket_name() :: term()).
47

48
%% Public description of a bucket, as returned by lookup/1 and get_all/0.
%% `tokens' is the live counter value: consume/2 decrements it without a
%% lower bound (tokens can be borrowed from future intervals), so it may be
%% zero or negative and is therefore typed as a plain integer.
-type(bucket_info() :: #{name      => bucket_name(),
                         capacity  => pos_integer(),
                         interval  => pos_integer(),
                         tokens    => integer(),
                         lasttime  => integer()
                        }).
54

55
-export_type([bucket_info/0]).
56

57
-define(TAB, ?MODULE).
58
-define(SERVER, ?MODULE).
59
-define(MAX_INTERVAL, 86400000).
60

61
%%--------------------------------------------------------------------
62
%% APIs
63
%%--------------------------------------------------------------------
64

65
%% @doc Start the limiter server, registered locally under the module name.
-spec(start_link() -> {ok, pid()}).
start_link() ->
    Registration = {local, ?SERVER},
    gen_server:start_link(Registration, ?MODULE, [], []).
27✔
68

69
%% @doc Return the info map of every bucket currently stored in the table.
-spec(get_all() -> list(bucket_info())).
get_all() ->
    Rows = ets:tab2list(?TAB),
    %% The table also holds `{tokens, Name}' counter rows; only the 4-tuples
    %% keyed by `{bucket, Name}' describe a bucket.
    [bucket_info(Row) || {{bucket, _}, _, _, _} = Row <- Rows].
3✔
72

73
%% Build the public info map for one `{bucket, Name}' row. The current token
%% count is not part of that row; it is read from the `{tokens, Name}' counter.
bucket_info({{bucket, Name}, Capacity, Interval, LastTime}) ->
    Tokens = tokens(Name),
    #{name => Name,
      capacity => Capacity,
      interval => Interval,
      tokens => Tokens,
      lasttime => LastTime}.
80

81
%% Read the current value of the token counter of bucket `Name'.
%% `ets:lookup_element/3' raises `badarg' when the counter row is missing.
tokens(Name) ->
    CounterKey = {tokens, Name},
    ets:lookup_element(?TAB, CounterKey, 2).
60✔
83

84
%% @doc Stop the limiter server with reason `normal', waiting without a timeout.
-spec(stop() -> ok).
stop() ->
    %% Spelled-out form of gen_server:stop/1 (reason `normal', timeout `infinity').
    gen_server:stop(?SERVER, normal, infinity).
12✔
87

88
%% @doc Create bucket `Name' holding `Capacity' tokens, using an interval of 1.
-spec(create(bucket_name(), pos_integer()) -> ok).
create(Name, Capacity) when is_integer(Capacity) andalso Capacity > 0 ->
    DefaultInterval = 1,
    create(Name, Capacity, DefaultInterval).
9✔
91

92
%% @doc Create bucket `Name' with `Capacity' tokens per `Interval'.
%% The request is a synchronous call to the limiter server; both numbers
%% must be positive integers.
-spec(create(bucket_name(), pos_integer(), pos_integer()) -> ok).
create(Name, Capacity, Interval)
  when is_integer(Capacity) andalso Capacity > 0 andalso
       is_integer(Interval) andalso Interval > 0 ->
    Request = {create, Name, Capacity, Interval},
    gen_server:call(?SERVER, Request).
45✔
96

97
%% @doc Update bucket `Name' to `Capacity' tokens, using an interval of 1.
%% Delegates to update/3 so that the server receives an `update' request,
%% exactly as it does for the 3-arity variant (previously this went through
%% create/3 and was handled as a `create' request instead).
-spec(update(bucket_name(), pos_integer()) -> ok).
update(Name, Capacity) when is_integer(Capacity), Capacity > 0 ->
    update(Name, Capacity, 1).
×
100

101
%% @doc Update bucket `Name' to `Capacity' tokens per `Interval'.
%% Sent to the limiter server as a synchronous `update' request; both numbers
%% must be positive integers.
-spec(update(bucket_name(), pos_integer(), pos_integer()) -> ok).
update(Name, Capacity, Interval)
  when is_integer(Capacity) andalso Capacity > 0 andalso
       is_integer(Interval) andalso Interval > 0 ->
    Request = {update, Name, Capacity, Interval},
    gen_server:call(?SERVER, Request).
3✔
105

106
%% @doc Look up bucket `Name' directly in the ETS table.
%% Returns `undefined' when no such bucket row exists, otherwise its info map.
-spec(lookup(bucket_name()) -> undefined | bucket_info()).
lookup(Name) ->
    lookup_result(ets:lookup(?TAB, {bucket, Name})).

%% Translate the raw ETS lookup result into the public return value.
lookup_result([]) -> undefined;
lookup_result([Bucket]) -> bucket_info(Bucket).
112

113
%% @doc Consume a single token from bucket `Name'.
%% Shorthand for `consume(Name, 1)'; returns the remaining token count and the
%% number of milliseconds the caller should pause.
-spec(consume(bucket_name())
      -> {Remaining :: integer(), PauseMillSec :: integer()}).
consume(Name) ->
    consume(Name, 1).
3✔
117

118
%% @doc Atomically take `Tokens' tokens from bucket `Name'.
%% Returns `{Remaining, PauseMs}'. While the counter stays positive no pause
%% is needed. Once it reaches zero or goes negative the tokens have still been
%% taken (borrowed from a future interval), and `PauseMs' tells the caller how
%% long to wait for that interval.
-spec(consume(bucket_name(), pos_integer()) -> {integer(), integer()}).
consume(Name, Tokens) when is_integer(Tokens) andalso Tokens > 0 ->
    CounterKey = {tokens, Name},
    try ets:update_counter(?TAB, CounterKey, {2, -Tokens}) of
        Left when Left =< 0 ->
            %% Exhausted or overdrawn: compute how long to wait.
            {Left, pause_time(Name, time_now(), Left)};
        Left ->
            %% Still tokens left in this interval.
            {Left, 0}
    catch
        error:badarg ->
            %% The counter could not be updated (e.g. there is no such
            %% bucket): report one remaining token and no pause.
            {1, 0}
    end.
131

132
%% @private
%% Compute how many milliseconds a caller must pause after driving the token
%% counter of bucket `Name' down to `Remaining' (zero or negative) at time `Now'.
-spec pause_time(bucket_name(), pos_integer(), neg_integer() | 0) -> pos_integer().
pause_time(Name, Now, Remaining) ->
    case ets:lookup(?TAB, {bucket, Name}) of
        [{_Key, Capacity, Interval, LastTime}] ->
            %% The current cycle is exhausted whether `Remaining' is zero or
            %% negative, so the caller waits at least until the next refill.
            %% Every further full `Capacity' of debt pushes the wait one more
            %% cycle ahead: 1 = next cycle, 2 = the one after, and so on.
            %% With a bounded number of consumers and a fixed refill rate the
            %% callers are paused in turn and the pause does not grow forever.
            CyclesAhead = (abs(Remaining) div Capacity) + 1,
            %% `LastTime' is the reference point because it advances in fixed
            %% steps; `Now' may be slightly ahead of it due to concurrency and
            %% execution time.
            ReadyAt = LastTime + (CyclesAhead * Interval * 1000),
            %% The difference can be zero or negative when `LastTime' is about
            %% to be advanced; pause for at least 1 ms in that case.
            max(1, ReadyAt - Now);
        [] ->
            %% The bucket was deleted in the meantime: pause one second.
            1000
    end.
162

163
%% @doc Ask the limiter server to remove bucket `Name'.
%% This is an asynchronous cast, so `ok' is returned without waiting for the
%% removal to happen.
-spec(delete(bucket_name()) -> ok).
delete(Name) ->
    Msg = {delete, Name},
    gen_server:cast(?SERVER, Msg).
126✔
166

167
%%--------------------------------------------------------------------
168
%% gen_server callbacks
169
%%--------------------------------------------------------------------
170

171
%% Create the public, named ETS table that holds bucket and token rows, and
%% start with an empty countdown map and no running timer.
init([]) ->
    TableOpts = [public, set, named_table, {write_concurrency, true}],
    _ = ets:new(?TAB, TableOpts),
    InitialState = #{countdown => #{}, timer => undefined},
    {ok, InitialState}.
27✔
174

175
%% Synchronous requests.
%% `create': (re)write the bucket rows and restart its countdown at `Interval'.
%% `update': (re)write the bucket rows, but keep the countdown at the smaller
%%           of `Interval' and the value already pending for this bucket.
%% Anything else is logged and answered with `ignore'.
handle_call({create, Name, Capacity, Interval}, _From, State = #{countdown := Countdown}) ->
    ok = store_bucket(Name, Capacity, Interval),
    Countdown1 = Countdown#{{bucket, Name} => Interval},
    {reply, ok, ensure_countdown_timer(State#{countdown := Countdown1})};

handle_call({update, Name, Capacity, Interval}, _From, State = #{countdown := Countdown}) ->
    ok = store_bucket(Name, Capacity, Interval),
    Key = {bucket, Name},
    Pending = maps:get(Key, Countdown, ?MAX_INTERVAL),
    Countdown1 = Countdown#{Key => erlang:min(Interval, Pending)},
    {reply, ok, ensure_countdown_timer(State#{countdown := Countdown1})};

handle_call(Req, _From, State) ->
    error_logger:error_msg("Unexpected call: ~p", [Req]),
    {reply, ignore, State}.

%% Write both ETS rows of a bucket: the token counter filled to `Capacity',
%% then the bucket row stamped with the current system time in milliseconds.
store_bucket(Name, Capacity, Interval) ->
    true = ets:insert(?TAB, {{tokens, Name}, Capacity}),
    true = ets:insert(?TAB, {{bucket, Name}, Capacity, Interval, erlang:system_time(millisecond)}),
    ok.
3✔
193

194
%% Asynchronous requests.
%% `delete': drop both ETS rows of the bucket and forget its countdown entry.
%% Anything else is logged and ignored.
handle_cast({delete, Name}, State = #{countdown := Countdown}) ->
    BucketKey = {bucket, Name},
    true = ets:delete(?TAB, BucketKey),
    true = ets:delete(?TAB, {tokens, Name}),
    {noreply, State#{countdown := maps:remove(BucketKey, Countdown)}};

handle_cast(Msg, State) ->
    error_logger:error_msg("Unexpected cast: ~p~n", [Msg]),
    {noreply, State}.
3✔
203

204
%% Timer tick: only the timeout of the timer currently stored in the state is
%% accepted. Every bucket's countdown is advanced by one tick; buckets whose
%% countdown was 1 are refilled. The next tick is then scheduled relative to
%% the strict refill time produced by the fold (if any bucket was refilled).
handle_info({timeout, Timer, countdown}, State = #{countdown := Countdown, timer := Timer}) ->
    Now = time_now(),
    {NextCountdown, StrictNow} =
        maps:fold(fun countdown_tick/3, {#{}, undefined}, Countdown),
    Delay = schedule_time(Now, StrictNow),
    Idle = State#{countdown := NextCountdown, timer := undefined},
    {noreply, ensure_countdown_timer(Idle, Delay)};

handle_info(Info, State) ->
    error_logger:error_msg("Unexpected info: ~p~n", [Info]),
    {noreply, State}.

%% Fold step over the countdown map; the accumulator is
%% `{NewCountdownMap, StrictNow | undefined}'.
countdown_tick(Key = {bucket, Name}, 1, {Acc, _PrevStrictNow}) ->
    %% Countdown expired: this bucket is due for a refill.
    [{_Key, Capacity, Interval, LastTime}] = ets:lookup(?TAB, Key),
    %% `LastTime' advances by exactly `Interval * 1000' ms, independent of how
    %% long this function takes to run. The resulting value from any refilled
    %% bucket is what the caller uses to compute the next timer duration.
    StrictNow = LastTime + Interval * 1000,
    %% Add `Capacity' to the (possibly negative, i.e. already borrowed)
    %% counter; if the result exceeds `Capacity' it is reset to `Capacity'.
    _ = ets:update_counter(?TAB, {tokens, Name}, {2, Capacity, Capacity, Capacity}),
    true = ets:update_element(?TAB, Key, {4, StrictNow}),
    %% Restart this bucket's countdown at its full interval.
    {Acc#{Key => Interval}, StrictNow};
countdown_tick(Key, Ticks, {Acc, StrictNow}) when Ticks > 1 ->
    %% Not due yet: one tick closer to the refill.
    {Acc#{Key => Ticks - 1}, StrictNow}.
3✔
243

244
%% Nothing to clean up explicitly on shutdown; always returns `ok'.
terminate(_Why, _FinalState) ->
    ok.
12✔
246

247
%% Hot code upgrade hook: the state is carried over unchanged.
code_change(_FromVsn, State, _UpgradeArgs) ->
    Unchanged = State,
    {ok, Unchanged}.
3✔
249

250
%%--------------------------------------------------------------------
251
%% Internal functions
252
%%--------------------------------------------------------------------
253

254
%% Current Erlang system time in milliseconds.
time_now() ->
    Unit = millisecond,
    erlang:system_time(Unit).
15✔
256

257
%% Delay in milliseconds until the next countdown tick.
%% Without a strict reference time the delay is a plain 1000 ms; otherwise the
%% tick is aimed at 1000 ms past `StrictNow', measured from `Now'.
schedule_time(_Now, undefined) ->
    1000;
schedule_time(Now, StrictNow) ->
    NextTickAt = StrictNow + 1000,
    NextTickAt - Now.
3✔
261

262
%% Make sure a countdown timer is running: start a one-second timer when the
%% state holds none, otherwise leave the state untouched.
ensure_countdown_timer(State) ->
    case State of
        #{timer := undefined} ->
            ensure_countdown_timer(State, timer:seconds(1));
        #{timer := _Running} ->
            State
    end.
27✔
266

267
%% Start a countdown timer firing after `Time' milliseconds when the state
%% holds none; leave the state untouched when a timer is already stored.
%%
%% `Time' can be zero or negative when the tick handler runs more than a
%% second behind the strict schedule. Previously that case fell through to the
%% catch-all clause and returned the state with `timer := undefined', so no
%% further tick was scheduled and buckets were no longer refilled until the
%% next create/update request. The delay is now clamped to at least 1 ms so
%% the tick always gets rescheduled.
ensure_countdown_timer(State = #{timer := undefined}, Time) ->
    Delay = max(1, Time),
    TRef = erlang:start_timer(Delay, self(), countdown),
    State#{timer := TRef};
ensure_countdown_timer(State = #{timer := _TRef}, _Time) ->
    State.
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc