• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 823

28 Mar 2024 09:46AM UTC coverage: 61.618% (-1.2%) from 62.78%
823

push

github

web-flow
Merge pull request #233 from SergeTupchiy/EMQX-11826-prevent-left-node-from-rejoining

EMQX-11826 prevent left node from rejoining

5 of 15 new or added lines in 2 files covered. (33.33%)

2 existing lines in 1 file now uncovered.

419 of 680 relevant lines covered (61.62%)

50.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.58
/src/ekka_locker.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_locker).
18

19
-include_lib("stdlib/include/ms_transform.hrl").
20

21
-ifdef(TEST).
22
-include_lib("eunit/include/eunit.hrl").
23
-endif.
24

25
-behaviour(gen_server).
26

27
-export([ start_link/0
28
        , start_link/1
29
        , start_link/2
30
        ]).
31

32
%% For test cases
33
-export([stop/0, stop/1]).
34

35
%% Lock APIs
36
-export([ acquire/1
37
        , acquire/2
38
        , acquire/3
39
        , acquire/4
40
        ]).
41

42
-export([ release/1
43
        , release/2
44
        , release/3
45
        ]).
46

47
%% For RPC call
48
-export([ acquire_lock/2
49
        , acquire_lock/3
50
        , release_lock/2
51
        ]).
52

53
%% gen_server Callbacks
54
-export([ init/1
55
        , handle_call/3
56
        , handle_cast/2
57
        , handle_info/2
58
        , terminate/2
59
        , code_change/3
60
        ]).
61

62
-type resource() :: term().
63

64
-type lock_type() :: local | leader | quorum | all.
65

66
-type lock_result() :: {boolean(), [node() | {node(), any()}]}.
67

68
-type piggyback() :: mfa() | undefined.
69

70
-export_type([ resource/0
71
             , lock_type/0
72
             , lock_result/0
73
             , piggyback/0
74
             ]).
75

76
-record(lock, {
77
          resource :: resource(),
78
          owner    :: pid(),
79
          counter  :: integer(),
80
          created  :: integer()
81
         }).
82

83
-record(lease, {expiry, timer}).
84

85
-record(state, {locks, lease, monitors}).
86

87
-define(SERVER, ?MODULE).
88
-define(LOG(Level, Format, Args),
89
        logger:Level("Ekka(Locker): " ++ Format, Args)).
90

91
%% 15 seconds by default
92
-define(LEASE_TIME, 15000).
93
-define(MC_TIMEOUT, 30000).
94

95
%%--------------------------------------------------------------------
96
%% API
97
%%--------------------------------------------------------------------
98

99
%% @doc Start the locker under the default registered name (?SERVER).
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
    start_link(?SERVER).

%% @doc Start a locker registered locally as `Name', using the default
%% lease time (?LEASE_TIME milliseconds).
-spec start_link(atom()) -> {ok, pid()} | ignore | {error, any()}.
start_link(Name) ->
    start_link(Name, ?LEASE_TIME).

%% @doc Start a locker registered locally as `Name'. `Name' and `LeaseTime'
%% are handed to init/1 as its argument list.
-spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}.
start_link(Name, LeaseTime) ->
    InitArgs = [Name, LeaseTime],
    gen_server:start_link({local, Name}, ?MODULE, InitArgs, []).
80✔
110

111
%% @doc Stop the default locker (?SERVER). Exported for test cases.
-spec stop() -> ok.
stop() ->
    stop(?SERVER).

%% @doc Synchronously ask the locker `Name' to stop; the server replies `ok'
%% and terminates with reason `normal'.
-spec stop(pid() | atom()) -> ok.
stop(Name) ->
    gen_server:call(Name, stop).
4✔
118

119
%% @doc Acquire a local lock on `Resource' in the default locker (?SERVER).
-spec acquire(resource()) -> lock_result().
acquire(Resource) ->
    acquire(?SERVER, Resource).

%% @doc Acquire a lock of type `local' on `Resource' in the locker `Name'.
-spec acquire(atom(), resource()) -> lock_result().
acquire(Name, Resource) when is_atom(Name) ->
    acquire(Name, Resource, local).

%% @doc Acquire a lock of the given `Type' without a piggyback call.
-spec acquire(atom(), resource(), lock_type()) -> lock_result().
acquire(Name, Resource, Type) ->
    acquire(Name, Resource, Type, undefined).
11✔
130

131
%% @doc Acquire a lock on `Resource' in the locker table `Name'.
%%
%% The lock type selects where the lock is taken:
%%   local  - on the calling node only;
%%   leader - on the node returned by mria_membership:leader/0, via rpc;
%%   quorum - on the nodes ekka_ring:find_nodes/2 picks from the `up' ring;
%%   all    - on every node in mria_membership:nodelist(up).
%%
%% `Piggyback' is either `undefined' or an MFA that is applied on each node
%% where the lock request is evaluated; its result is returned next to the
%% node name. A failed rpc to the leader yields {false, [{Leader, Error}]}.
-spec acquire(atom(), resource(), lock_type(), piggyback()) -> lock_result().
acquire(Name, Resource, local, Piggyback) when is_atom(Name) ->
    acquire_lock(Name, lock_obj(Resource), Piggyback);
acquire(Name, Resource, leader, Piggyback) when is_atom(Name) ->
    Leader = mria_membership:leader(),
    CallArgs = [Name, lock_obj(Resource), Piggyback],
    case rpc:call(Leader, ?MODULE, acquire_lock, CallArgs) of
        {badrpc, _Reason} = Err ->
            {false, [{Leader, Err}]};
        Res ->
            Res
    end;
acquire(Name, Resource, quorum, Piggyback) when is_atom(Name) ->
    Nodes = ekka_ring:find_nodes(Resource, mria_membership:ring(up)),
    acquire_locks(Nodes, Name, lock_obj(Resource), Piggyback);
acquire(Name, Resource, all, Piggyback) when is_atom(Name) ->
    Nodes = mria_membership:nodelist(up),
    acquire_locks(Nodes, Name, lock_obj(Resource), Piggyback).
150

151
%% Try to take the lock on every node in `Nodes' with one multicall.
%% If any node refuses, the lock is released again on all of `Nodes'
%% (best effort, result ignored) and the failure result is returned.
acquire_locks(Nodes, Name, LockObj, Piggyback) ->
    AcquireArgs = [Name, LockObj, Piggyback],
    {Replies, _BadNodes} =
        rpc:multicall(Nodes, ?MODULE, acquire_lock, AcquireArgs, ?MC_TIMEOUT),
    case merge_results(Replies) of
        {false, _} = Denied ->
            %% roll back whatever was acquired on the other nodes
            rpc:multicall(Nodes, ?MODULE, release_lock, [Name, LockObj], ?MC_TIMEOUT),
            Denied;
        {true, _} = Granted ->
            Granted
    end.
160

161
%% RPC entry point: try to take the lock on this node, then evaluate the
%% optional piggyback MFA. The piggyback is evaluated whether or not the
%% lock was obtained.
acquire_lock(Name, LockObj, Piggyback) ->
    Acquired = acquire_lock(Name, LockObj),
    NodeReply = with_piggyback(node(), Piggyback),
    {Acquired, [NodeReply]}.
13✔
163

164
%% Take the lock described by `LockObj' in the ETS table `Name'.
%% Returns `true' when the lock is now (or already was) held by the owner
%% recorded in `LockObj', `false' when somebody else holds it.
acquire_lock(Name, #lock{resource = Resource, owner = Owner} = LockObj) ->
    CounterPos = #lock.counter,
    %% Read the counter, then bump it with a ceiling of 1, in one atomic
    %% ETS operation; LockObj is inserted first if the key is absent.
    ReadThenLock = [{CounterPos, 0}, {CounterPos, 1, 1, 1}],
    try ets:update_counter(Name, Resource, ReadThenLock, LockObj) of
        [0, 1] ->
            %% counter was 0: the lock was free and is now ours
            true;
        [1, 1] ->
            %% already locked, either by this owner or by someone else
            held_by(Name, Resource, Owner)
    catch
        error:badarg ->
            %% While remote node is booting, this might fail because
            %% the ETS table has not been created at that moment
            true
    end.

%% True when the lock currently stored for `Resource' belongs to `Owner'.
held_by(Name, Resource, Owner) ->
    case ets:lookup(Name, Resource) of
        [#lock{owner = Owner}] -> true;
        _Other -> false
    end.
181

182
%% Build the per-node entry of a lock result: the bare node name when no
%% piggyback was requested, otherwise {Node, Result} where Result is the
%% value returned by applying the piggyback MFA.
with_piggyback(Node, {Mod, Fun, Args}) ->
    {Node, apply(Mod, Fun, Args)};
with_piggyback(Node, undefined) ->
    Node.
×
186

187
%% Build a fresh lock record for `Resource', owned by the calling process,
%% with a zero counter and the current system time (ms) as creation time.
lock_obj(Resource) ->
    CreatedAt = erlang:system_time(millisecond),
    #lock{created = CreatedAt, counter = 0, owner = self(), resource = Resource}.
193

194
%% @doc Release the local lock on `Resource' in the default locker (?SERVER).
-spec release(resource()) -> lock_result().
release(Resource) ->
    release(?SERVER, Resource).

%% @doc Release the lock of type `local' on `Resource' in the locker `Name'.
-spec release(atom(), resource()) -> lock_result().
release(Name, Resource) ->
    release(Name, Resource, local).
3✔
201

202
%% @doc Release the lock on `Resource' in the locker table `Name'.
%%
%% The lock type selects the nodes the release is sent to, mirroring
%% acquire/4: the local node, the leader (via rpc), the nodes picked by
%% ekka_ring:find_nodes/2 from the `up' ring, or all `up' nodes.
%% A failed rpc to the leader yields {false, [{Leader, Error}]}.
-spec release(atom(), resource(), lock_type()) -> lock_result().
release(Name, Resource, local) ->
    release_lock(Name, lock_obj(Resource));
release(Name, Resource, leader) ->
    Leader = mria_membership:leader(),
    CallArgs = [Name, lock_obj(Resource)],
    case rpc:call(Leader, ?MODULE, release_lock, CallArgs) of
        {badrpc, _Reason} = Err ->
            {false, [{Leader, Err}]};
        Res ->
            Res
    end;
release(Name, Resource, quorum) ->
    Nodes = ekka_ring:find_nodes(Resource, mria_membership:ring(up)),
    release_locks(Nodes, Name, lock_obj(Resource));
release(Name, Resource, all) ->
    Nodes = mria_membership:nodelist(up),
    release_locks(Nodes, Name, lock_obj(Resource)).
4✔
218

219
%% Release the lock on every node in `Nodes' with one multicall and merge
%% the per-node replies into a single lock result.
release_locks(Nodes, Name, LockObj) ->
    ReleaseArgs = [Name, LockObj],
    {Replies, _BadNodes} =
        rpc:multicall(Nodes, ?MODULE, release_lock, ReleaseArgs, ?MC_TIMEOUT),
    merge_results(Replies).
6✔
222

223
%% RPC entry point: release the lock on `Resource' in the ETS table `Name'
%% on this node. The stored lock is deleted only when it belongs to `Owner'.
%% Returns {Released, [node()]} where Released is:
%%   true  - the lock was deleted, was not held at all, or the table
%%           does not exist (lookup raised badarg);
%%   false - the lock is held by a different owner.
release_lock(Name, #lock{resource = Resource, owner = Owner}) ->
    Released =
        try ets:lookup(Name, Resource) of
            [] ->
                true;
            [#lock{owner = Owner} = Held] ->
                ets:delete_object(Name, Held);
            [_HeldByOther] ->
                false
        catch
            error:badarg -> true
        end,
    {Released, [node()]}.
13✔
233

234
%% Merge the per-node replies of a multicall into one lock result.
%%
%% Each reply is expected to be {true, Info} or {false, Info}. The merged
%% result is {true, AllSuccessInfo} when no reply failed, otherwise
%% {false, AllFailureInfo}; the info lists are flattened.
%%
%% rpc:multicall/5 reports a call that failed on a remote node as a
%% {badrpc, Reason} element of the result list. Such an element is counted
%% as a failure and kept verbatim in the failure list, instead of crashing
%% the caller with function_clause (which would skip the rollback in
%% acquire_locks/4).
merge_results(ResL) ->
    merge_results(ResL, [], []).

merge_results([], Succ, []) ->
    {true, lists:flatten(Succ)};
merge_results([], _Succ, Failed) ->
    {false, lists:flatten(Failed)};
merge_results([{true, Res} | ResL], Succ, Failed) ->
    merge_results(ResL, [Res | Succ], Failed);
merge_results([{false, Res} | ResL], Succ, Failed) ->
    merge_results(ResL, Succ, [Res | Failed]);
merge_results([{badrpc, _Reason} = Err | ResL], Succ, Failed) ->
    merge_results(ResL, Succ, [Err | Failed]).
×
244

245
%%--------------------------------------------------------------------
246
%% gen_server callbacks
247
%%--------------------------------------------------------------------
248

249
%% gen_server init callback.
%%
%% Creates the public named ETS table `Name' that holds #lock{} records
%% keyed by resource, and starts an interval timer that sends `check_lease'
%% to this process every 2 * LeaseTime milliseconds.
%%
%% timer:send_interval/2 returns {ok, TRef}; the reference is unwrapped
%% here so that timer:cancel/1 in cancel_lease/1 receives a real timer
%% reference (storing the whole tuple made cancellation always fail).
%% The assertive match also makes init crash if the timer cannot be set up.
init([Name, LeaseTime]) ->
    Tab = ets:new(Name, [public, set, named_table, {keypos, #lock.resource},
                         {read_concurrency, true}, {write_concurrency, true}]),
    {ok, TRef} = timer:send_interval(LeaseTime * 2, check_lease),
    Lease = #lease{expiry = LeaseTime, timer = TRef},
    {ok, #state{locks = Tab, lease = Lease, monitors = #{}}}.
80✔
255

256
%% gen_server call callback. Only `stop' is understood: it replies `ok' and
%% terminates the server normally. Any other request is logged as an error
%% and answered with `ignore'.
handle_call(stop, _Caller, St) ->
    {stop, normal, ok, St};
handle_call(Request, _Caller, St) ->
    ?LOG(error, "Unexpected call: ~p", [Request]),
    {reply, ignore, St}.
×
262

263
%% gen_server cast callback. No casts are expected; each one is logged as
%% an error and the state is left unchanged.
handle_cast(Unexpected, St) ->
    ?LOG(error, "Unexpected cast: ~p", [Unexpected]),
    {noreply, St}.
×
266

267
%% gen_server info callback.
%%
%% check_lease: sent by the interval timer started in init/1. Every lock
%%   whose lease has expired is run through track_expired_lock/2, which
%%   either starts monitoring the owner or, if the same lock was already
%%   recorded for that owner, force-kills the owner. The server then
%%   hibernates.
%% 'DOWN': a monitored lock owner died; every recorded resource that is
%%   still locked by that pid is deleted from the table and the owner is
%%   dropped from the monitors map.
%% Anything else is logged as an error and ignored.
handle_info(check_lease, State = #state{locks = Tab, lease = Lease, monitors = Monitors0}) ->
    Expired = check_lease(Tab, Lease, erlang:system_time(millisecond)),
    Monitors = lists:foldl(fun track_expired_lock/2, Monitors0, Expired),
    {noreply, State#state{monitors = Monitors}, hibernate};

handle_info({'DOWN', _MRef, process, DownPid, _Reason},
            State = #state{locks = Tab, monitors = Monitors}) ->
    case Monitors of
        #{DownPid := ResourceSet} ->
            lists:foreach(fun(Resource) ->
                              drop_lock_if_owned(Tab, Resource, DownPid)
                          end, set_to_list(ResourceSet)),
            {noreply, State#state{monitors = maps:remove(DownPid, Monitors)}};
        _ ->
            {noreply, State}
    end;

handle_info(Info, State) ->
    ?LOG(error, "Unexpected info: ~p", [Info]),
    {noreply, State}.

%% Fold step for expired locks; the accumulator maps an owner pid to the
%% set of resources already seen as expired for it.
track_expired_lock(#lock{resource = Resource, owner = Owner}, MonAcc) ->
    track_expired_lock(maps:find(Owner, MonAcc), Owner, Resource, MonAcc).

%% Owner not seen before: monitor it and remember the resource.
track_expired_lock(error, Owner, Resource, MonAcc) ->
    _MRef = erlang:monitor(process, Owner),
    maps:put(Owner, set_put(Resource, #{}), MonAcc);
%% Owner already monitored: remember a new resource, or kill the owner if
%% this resource was already recorded for it.
track_expired_lock({ok, ResourceSet}, Owner, Resource, MonAcc) ->
    case is_set_elem(Resource, ResourceSet) of
        false ->
            maps:put(Owner, set_put(Resource, ResourceSet), MonAcc);
        true ->
            %% force kill it as it might have hung
            _ = spawn(fun() -> force_kill_lock_owner(Owner, Resource) end),
            MonAcc
    end.

%% Delete the lock stored for `Resource' if, and only if, `Owner' holds it.
drop_lock_if_owned(Tab, Resource, Owner) ->
    case ets:lookup(Tab, Resource) of
        [#lock{owner = Owner} = Lock] ->
            ets:delete_object(Tab, Lock);
        _ ->
            ok
    end.
×
307

308
%% gen_server terminate callback: cancel the lease-check timer.
terminate(_Reason, #state{lease = Lease}) ->
    cancel_lease(Lease).
4✔
310

311
%% gen_server code_change callback: the state is carried over unchanged.
code_change(_FromVsn, St, _Extra) ->
    {ok, St}.
×
313

314
%%--------------------------------------------------------------------
315
%% Internal functions
316
%%--------------------------------------------------------------------
317

318
%% Return every object in `Tab' whose lease has expired: 5-tuples whose
%% last element (the `created' field of a #lock{} record) is more than
%% `Expiry' milliseconds older than `Now'.
check_lease(Tab, #lease{expiry = Expiry}, Now) ->
    ExpiredSpec = ets:fun2ms(
                    fun({_, _, _, _, Created} = Lock) when (Now - Created) > Expiry ->
                        Lock
                    end),
    ets:select(Tab, ExpiredSpec).
×
321

322
%% Cancel the lease-check interval timer.
%% The `timer' field may hold the raw {ok, TRef} result of
%% timer:send_interval/2 rather than the bare reference; passing that tuple
%% to timer:cancel/1 never cancels anything, so it is unwrapped first.
cancel_lease(#lease{timer = {ok, TRef}}) -> timer:cancel(TRef);
cancel_lease(#lease{timer = TRef}) -> timer:cancel(TRef).
4✔
323

324
%% Add `Resource' to a set represented as a map whose values are `nil'.
set_put(Resource, #{} = ResourceSet) ->
    ResourceSet#{Resource => nil}.
×
326

327
%% List the members of a map-backed set (the keys of the map).
set_to_list(#{} = ResourceSet) ->
    maps:keys(ResourceSet).
×
329

330
%% Membership test for a map-backed set.
is_set_elem(Resource, ResourceSet) when is_map(ResourceSet) ->
    case ResourceSet of
        #{Resource := _} -> true;
        _ -> false
    end.
×
332

333
%% Kill a lock owner that has held `Resource' for too long.
%% Before the kill, the owner's status, message queue length and current
%% stacktrace are fetched from its node (5 second rpc timeout) and logged.
%% Always returns `ok'.
force_kill_lock_owner(Pid, Resource) ->
    logger:error("kill ~p as it has held the lock for too long, resource: ~p", [Pid, Resource]),
    InfoArgs = [Pid, [status, message_queue_len, current_stacktrace]],
    OwnerStatus = rpc:call(node(Pid), erlang, process_info, InfoArgs, 5000),
    logger:error("lock_owner_status:~n~p", [OwnerStatus]),
    exit(Pid, kill),
    ok.
1✔
340

341
-ifdef(TEST).
342
%% EUnit: a process blocked in receive is alive before, and dead after,
%% force_kill_lock_owner/2 is called on it.
force_kill_test() ->
    Blocked = fun() -> receive foo -> ok end end,
    Victim = spawn(Blocked),
    ?assert(is_process_alive(Victim)),
    ok = force_kill_lock_owner(Victim, resource),
    ?assertNot(is_process_alive(Victim)).
1✔
352

353
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc