• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / esockd / 531

18 Sep 2025 04:10PM UTC coverage: 67.398%. First build
531

Pull #212

github

web-flow
Merge 2d34b7552 into 45a2aab81
Pull Request #212: fix: shared limiter overrun

23 of 24 new or added lines in 1 file covered. (95.83%)

707 of 1049 relevant lines covered (67.4%)

105.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.59
/src/esockd_limiter.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% @doc A simple ets-based rate limit server.
18
-module(esockd_limiter).
19

20
-behaviour(gen_server).
21

22
-export([ start_link/0
23
        , get_all/0
24
        , stop/0
25
        ]).
26

27
-export([ create/2
28
        , create/3
29
        , update/2
30
        , update/3
31
        , lookup/1
32
        , consume/1
33
        , consume/2
34
        , delete/1
35
        ]).
36

37
%% gen_server callbacks
38
-export([ init/1
39
        , handle_call/3
40
        , handle_cast/2
41
        , handle_info/2
42
        , terminate/2
43
        , code_change/3
44
        ]).
45

46
-type(bucket_name() :: term()).
47

48
-type(bucket_info() :: #{name      => bucket_name(),
49
                         capacity  => pos_integer(),
50
                         interval  => pos_integer(),
51
                         tokens    => pos_integer(),
52
                         lasttime  => integer()
53
                        }).
54

55
-export_type([bucket_info/0]).
56

57
-define(TAB, ?MODULE).
58
-define(SERVER, ?MODULE).
59
-define(MAX_INTERVAL, 86400000).
60

61
%%--------------------------------------------------------------------
62
%% APIs
63
%%--------------------------------------------------------------------
64

65
-spec(start_link() -> {ok, pid()}).
66
start_link() ->
67
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
18✔
68

69
-spec(get_all() -> list(bucket_info())).
70
get_all() ->
71
    [bucket_info(Bucket) || Bucket = {{bucket, _}, _, _, _} <- ets:tab2list(?TAB)].
2✔
72

73
bucket_info({{bucket, Name}, Capacity, Interval, LastTime}) ->
74
    #{name     => Name,
40✔
75
      capacity => Capacity,
76
      interval => Interval,
77
      tokens   => tokens(Name),
78
      lasttime => LastTime
79
     }.
80

81
tokens(Name) ->
82
    ets:lookup_element(?TAB, {tokens, Name}, 2).
40✔
83

84
-spec(stop() -> ok).
85
stop() ->
86
    gen_server:stop(?SERVER).
8✔
87

88
-spec(create(bucket_name(), pos_integer()) -> ok).
89
create(Name, Capacity) when is_integer(Capacity), Capacity > 0 ->
90
    create(Name, Capacity, 1).
6✔
91

92
-spec(create(bucket_name(), pos_integer(), pos_integer()) -> ok).
93
create(Name, Capacity, Interval) when is_integer(Capacity), Capacity > 0,
94
                                      is_integer(Interval), Interval > 0 ->
95
    gen_server:call(?SERVER, {create, Name, Capacity, Interval}).
30✔
96

97
-spec(update(bucket_name(), pos_integer()) -> ok).
98
update(Name, Capacity) when is_integer(Capacity), Capacity > 0 ->
99
    create(Name, Capacity, 1).
×
100

101
-spec(update(bucket_name(), pos_integer(), pos_integer()) -> ok).
102
update(Name, Capacity, Interval) when is_integer(Capacity), Capacity > 0,
103
                                      is_integer(Interval), Interval > 0 ->
104
    gen_server:call(?SERVER, {update, Name, Capacity, Interval}).
2✔
105

106
-spec(lookup(bucket_name()) -> undefined | bucket_info()).
107
lookup(Name) ->
108
    case ets:lookup(?TAB, {bucket, Name}) of
40✔
109
        [] -> undefined;
4✔
110
        [Bucket] -> bucket_info(Bucket)
36✔
111
    end.
112

113
-spec(consume(bucket_name())
114
      -> {Remaining :: integer(), PasueMillSec :: integer()}).
115
consume(Name) ->
116
    consume(Name, 1).
2✔
117

118
-spec(consume(bucket_name(), pos_integer()) -> {integer(), integer()}).
119
consume(Name, Tokens) when is_integer(Tokens), Tokens > 0 ->
120
    try ets:update_counter(?TAB, {tokens, Name}, {2, -Tokens}) of
40,074✔
121
        Remaining when Remaining > 0 ->
122
            %% enough tokens, no need to pause
123
            {Remaining, 0};
40,006✔
124
        Remaining ->
125
            %% 0 or negative, not enough tokens. But it has indeed been consumed,
126
            %% which means the token is borrowed from the future. We need to pause to that time.
127
            {Remaining, pause_time(Name, time_now(), Remaining)}
6✔
128
    catch
129
        error:badarg -> {1, 0}
62✔
130
    end.
131

132
%% @private
133
-spec pause_time(bucket_name(), pos_integer(), neg_integer() | 0) -> pos_integer().
134
pause_time(Name, Now, Remaining) ->
135
    case ets:lookup(?TAB, {bucket, Name}) of
6✔
136
        [] -> 1000; %% Pause 1 second if the bucket is deleted.
×
137
        [{_Bucket, Capacity, Interval, LastTime}] ->
138
            %% Remaining might negative or zero.
139
            %% In any case, this means that the token in this cycle has been exhausted,
140
            %% and the current caller must at least pause until the next Token generation cycle
141
            %% BorrowFrom = 1: token borrowed from next cycle
142
            %% BorrowFrom = 2: token borrowed from next next cycle
143
            %% ...etc
144
            %%
145
            %% AND NOTE:
146
            %% 1. The number of consumers is limited
147
            %% 2. The number of Tokens increased at a fixed rate
148
            %% Therefore, consumers are always paused in turn, and the `Pause` value does
149
            %% not increase indefinitely.
150
            BorrowFrom = (abs(Remaining) div Capacity) + 1,
6✔
151

152
            %% The `Now` might be slightly larger than `LastTime` due to concurrent access and
153
            %% function execution time.
154
            %% But we always take `LastTime` as the standard, because it always increases in fixed steps.
155
            %%
156
            %% In this case, the following `Pause` value might be zero or negative,
157
            %% We still consider it to be consuming tokens from the cycle before the `LastTime`.
158
            %% And since `LastTime` will be updated immediately, we pause for at least 1ms.
159
            PauseTime = LastTime + (BorrowFrom * Interval * 1000) - Now,
6✔
160
            max(1, PauseTime)
6✔
161
    end.
162

163
-spec(delete(bucket_name()) -> ok).
164
delete(Name) ->
165
    gen_server:cast(?SERVER, {delete, Name}).
84✔
166

167
%%--------------------------------------------------------------------
168
%% gen_server callbacks
169
%%--------------------------------------------------------------------
170

171
init([]) ->
172
    _ = ets:new(?TAB, [public, set, named_table, {write_concurrency, true}]),
18✔
173
    {ok, #{countdown => #{}, timer => undefined}}.
18✔
174

175
handle_call({create, Name, Capacity, Interval}, _From, State = #{countdown := Countdown}) ->
176
    true = ets:insert(?TAB, {{tokens, Name}, Capacity}),
30✔
177
    true = ets:insert(?TAB, {{bucket, Name}, Capacity, Interval, erlang:system_time(millisecond)}),
30✔
178
    NCountdown = maps:put({bucket, Name}, Interval, Countdown),
30✔
179
    {reply, ok, ensure_countdown_timer(State#{countdown := NCountdown})};
30✔
180

181
handle_call({update, Name, Capacity, Interval}, _From, State = #{countdown := Countdown}) ->
182
    BucketName = {bucket, Name},
2✔
183
    true = ets:insert(?TAB, {{tokens, Name}, Capacity}),
2✔
184
    true = ets:insert(?TAB, {BucketName, Capacity, Interval, erlang:system_time(millisecond)}),
2✔
185
    LastInterval = maps:get(BucketName, Countdown, ?MAX_INTERVAL),
2✔
186
    NewInterval = erlang:min(Interval, LastInterval),
2✔
187
    NewCountdown = maps:put(BucketName, NewInterval, Countdown),
2✔
188
    {reply, ok, ensure_countdown_timer(State#{countdown := NewCountdown})};
2✔
189

190
handle_call(Req, _From, State) ->
191
    error_logger:error_msg("Unexpected call: ~p", [Req]),
2✔
192
    {reply, ignore, State}.
2✔
193

194
handle_cast({delete, Name}, State = #{countdown := Countdown}) ->
195
    true = ets:delete(?TAB, {bucket, Name}),
84✔
196
    true = ets:delete(?TAB, {tokens, Name}),
84✔
197
    NCountdown = maps:remove({bucket, Name}, Countdown),
84✔
198
    {noreply, State#{countdown := NCountdown}};
84✔
199

200
handle_cast(Msg, State) ->
201
    error_logger:error_msg("Unexpected cast: ~p~n", [Msg]),
2✔
202
    {noreply, State}.
2✔
203

204
handle_info({timeout, Timer, countdown}, State = #{countdown := Countdown, timer := Timer}) ->
205
    Now = time_now(),
4✔
206
    {Countdown1, StrictNow} =
4✔
207
        maps:fold(
208
            fun(Key = {bucket, Name}, 1, {AccIn, _}) ->
209
                [{_Key, Capacity, Interval, LastTime}] = ets:lookup(?TAB, Key),
2✔
210
                    %% Intolerant function execution time deviation.
211
                    %% The `LastTime` value must be updated strictly in milliseconds using Interval * 1000.
212
                    %%
213
                    %% Taking this into account, `schedule_time/2` is used to calculate the time of the next update.
214
                    %% And the `StrictNow` value calculated from any bucket can be used
215
                    %% to calculate the duration of the timer.
216
                    StrictNow = LastTime + Interval * 1000,
2✔
217

218
                    %% Generate tokens in interval, and the current tokens might be negative
219
                    %% (already borrowed by previous interval), add the Capacity value to it and
220
                    %% set an overflow threshold.
221
                    Incr = Threshold = SetValue = Capacity,
2✔
222
                    _ = ets:update_counter(?TAB, {tokens, Name}, {2, Incr, Threshold, SetValue}),
2✔
223
                    true = ets:update_element(?TAB, {bucket, Name}, {4, StrictNow}),
2✔
224

225
                    {AccIn#{Key => Interval}, StrictNow};
2✔
226
               (Key, C, {AccIn, StrictNow}) when C > 1 ->
227
                    {AccIn#{Key => C - 1}, StrictNow}
2✔
228
            end,
229
            {#{}, undefined},
230
            Countdown
231
        ),
232
    ScheduleTime = schedule_time(Now, StrictNow),
4✔
233
    NState = State#{countdown := Countdown1, timer := undefined},
4✔
234
    {noreply, ensure_countdown_timer(NState, ScheduleTime)};
4✔
235

236
handle_info(Info, State) ->
237
    error_logger:error_msg("Unexpected info: ~p~n", [Info]),
2✔
238
    {noreply, State}.
2✔
239

240
terminate(_Reason, _State) ->
241
    ok.
8✔
242

243
code_change(_OldVsn, State, _Extra) ->
244
    {ok, State}.
2✔
245

246
%%--------------------------------------------------------------------
247
%% Internal functions
248
%%--------------------------------------------------------------------
249

250
time_now() ->
251
    erlang:system_time(millisecond).
10✔
252

253
schedule_time(_Now, undefined) ->
254
    1000;
2✔
255
schedule_time(Now, StrictNow) ->
256
    StrictNow + 1000 - Now.
2✔
257

258
ensure_countdown_timer(State = #{timer := undefined}) ->
259
    ensure_countdown_timer(State, timer:seconds(1));
14✔
260
ensure_countdown_timer(State = #{timer := _TRef}) ->
261
    State.
18✔
262

263
ensure_countdown_timer(State = #{timer := undefined}, Time) when Time > 0 ->
264
    TRef = erlang:start_timer(Time, self(), countdown),
18✔
265
    State#{timer := TRef};
18✔
266
ensure_countdown_timer(State = #{timer := _TRef}, _Time) ->
NEW
267
    State.
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc