• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 891

08 May 2025 10:21AM UTC coverage: 63.271%. First build
891

Pull #252

github

web-flow
Merge 07fdd0c07 into ed5773ea0
Pull Request #252: Backport/william/locker when no nodes

1 of 2 new or added lines in 1 file covered. (50.0%)

441 of 697 relevant lines covered (63.27%)

56.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.26
/src/ekka_locker.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_locker).
18

19
-include_lib("stdlib/include/ms_transform.hrl").
20

21
-ifdef(TEST).
22
-include_lib("eunit/include/eunit.hrl").
23
-endif.
24

25
-behaviour(gen_server).
26

27
-export([ start_link/0
28
        , start_link/1
29
        , start_link/2
30
        ]).
31

32
%% For test cases
33
-export([stop/0, stop/1]).
34

35
%% Lock APIs
36
-export([ acquire/1
37
        , acquire/2
38
        , acquire/3
39
        , acquire/4
40
        ]).
41

42
-export([ release/1
43
        , release/2
44
        , release/3
45
        ]).
46

47
%% For RPC call
48
-export([ acquire_lock/2
49
        , acquire_lock/3
50
        , release_lock/2
51
        ]).
52

53
%% gen_server Callbacks
54
-export([ init/1
55
        , handle_call/3
56
        , handle_cast/2
57
        , handle_info/2
58
        , terminate/2
59
        , code_change/3
60
        ]).
61

62
%% A lockable resource: any Erlang term; it is the key of the locker's ETS table.
-type resource() :: term().

%% Where a lock is taken:
%%   local  - only in this node's table;
%%   leader - on the node returned by mria_membership:leader();
%%   quorum - on the ring nodes ekka_ring:find_nodes/2 selects for the resource;
%%   all    - on every node in mria_membership:nodelist(up).
-type lock_type() :: local | leader | quorum | all.

%% {Success, Entries}. An entry is the answering node, or {Node, PiggybackResult}
%% when a piggyback was supplied; failure lists may also carry {badrpc, Reason}.
-type lock_result() :: {boolean(), [node() | {node(), any()}]}.

%% Optional {Module, Function, Args} applied on each node that handles an acquire
%% request. Args is an argument LIST (it is handed to erlang:apply/3), so the
%% built-in mfa() type - whose third element is an arity - is not correct here.
-type piggyback() :: {module(), atom(), [term()]} | undefined.

-export_type([ resource/0
             , lock_type/0
             , lock_result/0
             , piggyback/0
             ]).

%% One lock entry in the locker's ETS table (keypos 2, i.e. keyed on `resource').
-record(lock, {
          resource :: resource(),
          owner    :: pid(),       %% process that asked for the lock
          counter  :: integer(),   %% 0 when built by lock_obj/1, 1 once locked
          created  :: integer()    %% erlang:system_time(millisecond) at creation
         }).

%% Lease settings: `expiry' is the lease time in ms after which a held lock is
%% considered expired; `timer' keeps what init/1 got from timer:send_interval/2.
-record(lease, {expiry :: pos_integer(), timer}).

-record(state, {
          locks    :: ets:tab(),                        %% the named lock table
          lease    :: #lease{},
          %% owners monitored after one of their locks expired,
          %% mapped to the set of resources seen expired for them
          monitors :: #{pid() => #{resource() => nil}}
         }).

-define(SERVER, ?MODULE).
-define(LOG(Level, Format, Args),
        logger:Level("Ekka(Locker): " ++ Format, Args)).

%% Default lease time in milliseconds (15 seconds).
-define(LEASE_TIME, 15000).
%% Timeout in milliseconds for the rpc:multicall/5 fan-outs.
-define(MC_TIMEOUT, 30000).
94

95
%%--------------------------------------------------------------------
96
%% API
97
%%--------------------------------------------------------------------
98

99
-spec(start_link() -> {ok, pid()} | {error, term()}).
%% @doc Start the default locker (registered as ?SERVER) with the default lease.
start_link() ->
    start_link(?SERVER, ?LEASE_TIME).

-spec(start_link(atom()) -> {ok, pid()} | ignore | {error, any()}).
%% @doc Start a locker registered locally as Name, using the default lease.
start_link(Name) ->
    start_link(Name, ?LEASE_TIME).

-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}).
%% @doc Start a locker registered locally as Name. Its ETS table is also named
%% Name; LeaseTime (ms) is how old a lock may get before it counts as expired.
start_link(Name, LeaseTime) ->
    InitArgs = [Name, LeaseTime],
    gen_server:start_link({local, Name}, ?MODULE, InitArgs, []).
110

111
-spec(stop() -> ok).
%% @doc Stop the default locker server (used by test cases).
stop() ->
    stop(?SERVER).

-spec(stop(pid() | atom()) -> ok).
%% @doc Synchronously stop the locker identified by a pid or registered name.
stop(Name) ->
    gen_server:call(Name, stop).
118

119
-spec(acquire(resource()) -> lock_result()).
%% @doc Take a local lock on Resource in the default locker.
acquire(Resource) ->
    acquire(?SERVER, Resource, local, undefined).

-spec(acquire(atom(), resource()) -> lock_result()).
%% @doc Take a local lock on Resource in the locker Name.
acquire(Name, Resource) when is_atom(Name) ->
    acquire(Name, Resource, local, undefined).

-spec(acquire(atom(), resource(), lock_type()) -> lock_result()).
%% @doc Take a lock of the given Type on Resource, without a piggyback.
acquire(Name, Resource, Type) ->
    acquire(Name, Resource, Type, undefined).
130

131
-spec(acquire(atom(), resource(), lock_type(), piggyback()) -> lock_result()).
%% @doc Take a lock on Resource in the locker Name.
%%
%% Type chooses the nodes involved: `local' (this node), `leader' (the node
%% returned by mria_membership:leader()), `quorum' (ring nodes picked by
%% ekka_ring:find_nodes/2) or `all' (mria_membership:nodelist(up)).
%% A leader RPC failure is reported as {false, [{Leader, {badrpc, Reason}}]}.
acquire(Name, Resource, local, Piggyback) when is_atom(Name) ->
    acquire_lock(Name, lock_obj(Resource), Piggyback);
acquire(Name, Resource, leader, Piggyback) when is_atom(Name) ->
    Leader = mria_membership:leader(),
    Args = [Name, lock_obj(Resource), Piggyback],
    case rpc:call(Leader, ?MODULE, acquire_lock, Args) of
        {badrpc, _Reason} = RpcError -> {false, [{Leader, RpcError}]};
        Reply -> Reply
    end;
acquire(Name, Resource, quorum, Piggyback) when is_atom(Name) ->
    RingNodes = ekka_ring:find_nodes(Resource, mria_membership:ring(up)),
    acquire_locks(RingNodes, Name, lock_obj(Resource), Piggyback);
acquire(Name, Resource, all, Piggyback) when is_atom(Name) ->
    UpNodes = mria_membership:nodelist(up),
    acquire_locks(UpNodes, Name, lock_obj(Resource), Piggyback).
150

151
%% Take LockObj on every node in Nodes through rpc:multicall/5.
%% If the merged outcome is a failure, ask all of Nodes to release it again
%% (release_lock/2 only removes entries owned by LockObj's owner) and return
%% the failure. Unreachable nodes (multicall's bad-node list) are ignored, and
%% an empty reply list merges to {false, []}.
acquire_locks(Nodes, Name, LockObj, Piggyback) ->
    CallArgs = [Name, LockObj, Piggyback],
    {Replies, _BadNodes} =
        rpc:multicall(Nodes, ?MODULE, acquire_lock, CallArgs, ?MC_TIMEOUT),
    Outcome = merge_results(Replies),
    case Outcome of
        {true, _} ->
            Outcome;
        {false, _} ->
            rpc:multicall(Nodes, ?MODULE, release_lock, [Name, LockObj], ?MC_TIMEOUT),
            Outcome
    end.
160

161
%% RPC entry point: try to take LockObj in table Name on this node and report
%% the outcome tagged with this node (plus the piggyback result, if any).
acquire_lock(Name, LockObj, Piggyback) ->
    Locked = acquire_lock(Name, LockObj),
    Entry = with_piggyback(node(), Piggyback),
    {Locked, [Entry]}.
163

164
%% Try to take LockObj in the ETS table Name; returns a boolean.
acquire_lock(Name, LockObj = #lock{resource = Resource, owner = Owner}) ->
    CounterPos = #lock.counter,
    %% A single atomic ets:update_counter/4 call: LockObj is inserted if the
    %% resource has no entry, then the counter is read (+0) and bumped by 1
    %% with a threshold of 1 (reset to 1). So [0, 1] means the lock was free
    %% and is now ours, while [1, 1] means somebody already held it.
    Ops = [{CounterPos, 0}, {CounterPos, 1, 1, 1}],
    try ets:update_counter(Name, Resource, Ops, LockObj) of
        [0, 1] ->
            true;
        [1, 1] ->
            %% Already locked: succeed only if the current holder is Owner.
            case ets:lookup(Name, Resource) of
                [#lock{owner = Holder}] -> Holder =:= Owner;
                _NotOurs -> false
            end
    catch
        error:badarg ->
            %% While a remote node is still booting its ETS table may not
            %% exist yet; the request is treated as successful.
            true
    end.
181

182
%% Tag Node with the result of the piggyback {M, F, Args} applied here,
%% or return Node unchanged when there is no piggyback.
with_piggyback(Node, {Mod, Fun, Args}) ->
    Result = apply(Mod, Fun, Args),
    {Node, Result};
with_piggyback(Node, undefined) ->
    Node.
186

187
%% Build the lock record for Resource: owned by the calling process, counter
%% at 0, creation time in ms (compared against the lease in check_lease/3).
lock_obj(Resource) ->
    CreatedAt = erlang:system_time(millisecond),
    #lock{resource = Resource, owner = self(), counter = 0, created = CreatedAt}.
193

194
-spec(release(resource()) -> lock_result()).
%% @doc Release the caller's local lock on Resource in the default locker.
release(Resource) ->
    release(?SERVER, Resource, local).

-spec(release(atom(), resource()) -> lock_result()).
%% @doc Release the caller's local lock on Resource in the locker Name.
release(Name, Resource) ->
    release(Name, Resource, local).
201

202
-spec(release(atom(), resource(), lock_type()) -> lock_result()).
%% @doc Release the caller's lock on Resource on the nodes chosen by Type
%% (same node selection as acquire/4). Only entries owned by the calling
%% process are removed.
release(Name, Resource, local) ->
    release_lock(Name, lock_obj(Resource));
release(Name, Resource, leader) ->
    Leader = mria_membership:leader(),
    Args = [Name, lock_obj(Resource)],
    case rpc:call(Leader, ?MODULE, release_lock, Args) of
        {badrpc, _Reason} = RpcError -> {false, [{Leader, RpcError}]};
        Reply -> Reply
    end;
release(Name, Resource, quorum) ->
    RingNodes = ekka_ring:find_nodes(Resource, mria_membership:ring(up)),
    release_locks(RingNodes, Name, lock_obj(Resource));
release(Name, Resource, all) ->
    UpNodes = mria_membership:nodelist(up),
    release_locks(UpNodes, Name, lock_obj(Resource)).
218

219
%% Release LockObj on every node in Nodes and merge the replies. Unreachable
%% nodes (multicall's bad-node list) are ignored; no replies merge to {false, []}.
release_locks(Nodes, Name, LockObj) ->
    CallArgs = [Name, LockObj],
    {Replies, _BadNodes} =
        rpc:multicall(Nodes, ?MODULE, release_lock, CallArgs, ?MC_TIMEOUT),
    merge_results(Replies).
222

223
%% RPC entry point: delete the entry for Resource in table Name if Owner holds
%% it. A missing entry or a missing table counts as released (true); an entry
%% held by another process yields false. Tagged with this node.
release_lock(Name, #lock{resource = Resource, owner = Owner}) ->
    Released =
        try ets:lookup(Name, Resource) of
            [] ->
                true;
            [Held = #lock{owner = Owner}] ->
                ets:delete_object(Name, Held);
            [_HeldByOther] ->
                false
        catch
            error:badarg -> true
        end,
    {Released, [node()]}.
233

234
%% Combine per-node replies ({true, Entries} | {false, Entries} | {badrpc, _})
%% into one lock_result(). Any failure makes the whole result false and
%% reports only the failures. No replies at all (e.g. no target nodes) is
%% also a failure.
merge_results([]) ->
    {false, []};
merge_results(Replies) ->
    case lists:foldl(fun classify_result/2, {[], []}, Replies) of
        {Succeeded, []} -> {true, lists:flatten(Succeeded)};
        {_Succeeded, Failed} -> {false, lists:flatten(Failed)}
    end.

%% Fold step: prepend a reply's entries to the success or failure accumulator.
classify_result({true, Entries}, {Succeeded, Failed}) ->
    {[Entries | Succeeded], Failed};
classify_result({false, Entries}, {Succeeded, Failed}) ->
    {Succeeded, [Entries | Failed]};
classify_result({badrpc, _} = RpcError, {Succeeded, Failed}) ->
    {Succeeded, [RpcError | Failed]}.
248

249
%%--------------------------------------------------------------------
250
%% gen_server callbacks
251
%%--------------------------------------------------------------------
252

253
%% gen_server init: create the public named ETS table Name holding #lock{}
%% records (keypos 2, i.e. keyed on the resource) and start a timer sending
%% check_lease to this server every 2 * LeaseTime ms.
init([Name, LeaseTime]) ->
    Tab = ets:new(Name, [public, set, named_table, {keypos, 2},
                         {read_concurrency, true}, {write_concurrency, true}]),
    %% timer:send_interval/2 returns {ok, TRef}. Only the bare TRef is
    %% stored: timer:cancel/1 rejects the wrapped tuple with {error, badarg}.
    %% A failure to start the timer now stops init instead of leaving a
    %% server without lease checks.
    {ok, TRef} = timer:send_interval(LeaseTime * 2, check_lease),
    Lease = #lease{expiry = LeaseTime, timer = TRef},
    {ok, #state{locks = Tab, lease = Lease, monitors = #{}}}.
259

260
%% Only `stop' is supported: reply ok and terminate normally. Anything else
%% is logged and answered with `ignore'.
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};
handle_call(Unexpected, _From, State) ->
    ?LOG(error, "Unexpected call: ~p", [Unexpected]),
    {reply, ignore, State}.
266

267
%% No casts are part of the protocol; log and drop them.
handle_cast(Unexpected, State) ->
    ?LOG(error, "Unexpected cast: ~p", [Unexpected]),
    {noreply, State}.
270

271
%% check_lease: collect locks older than the lease. For an expired lock whose
%% owner is not yet monitored, monitor the owner and record the resource. If a
%% later check finds an expired lock whose resource is already recorded for
%% that owner, the owner is force-killed from a separate process.
%% 'DOWN': when a monitored owner dies, delete the recorded locks it still
%% holds and forget the owner.
handle_info(check_lease, State = #state{locks = Tab, lease = Lease, monitors = Monitors}) ->
    Expired = check_lease(Tab, Lease, erlang:system_time(millisecond)),
    NewMonitors = lists:foldl(fun track_expired_lock/2, Monitors, Expired),
    {noreply, State#state{monitors = NewMonitors}, hibernate};

handle_info({'DOWN', _MRef, process, DownPid, _Reason},
            State = #state{locks = Tab, monitors = Monitors}) ->
    case maps:find(DownPid, Monitors) of
        error ->
            {noreply, State};
        {ok, ResourceSet} ->
            ok = drop_locks_owned_by(Tab, DownPid, set_to_list(ResourceSet)),
            {noreply, State#state{monitors = maps:remove(DownPid, Monitors)}}
    end;

handle_info(Info, State) ->
    ?LOG(error, "Unexpected info: ~p", [Info]),
    {noreply, State}.

%% Fold step for one expired lock over the owner => resource-set map.
track_expired_lock(#lock{resource = Resource, owner = Owner}, Monitors) ->
    case maps:find(Owner, Monitors) of
        error ->
            _MRef = erlang:monitor(process, Owner),
            maps:put(Owner, set_put(Resource, #{}), Monitors);
        {ok, ResourceSet} ->
            case is_set_elem(Resource, ResourceSet) of
                false ->
                    maps:put(Owner, set_put(Resource, ResourceSet), Monitors);
                true ->
                    %% force kill it as it might have hung
                    _ = spawn(fun() -> force_kill_lock_owner(Owner, Resource) end),
                    Monitors
            end
    end.

%% Delete each listed resource's entry in Tab if it is still held by Pid.
drop_locks_owned_by(Tab, Pid, Resources) ->
    _ = [ets:delete_object(Tab, Lock)
         || Resource <- Resources,
            #lock{owner = Holder} = Lock <- ets:lookup(Tab, Resource),
            Holder =:= Pid],
    ok.
311

312
%% Stop the periodic lease-check timer on shutdown.
terminate(_Reason, State) ->
    cancel_lease(State#state.lease).
314

315
%% Hot upgrade: the state is carried over unchanged.
code_change(_FromVsn, OldState, _UpgradeExtra) ->
    {ok, OldState}.
317

318
%%--------------------------------------------------------------------
319
%% Internal functions
320
%%--------------------------------------------------------------------
321

322
%% Return every lock in Tab whose age (Now minus its `created' timestamp, ms)
%% is greater than the lease expiry.
check_lease(Tab, #lease{expiry = Expiry}, Now) ->
    ExpiredSpec = ets:fun2ms(
                    fun(#lock{created = CreatedAt} = Lock) when (Now - CreatedAt) > Expiry ->
                            Lock
                    end),
    ets:select(Tab, ExpiredSpec).
325

326
%% Cancel the periodic check_lease timer. Accepts both a bare timer reference
%% and the {ok, TRef} tuple returned by timer:send_interval/2: a server whose
%% state was built by an init/1 that stored the whole return value (and was
%% carried over unchanged by code_change/3) holds the wrapped form, and
%% timer:cancel/1 would merely return {error, badarg} for it.
cancel_lease(#lease{timer = {ok, TRef}}) ->
    timer:cancel(TRef);
cancel_lease(#lease{timer = TRef}) ->
    timer:cancel(TRef).
327

328
%% A per-owner resource set is a map whose members all map to the atom nil.

%% Add Resource to ResourceSet.
set_put(Resource, ResourceSet) when is_map(ResourceSet) ->
    maps:put(Resource, nil, ResourceSet).

%% List the members of ResourceSet.
set_to_list(ResourceSet) when is_map(ResourceSet) ->
    maps:keys(ResourceSet).

%% Whether Resource is a member of ResourceSet.
is_set_elem(Resource, ResourceSet) when is_map(ResourceSet) ->
    case ResourceSet of
        #{Resource := _} -> true;
        #{} -> false
    end.
336

337
%% Log why Pid is being killed and a snapshot of its status, queue length and
%% stacktrace (fetched on Pid's node via rpc with a 5 s timeout), then kill it
%% unconditionally. Always returns ok.
force_kill_lock_owner(Pid, Resource) ->
    logger:error("kill ~p as it has held the lock for too long, resource: ~p", [Pid, Resource]),
    Snapshot = rpc:call(node(Pid), erlang, process_info,
                        [Pid, [status, message_queue_len, current_stacktrace]], 5000),
    logger:error("lock_owner_status:~n~p", [Snapshot]),
    true = exit(Pid, kill),
    ok.
344

345
-ifdef(TEST).
%% force_kill_lock_owner/2 must terminate an idle process and return ok.
force_kill_test() ->
    Owner = spawn(fun() -> receive foo -> ok end end),
    ?assert(is_process_alive(Owner)),
    ?assertEqual(ok, force_kill_lock_owner(Owner, resource)),
    ?assertNot(is_process_alive(Owner)).

-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc