• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 886

07 May 2025 12:24PM UTC coverage: 63.362%. First build
886

Pull #251

github

web-flow
Merge eb826916b into 567960622
Pull Request #251: fix(locker): return false when no available nodes

1 of 2 new or added lines in 1 file covered. (50.0%)

441 of 696 relevant lines covered (63.36%)

115.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.86
/src/ekka_locker.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_locker).
18

19
-include_lib("stdlib/include/ms_transform.hrl").
20

21
-ifdef(TEST).
22
-include_lib("eunit/include/eunit.hrl").
23
-endif.
24

25
-behaviour(gen_server).
26

27
-export([ start_link/0
28
        , start_link/1
29
        , start_link/2
30
        ]).
31

32
%% For test cases
33
-export([stop/0, stop/1]).
34

35
%% Lock APIs
36
-export([ acquire/1
37
        , acquire/2
38
        , acquire/3
39
        , acquire/4
40
        ]).
41

42
-export([ release/1
43
        , release/2
44
        , release/3
45
        ]).
46

47
%% For RPC call
48
-export([ acquire_lock/2
49
        , acquire_lock/3
50
        , release_lock/2
51
        ]).
52

53
%% gen_server Callbacks
54
-export([ init/1
55
        , handle_call/3
56
        , handle_cast/2
57
        , handle_info/2
58
        , terminate/2
59
        , code_change/3
60
        ]).
61

62
-type resource() :: term().
63

64
-type lock_type() :: local | leader | quorum | all.
65

66
-type lock_result() :: {boolean(), [node() | {node(), any()}]}.
67

68
-type piggyback() :: mfa() | undefined.
69

70
-export_type([ resource/0
71
             , lock_type/0
72
             , lock_result/0
73
             , piggyback/0
74
             ]).
75

76
%% A lock entry as stored in the ETS table created in init/1. The table uses
%% {keypos, 2}, i.e. entries are keyed by the `resource' field.
-record(lock, {
77
          resource :: resource(), % the locked term (ETS key)
78
          owner    :: pid(), % process the lock belongs to; lock_obj/1 uses self()
79
          counter  :: integer(), % 0 in a fresh lock_obj/1, raised to at most 1 by ets:update_counter/4
80
          created  :: integer() % erlang:system_time(millisecond) taken in lock_obj/1
81
         }).
82

83
%% Lease settings: `expiry' is the lease time compared against lock age in
%% check_lease/3, `timer' is the handle of the periodic check_lease timer.
-record(lease, {expiry, timer}).
84

85
%% gen_server state: `locks' is the ETS table, `lease' a #lease{} record and
%% `monitors' a map from owner pid to the set of its resources already seen expired.
-record(state, {locks, lease, monitors}).
86

87
%% Default registered name of the locker (and default ETS table name).
-define(SERVER, ?MODULE).
88
%% Log through logger:Level/2, prefixing every message with "Ekka(Locker): ".
-define(LOG(Level, Format, Args),
89
        logger:Level("Ekka(Locker): " ++ Format, Args)).
90

91
%% 15 seconds by default
92
-define(LEASE_TIME, 15000).
93
%% Timeout handed to rpc:multicall/5 when acquiring or releasing on several nodes.
-define(MC_TIMEOUT, 30000).
94

95
%%--------------------------------------------------------------------
96
%% API
97
%%--------------------------------------------------------------------
98

99
%% @doc Start the locker under the default registered name (?SERVER) with the
%% default lease time.
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
    DefaultName = ?SERVER,
    start_link(DefaultName).
102

103
%% @doc Start a locker registered as `Name', using the default lease time
%% (?LEASE_TIME).
-spec start_link(atom()) -> {ok, pid()} | ignore | {error, any()}.
start_link(Name) ->
    DefaultLease = ?LEASE_TIME,
    start_link(Name, DefaultLease).
106

107
%% @doc Start a locker gen_server registered locally as `Name'.
%% `Name' and `LeaseTime' are forwarded to init/1, where `Name' also becomes the
%% name of the ETS lock table and `LeaseTime' the lock lease in milliseconds.
-spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}.
start_link(Name, LeaseTime) ->
    ServerName = {local, Name},
    InitArgs = [Name, LeaseTime],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
110

111
%% @doc Stop the locker registered under the default name (?SERVER).
-spec stop() -> ok.
stop() ->
    DefaultName = ?SERVER,
    stop(DefaultName).
114

115
%% @doc Synchronously ask the locker `Name' to stop; handle_call/3 answers `ok'
%% and terminates the server with reason `normal'.
-spec stop(pid() | atom()) -> ok.
stop(Name) ->
    Request = stop,
    gen_server:call(Name, Request).
118

119
%% @doc Acquire a local lock on `Resource' in the default locker (?SERVER).
-spec acquire(resource()) -> lock_result().
acquire(Resource) ->
    DefaultName = ?SERVER,
    acquire(DefaultName, Resource).
122

123
%% @doc Acquire a lock of type `local' on `Resource' in the locker `Name'.
-spec acquire(atom(), resource()) -> lock_result().
acquire(Name, Resource) when is_atom(Name) ->
    LockType = local,
    acquire(Name, Resource, LockType).
126

127
%% @doc Acquire a lock of the given `Type' on `Resource' without a piggyback call.
-spec acquire(atom(), resource(), lock_type()) -> lock_result().
acquire(Name, Resource, Type) ->
    NoPiggyback = undefined,
    acquire(Name, Resource, Type, NoPiggyback).
130

131
%% @doc Acquire a lock on `Resource' for the calling process.
%%
%% Where the lock is taken depends on the lock type:
%%   local  - on this node only;
%%   leader - on the node returned by mria_membership:leader/0, via rpc:call/4;
%%            a `badrpc' reply is reported as `{false, [{Leader, BadRpc}]}';
%%   quorum - on the nodes ekka_ring:find_nodes/2 picks for the resource from
%%            mria_membership:ring(up);
%%   all    - on every node in mria_membership:nodelist(up).
%%
%% `Piggyback' is either `undefined' or an MFA that is applied on each node that
%% handles the request; its result is returned next to the node name.
-spec acquire(atom(), resource(), lock_type(), piggyback()) -> lock_result().
acquire(Name, Resource, local, Piggyback) when is_atom(Name) ->
    acquire_lock(Name, lock_obj(Resource), Piggyback);
acquire(Name, Resource, leader, Piggyback) when is_atom(Name) ->
    LeaderNode = mria_membership:leader(),
    RpcArgs = [Name, lock_obj(Resource), Piggyback],
    case rpc:call(LeaderNode, ?MODULE, acquire_lock, RpcArgs) of
        {badrpc, _} = BadRpc -> {false, [{LeaderNode, BadRpc}]};
        Result -> Result
    end;
acquire(Name, Resource, quorum, Piggyback) when is_atom(Name) ->
    Targets = ekka_ring:find_nodes(Resource, mria_membership:ring(up)),
    acquire_locks(Targets, Name, lock_obj(Resource), Piggyback);
acquire(Name, Resource, all, Piggyback) when is_atom(Name) ->
    Targets = mria_membership:nodelist(up),
    acquire_locks(Targets, Name, lock_obj(Resource), Piggyback).
150

151
%% Try to take the lock on every node in `Nodes' with one rpc:multicall/5.
%% The per-node replies are combined by merge_results/1. When the combined
%% result is a failure, release_lock/2 is multicalled on the same nodes so that
%% partially acquired locks are given back, and the failure is returned.
acquire_locks(Nodes, Name, LockObj, Piggyback) ->
    AcquireArgs = [Name, LockObj, Piggyback],
    {Replies, _Unreachable} =
        rpc:multicall(Nodes, ?MODULE, acquire_lock, AcquireArgs, ?MC_TIMEOUT),
    case merge_results(Replies) of
        {true, _} = Acquired ->
            Acquired;
        {false, _} = Rejected ->
            _ = rpc:multicall(Nodes, ?MODULE, release_lock, [Name, LockObj], ?MC_TIMEOUT),
            Rejected
    end.
160

161
%% @doc RPC entry point: try to take the lock in the local table `Name' and then
%% evaluate the optional piggyback call.
%% Returns `{Acquired, [NodeInfo]}' where `NodeInfo' is node() or
%% `{node(), PiggybackResult}'.
acquire_lock(Name, LockObj, Piggyback) ->
    Acquired = acquire_lock(Name, LockObj),
    NodeInfo = with_piggyback(node(), Piggyback),
    {Acquired, [NodeInfo]}.
163

164
%% @doc Try to take the lock described by `LockObj' in the ETS table `Name'.
%%
%% A single ets:update_counter/4 call both reads the counter and sets it to 1,
%% inserting `LockObj' as the default when the resource has no entry yet:
%%   [0, 1] - the counter was 0, the caller now holds the lock;
%%   [1, 1] - the resource was already locked; succeed only when the stored
%%            entry belongs to the same owner.
%% Returns true when the lock is held by the owner afterwards, false otherwise.
acquire_lock(Name, #lock{resource = Resource, owner = Owner} = LockObj) ->
    CounterPos = #lock.counter,
    %% read the current value, then add 1 but cap the counter at 1
    Ops = [{CounterPos, 0}, {CounterPos, 1, 1, 1}],
    try ets:update_counter(Name, Resource, Ops, LockObj) of
        [0, 1] ->
            true;
        [1, 1] ->
            case ets:lookup(Name, Resource) of
                [#lock{owner = Owner}] -> true;
                _HeldByOther -> false
            end
    catch
        error:badarg ->
            %% While remote node is booting, this might fail because
            %% the ETS table has not been created at that moment
            true
    end.
181

182
%% Attach the result of the optional piggyback call to the node name.
%% With `undefined' the bare node is returned; with an `{M, F, Args}' tuple the
%% function is applied and `{Node, Result}' is returned.
with_piggyback(Node, undefined) ->
    Node;
with_piggyback(Node, {M, F, Args}) ->
    Result = apply(M, F, Args),
    {Node, Result}.
186

187
%% Build a fresh, not yet acquired lock record for `Resource': owned by the
%% calling process, counter 0, stamped with the current system time in
%% milliseconds.
lock_obj(Resource) ->
    CreatedAt = erlang:system_time(millisecond),
    #lock{resource = Resource, owner = self(), counter = 0, created = CreatedAt}.
193

194
%% @doc Release the local lock on `Resource' in the default locker (?SERVER).
-spec release(resource()) -> lock_result().
release(Resource) ->
    DefaultName = ?SERVER,
    release(DefaultName, Resource).
197

198
%% @doc Release the lock of type `local' on `Resource' in the locker `Name'.
-spec release(atom(), resource()) -> lock_result().
release(Name, Resource) ->
    LockType = local,
    release(Name, Resource, LockType).
201

202
%% @doc Release the lock on `Resource' held by the calling process.
%%
%% The nodes contacted mirror acquire/4: this node (`local'), the leader node
%% (`leader'), the ring nodes found for the resource (`quorum') or all nodes in
%% mria_membership:nodelist(up) (`all'). The lock object is built with the
%% caller as owner, so release_lock/2 only deletes an entry owned by the caller.
%% A `badrpc' reply from the leader is reported as `{false, [{Leader, BadRpc}]}'.
-spec release(atom(), resource(), lock_type()) -> lock_result().
release(Name, Resource, local) ->
    release_lock(Name, lock_obj(Resource));
release(Name, Resource, leader) ->
    LeaderNode = mria_membership:leader(),
    RpcArgs = [Name, lock_obj(Resource)],
    case rpc:call(LeaderNode, ?MODULE, release_lock, RpcArgs) of
        {badrpc, _} = BadRpc -> {false, [{LeaderNode, BadRpc}]};
        Result -> Result
    end;
release(Name, Resource, quorum) ->
    Targets = ekka_ring:find_nodes(Resource, mria_membership:ring(up)),
    release_locks(Targets, Name, lock_obj(Resource));
release(Name, Resource, all) ->
    Targets = mria_membership:nodelist(up),
    release_locks(Targets, Name, lock_obj(Resource)).
218

219
%% Release the lock on every node in `Nodes' with one rpc:multicall/5 and
%% combine the per-node replies with merge_results/1.
release_locks(Nodes, Name, LockObj) ->
    ReleaseArgs = [Name, LockObj],
    {Replies, _Unreachable} =
        rpc:multicall(Nodes, ?MODULE, release_lock, ReleaseArgs, ?MC_TIMEOUT),
    merge_results(Replies).
222

223
%% @doc RPC entry point: release the lock on the resource in the local table
%% `Name' on behalf of `Owner'.
%%
%% The result flag is:
%%   true  - no entry exists, the entry owned by `Owner' was deleted, or
%%           ets:lookup/2 raised badarg (presumably the table is missing);
%%   false - the resource is locked by a different owner.
%% Returns `{Flag, [node()]}'.
release_lock(Name, #lock{resource = Resource, owner = Owner}) ->
    Released =
        try ets:lookup(Name, Resource) of
            [] ->
                true;
            [#lock{owner = Owner} = Held] ->
                ets:delete_object(Name, Held);
            [_HeldByOther] ->
                false
        catch
            error:badarg -> true
        end,
    {Released, [node()]}.
233

234
%% Combine the per-node replies of an rpc:multicall/5 into one lock_result().
%%
%% merge_results/1:
%%   * an empty reply list means no node answered, which counts as a failure;
%%   * otherwise the replies are folded by merge_results/3.
%%
%% merge_results/3 collects the node info of successful and failed replies
%% separately. The overall result is `{true, SuccessInfo}' only when no reply
%% failed, else `{false, FailedInfo}'.
%%
%% A reply may also be `{badrpc, Reason}' when the remote call raised. Such a
%% reply is counted as a failure (and reported as-is, since the originating node
%% is not known here) instead of crashing the caller with function_clause, so
%% that the caller still gets a chance to roll back partially acquired locks.
merge_results([]) ->
    {false, []};
merge_results(ResL) ->
    merge_results(ResL, [], []).

merge_results([], Succ, []) ->
    {true, lists:flatten(Succ)};
merge_results([], _, Failed) ->
    {false, lists:flatten(Failed)};
merge_results([{true, Res}|ResL], Succ, Failed) ->
    merge_results(ResL, [Res|Succ], Failed);
merge_results([{false, Res}|ResL], Succ, Failed) ->
    merge_results(ResL, Succ, [Res|Failed]);
merge_results([{badrpc, _Reason} = BadRpc|ResL], Succ, Failed) ->
    merge_results(ResL, Succ, [BadRpc|Failed]).
246

247
%%--------------------------------------------------------------------
248
%% gen_server callbacks
249
%%--------------------------------------------------------------------
250

251
%% @doc gen_server init callback.
%%
%% Creates the public, named ETS table `Name' (a set keyed on element 2 of the
%% stored tuples, i.e. the `resource' field of a #lock{}) and starts a periodic
%% `check_lease' message every `LeaseTime * 2' milliseconds.
%% The state starts with an empty monitor map.
init([Name, LeaseTime]) ->
    Tab = ets:new(Name, [public, set, named_table, {keypos, 2},
                         {read_concurrency, true}, {write_concurrency, true}]),
    %% timer:send_interval/2 returns {ok, TRef}. Unwrap it so the lease holds
    %% the bare timer reference that timer:cancel/1 expects; storing the whole
    %% tuple made the later cancel a no-op.
    {ok, TRef} = timer:send_interval(LeaseTime * 2, check_lease),
    Lease = #lease{expiry = LeaseTime, timer = TRef},
    {ok, #state{locks = Tab, lease = Lease, monitors = #{}}}.
257

258
%% @doc gen_server call handler.
%% `stop' replies `ok' and stops the server with reason `normal'; any other
%% request is logged as an error and answered with `ignore'.
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};
handle_call(Unexpected, _From, State) ->
    ?LOG(error, "Unexpected call: ~p", [Unexpected]),
    {reply, ignore, State}.
264

265
%% @doc gen_server cast handler. No casts are expected: the message is logged as
%% an error and the state is left unchanged.
handle_cast(Unexpected, State) ->
    ?LOG(error, "Unexpected cast: ~p", [Unexpected]),
    {noreply, State}.
268

269
%% @doc gen_server info handler.
%%
%%   check_lease - look up all locks older than the lease expiry and feed them
%%                 through track_expired_lock/2, then hibernate;
%%   'DOWN'      - a monitored owner died: delete every recorded lock that is
%%                 still owned by it and forget the owner;
%%   otherwise   - log the unexpected message.
handle_info(check_lease, State = #state{locks = Tab, lease = Lease, monitors = Monitors}) ->
    Expired = check_lease(Tab, Lease, erlang:system_time(millisecond)),
    Monitors1 = lists:foldl(fun track_expired_lock/2, Monitors, Expired),
    {noreply, State#state{monitors = Monitors1}, hibernate};
handle_info({'DOWN', _MRef, process, DownPid, _Reason},
            State = #state{locks = Tab, monitors = Monitors}) ->
    case maps:find(DownPid, Monitors) of
        error ->
            {noreply, State};
        {ok, ResourceSet} ->
            DropIfOwned =
                fun(Resource) ->
                    case ets:lookup(Tab, Resource) of
                        %% DownPid is already bound, so this only matches
                        %% entries still owned by the dead process
                        [#lock{owner = DownPid} = Lock] ->
                            ets:delete_object(Tab, Lock);
                        _ ->
                            ok
                    end
                end,
            lists:foreach(DropIfOwned, set_to_list(ResourceSet)),
            {noreply, State#state{monitors = maps:remove(DownPid, Monitors)}}
    end;
handle_info(Info, State) ->
    ?LOG(error, "Unexpected info: ~p", [Info]),
    {noreply, State}.

%% Fold step over expired locks; `MonAcc' maps an owner pid to the set of its
%% resources that were already seen expired.
%%   * unknown owner: start monitoring it and remember the resource;
%%   * known owner, new resource: remember the resource;
%%   * known owner, resource seen before: the lock outlived another check, so a
%%     separate process is spawned to kill the owner.
track_expired_lock(#lock{resource = Resource, owner = Owner}, MonAcc) ->
    case maps:find(Owner, MonAcc) of
        error ->
            _MRef = erlang:monitor(process, Owner),
            maps:put(Owner, set_put(Resource, #{}), MonAcc);
        {ok, Seen} ->
            case is_set_elem(Resource, Seen) of
                false ->
                    maps:put(Owner, set_put(Resource, Seen), MonAcc);
                true ->
                    %% force kill it as it might have hung
                    _ = spawn(fun() -> force_kill_lock_owner(Owner, Resource) end),
                    MonAcc
            end
    end.
309

310
%% @doc gen_server terminate callback: cancel the periodic lease-check timer.
terminate(_Reason, #state{lease = Lease}) ->
    cancel_lease(Lease).
312

313
%% @doc gen_server code_change callback: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
315

316
%%--------------------------------------------------------------------
317
%% Internal functions
318
%%--------------------------------------------------------------------
319

320
%% Select every 5-tuple in `Tab' whose last element (the `created' timestamp of
%% a #lock{}) lies more than `Expiry' before `Now', and return the full entries.
check_lease(Tab, #lease{expiry = Expiry}, Now) ->
    ExpiredSpec =
        ets:fun2ms(
          fun({_, _, _, _, CreatedAt} = Entry) when (Now - CreatedAt) > Expiry ->
                  Entry
          end),
    ets:select(Tab, ExpiredSpec).
323

324
%% Cancel the periodic check_lease timer held in the lease record.
%% The `timer' field may hold either a bare timer reference or the
%% `{ok, TRef}' tuple exactly as returned by timer:send_interval/2; the tuple is
%% unwrapped first, because timer:cancel/1 only accepts the reference itself.
%% Returns whatever timer:cancel/1 returns.
cancel_lease(#lease{timer = {ok, TRef}}) -> timer:cancel(TRef);
cancel_lease(#lease{timer = TRef}) -> timer:cancel(TRef).
325

326
%% Add `Resource' to a set represented as a map whose values are all `nil'.
set_put(Resource, ResourceSet) when is_map(ResourceSet) ->
    maps:put(Resource, nil, ResourceSet).
328

329
%% Return the members of a map-backed set, i.e. the keys of the map.
set_to_list(ResourceSet) when is_map(ResourceSet) ->
    Members = maps:keys(ResourceSet),
    Members.
331

332
%% Tell whether `Resource' is a member of the map-backed set.
is_set_elem(Resource, ResourceSet) when is_map(ResourceSet) ->
    case ResourceSet of
        #{Resource := _} -> true;
        _ -> false
    end.
334

335
%% Kill a process that has held the lock on `Resource' for too long.
%% Before the kill, the owner's status, message queue length and current
%% stacktrace are fetched from the owner's node (rpc timeout 5000) and logged.
%% Always returns ok.
force_kill_lock_owner(Pid, Resource) ->
    logger:error("kill ~p as it has held the lock for too long, resource: ~p", [Pid, Resource]),
    OwnerNode = node(Pid),
    InfoItems = [status, message_queue_len, current_stacktrace],
    OwnerInfo = rpc:call(OwnerNode, erlang, process_info, [Pid, InfoItems], 5000),
    logger:error("lock_owner_status:~n~p", [OwnerInfo]),
    true = exit(Pid, kill),
    ok.
342

343
-ifdef(TEST).
344
%% A process blocked in receive must be alive before, and dead after,
%% force_kill_lock_owner/2.
force_kill_test() ->
    Waiter = fun() ->
                     receive
                         foo -> ok
                     end
             end,
    Pid = spawn(Waiter),
    ?assert(is_process_alive(Pid)),
    ok = force_kill_lock_owner(Pid, resource),
    ?assertNot(is_process_alive(Pid)).
354

355
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc