emqx / ekka, build 888

07 May 2025 04:29PM UTC coverage: 63.271%. First build.

Pull Request #251 (GitHub web-flow): fix(locker): return false when no available nodes or rpc timeout
Merge 4a3e08a29 into 567960622

1 of 2 new or added lines in 1 file covered (50.0%).

441 of 697 relevant lines covered (63.27%).

115.25 hits per line.

Source File
/src/ekka_locker.erl (62.26% of lines covered)

%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(ekka_locker).

-include_lib("stdlib/include/ms_transform.hrl").

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

-behaviour(gen_server).

-export([ start_link/0
        , start_link/1
        , start_link/2
        ]).

%% For test cases
-export([stop/0, stop/1]).

%% Lock APIs
-export([ acquire/1
        , acquire/2
        , acquire/3
        , acquire/4
        ]).

-export([ release/1
        , release/2
        , release/3
        ]).

%% For RPC call
-export([ acquire_lock/2
        , acquire_lock/3
        , release_lock/2
        ]).

%% gen_server Callbacks
-export([ init/1
        , handle_call/3
        , handle_cast/2
        , handle_info/2
        , terminate/2
        , code_change/3
        ]).

-type resource() :: term().

-type lock_type() :: local | leader | quorum | all.

-type lock_result() :: {boolean(), [node() | {node(), any()}]}.

-type piggyback() :: mfa() | undefined.

-export_type([ resource/0
             , lock_type/0
             , lock_result/0
             , piggyback/0
             ]).

-record(lock, {
          resource :: resource(),
          owner    :: pid(),
          counter  :: integer(),
          created  :: integer()
         }).

-record(lease, {expiry, timer}).

-record(state, {locks, lease, monitors}).

-define(SERVER, ?MODULE).
-define(LOG(Level, Format, Args),
        logger:Level("Ekka(Locker): " ++ Format, Args)).

%% 15 seconds by default
-define(LEASE_TIME, 15000).
-define(MC_TIMEOUT, 30000).

%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------

-spec(start_link() -> {ok, pid()} | {error, term()}).
start_link() ->
    start_link(?SERVER).

-spec(start_link(atom()) -> {ok, pid()} | ignore | {error, any()}).
start_link(Name) ->
    start_link(Name, ?LEASE_TIME).

-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}).
start_link(Name, LeaseTime) ->
    gen_server:start_link({local, Name}, ?MODULE, [Name, LeaseTime], []).

-spec(stop() -> ok).
stop() ->
    stop(?SERVER).

-spec(stop(pid() | atom()) -> ok).
stop(Name) ->
    gen_server:call(Name, stop).

-spec(acquire(resource()) -> lock_result()).
acquire(Resource) ->
    acquire(?SERVER, Resource).

-spec(acquire(atom(), resource()) -> lock_result()).
acquire(Name, Resource) when is_atom(Name) ->
    acquire(Name, Resource, local).

-spec(acquire(atom(), resource(), lock_type()) -> lock_result()).
acquire(Name, Resource, Type) ->
    acquire(Name, Resource, Type, undefined).

-spec(acquire(atom(), resource(), lock_type(), piggyback()) -> lock_result()).
acquire(Name, Resource, local, Piggyback) when is_atom(Name) ->
    acquire_lock(Name, lock_obj(Resource), Piggyback);
acquire(Name, Resource, leader, Piggyback) when is_atom(Name) ->
    Leader = mria_membership:leader(),
    case rpc:call(Leader, ?MODULE, acquire_lock,
                  [Name, lock_obj(Resource), Piggyback]) of
        Err = {badrpc, _Reason} ->
            {false, [{Leader, Err}]};
        Res -> Res
    end;
acquire(Name, Resource, quorum, Piggyback) when is_atom(Name) ->
    Ring = mria_membership:ring(up),
    Nodes = ekka_ring:find_nodes(Resource, Ring),
    acquire_locks(Nodes, Name, lock_obj(Resource), Piggyback);

acquire(Name, Resource, all, Piggyback) when is_atom(Name) ->
    acquire_locks(mria_membership:nodelist(up),
                  Name, lock_obj(Resource), Piggyback).
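
%% Scope of each lock_type(), as implemented by the acquire/4 clauses above:
%%   local  - the lock is taken only in the local ETS table;
%%   leader - the lock is taken on the cluster leader via rpc:call/4;
%%   quorum - the lock is taken on the nodes that ekka_ring:find_nodes/2
%%            selects for the resource from the ring of 'up' nodes;
%%   all    - the lock is taken on every node currently reported 'up'.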

acquire_locks(Nodes, Name, LockObj, Piggyback) ->
    {ResL, _BadNodes}
        = rpc:multicall(Nodes, ?MODULE, acquire_lock, [Name, LockObj, Piggyback], ?MC_TIMEOUT),
    case merge_results(ResL) of
        Res = {true, _}  -> Res;
        Res = {false, _} ->
            rpc:multicall(Nodes, ?MODULE, release_lock, [Name, LockObj], ?MC_TIMEOUT),
            Res
    end.

acquire_lock(Name, LockObj, Piggyback) ->
    {acquire_lock(Name, LockObj), [with_piggyback(node(), Piggyback)]}.

acquire_lock(Name, LockObj = #lock{resource = Resource, owner = Owner}) ->
    Pos = #lock.counter,
    %% check lock status and set the lock atomically
    try ets:update_counter(Name, Resource, [{Pos, 0}, {Pos, 1, 1, 1}], LockObj) of
        [0, 1] -> %% no lock before, lock it
            true;
        [1, 1] -> %% has already been locked, either by self or by others
            case ets:lookup(Name, Resource) of
                [#lock{owner = Owner}] -> true;
                _Other -> false
            end
    catch
        error:badarg ->
            %% While remote node is booting, this might fail because
            %% the ETS table has not been created at that moment
            true
    end.
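
%% How the atomic check-and-set in acquire_lock/2 above plays out (an
%% informal illustration of the two results handled by the try clause):
%%   - first acquisition: the default LockObj is inserted with counter = 0;
%%     the update ops [{Pos, 0}, {Pos, 1, 1, 1}] then return [0, 1] and the
%%     lock is granted;
%%   - repeated acquisition: the stored counter is already 1, the ops return
%%     [1, 1], and the follow-up lookup grants the lock only when the stored
%%     owner pid matches the caller's lock object.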

with_piggyback(Node, undefined) ->
    Node;
with_piggyback(Node, {M, F, Args}) ->
    {Node, erlang:apply(M, F, Args)}.

lock_obj(Resource) ->
    #lock{resource = Resource,
          owner    = self(),
          counter  = 0,
          created  = erlang:system_time(millisecond)
         }.

-spec(release(resource()) -> lock_result()).
release(Resource) ->
    release(?SERVER, Resource).

-spec(release(atom(), resource()) -> lock_result()).
release(Name, Resource) ->
    release(Name, Resource, local).

-spec(release(atom(), resource(), lock_type()) -> lock_result()).
release(Name, Resource, local) ->
    release_lock(Name, lock_obj(Resource));
release(Name, Resource, leader) ->
    Leader = mria_membership:leader(),
    case rpc:call(Leader, ?MODULE, release_lock, [Name, lock_obj(Resource)]) of
        Err = {badrpc, _Reason} ->
            {false, [{Leader, Err}]};
        Res -> Res
    end;
release(Name, Resource, quorum) ->
    Ring = mria_membership:ring(up),
    Nodes = ekka_ring:find_nodes(Resource, Ring),
    release_locks(Nodes, Name, lock_obj(Resource));
release(Name, Resource, all) ->
    release_locks(mria_membership:nodelist(up), Name, lock_obj(Resource)).

release_locks(Nodes, Name, LockObj) ->
    {ResL, _BadNodes} = rpc:multicall(Nodes, ?MODULE, release_lock, [Name, LockObj], ?MC_TIMEOUT),
    merge_results(ResL).

release_lock(Name, #lock{resource = Resource, owner = Owner}) ->
    Res = try ets:lookup(Name, Resource) of
              [Lock = #lock{owner = Owner}] ->
                  ets:delete_object(Name, Lock);
              [_Lock] -> false;
              []      -> true
          catch
              error:badarg -> true
          end,
    {Res, [node()]}.
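
%% Usage sketch (the locker name 'my_locker' and the resource 'some_res'
%% below are illustrative only):
%%
%%   {ok, _Pid} = ekka_locker:start_link(my_locker),
%%   {true, _Nodes} = ekka_locker:acquire(my_locker, some_res, all),
%%   %% ... critical section ...
%%   {true, _} = ekka_locker:release(my_locker, some_res, all).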

merge_results([]) ->
    {false, []};
merge_results(ResL) ->
    merge_results(ResL, [], []).
merge_results([], Succ, []) ->
    {true, lists:flatten(Succ)};
merge_results([], _, Failed) ->
    {false, lists:flatten(Failed)};
merge_results([{true, Res}|ResL], Succ, Failed) ->
    merge_results(ResL, [Res|Succ], Failed);
merge_results([{false, Res}|ResL], Succ, Failed) ->
    merge_results(ResL, Succ, [Res|Failed]);
merge_results([{badrpc, _} = Res | ResL], Succ, Failed) ->
    merge_results(ResL, Succ, [Res|Failed]).

%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------

init([Name, LeaseTime]) ->
    Tab = ets:new(Name, [public, set, named_table, {keypos, 2},
                         {read_concurrency, true}, {write_concurrency, true}]),
    TRef = timer:send_interval(LeaseTime * 2, check_lease),
    Lease = #lease{expiry = LeaseTime, timer = TRef},
    {ok, #state{locks = Tab, lease = Lease, monitors = #{}}}.

handle_call(stop, _From, State) ->
    {stop, normal, ok, State};

handle_call(Req, _From, State) ->
    ?LOG(error, "Unexpected call: ~p", [Req]),
    {reply, ignore, State}.

handle_cast(Msg, State) ->
    ?LOG(error, "Unexpected cast: ~p", [Msg]),
    {noreply, State}.

handle_info(check_lease, State = #state{locks = Tab, lease = Lease, monitors = Monitors}) ->
    Monitors1 = lists:foldl(
                  fun(#lock{resource = Resource, owner = Owner}, MonAcc) ->
                      case maps:find(Owner, MonAcc) of
                          {ok, ResourceSet} ->
                              case is_set_elem(Resource, ResourceSet) of
                                  true ->
                                      %% force kill it as it might have hung
                                      _ = spawn(fun() -> force_kill_lock_owner(Owner, Resource) end),
                                      MonAcc;
                                  false ->
                                      maps:put(Owner, set_put(Resource, ResourceSet), MonAcc)
                              end;
                          error ->
                              _MRef = erlang:monitor(process, Owner),
                              maps:put(Owner, set_put(Resource, #{}), MonAcc)
                      end
                  end, Monitors, check_lease(Tab, Lease, erlang:system_time(millisecond))),
    {noreply, State#state{monitors = Monitors1}, hibernate};

handle_info({'DOWN', _MRef, process, DownPid, _Reason},
            State = #state{locks = Tab, monitors = Monitors}) ->
    case maps:find(DownPid, Monitors) of
        {ok, ResourceSet} ->
            lists:foreach(
              fun(Resource) ->
                  case ets:lookup(Tab, Resource) of
                      [Lock = #lock{owner = OwnerPid}] when OwnerPid =:= DownPid ->
                          ets:delete_object(Tab, Lock);
                      _ -> ok
                  end
              end, set_to_list(ResourceSet)),
            {noreply, State#state{monitors = maps:remove(DownPid, Monitors)}};
        error ->
            {noreply, State}
    end;

handle_info(Info, State) ->
    ?LOG(error, "Unexpected info: ~p", [Info]),
    {noreply, State}.

terminate(_Reason, _State = #state{lease = Lease}) ->
    cancel_lease(Lease).

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
318
%%--------------------------------------------------------------------
319
%% Internal functions
320
%%--------------------------------------------------------------------
321

322
check_lease(Tab, #lease{expiry = Expiry}, Now) ->
323
    Spec = ets:fun2ms(fun({_, _, _, _, T} = Resource) when (Now - T) > Expiry -> Resource end),
×
324
    ets:select(Tab, Spec).
×
325

326
cancel_lease(#lease{timer = TRef}) -> timer:cancel(TRef).
14✔
327

328
set_put(Resource, ResourceSet) when is_map(ResourceSet) ->
329
    ResourceSet#{Resource => nil}.
×
330

331
set_to_list(ResourceSet) when is_map(ResourceSet) ->
332
    maps:keys(ResourceSet).
×
333

334
is_set_elem(Resource, ResourceSet) when is_map(ResourceSet) ->
335
    maps:is_key(Resource, ResourceSet).
×
336

337
force_kill_lock_owner(Pid, Resource) ->
338
    logger:error("kill ~p as it has held the lock for too long, resource: ~p", [Pid, Resource]),
2✔
339
    Fields = [status, message_queue_len, current_stacktrace],
2✔
340
    Status = rpc:call(node(Pid), erlang, process_info, [Pid, Fields], 5000),
2✔
341
    logger:error("lock_owner_status:~n~p", [Status]),
2✔
342
    _ = exit(Pid, kill),
2✔
343
    ok.
2✔
344

345
-ifdef(TEST).
346
force_kill_test() ->
347
    Pid = spawn(fun() ->
2✔
348
                        receive
2✔
349
                            foo ->
350
                                ok
×
351
                        end
352
                end),
353
    ?assert(is_process_alive(Pid)),
2✔
354
    ok = force_kill_lock_owner(Pid, resource),
2✔
355
    ?assertNot(is_process_alive(Pid)).
2✔
356
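
%% A sketch of the behaviour described by the PR title ("return false when
%% no available nodes or rpc timeout"): an empty multicall result list and
%% {badrpc, _} entries both merge to a failed lock_result(). The test name
%% below is illustrative, not taken from the existing suite.
merge_results_failure_test() ->
    ?assertEqual({false, []}, merge_results([])),
    ?assertEqual({false, [{badrpc, timeout}]},
                 merge_results([{badrpc, timeout}])),
    ?assertEqual({true, [node()]}, merge_results([{true, [node()]}])).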

-endif.