• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 762

15 Sep 2023 10:12AM UTC coverage: 63.619% (-0.1%) from 63.749%
762

push

github

turtleDeng
fix always output log

668 of 1050 relevant lines covered (63.62%)

40.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.36
/src/ekka_membership.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_membership).
18

19
-behaviour(gen_server).
20

21
-include("ekka.hrl").
22

23
-export([start_link/0, stop/0]).
24

25
%% Ring API
26
-export([ring/0, ring/1]).
27

28
%% Members API
29
-export([ local_member/0
30
        , lookup_member/1
31
        , members/0
32
        , members/1
33
        , is_member/1
34
        , oldest/1
35
        ]).
36

37
-export([ leader/0
38
        , nodelist/0
39
        , nodelist/1
40
        , coordinator/0
41
        , coordinator/1
42
        ]).
43

44
-export([is_all_alive/0]).
45

46
%% Monitor API
47
-export([monitor/3]).
48

49
%% Announce API
50
-export([announce/1]).
51

52
%% Ping/Pong API
53
-export([ping/2, pong/2]).
54

55
%% On Node/Mnesia Status
56
-export([ node_up/1
57
        , node_down/1
58
        , mnesia_up/1
59
        , mnesia_down/1
60
        ]).
61

62
%% On Cluster Status
63
-export([ partition_occurred/1
64
        , partition_healed/1
65
        ]).
66

67
%% gen_server Callbacks
68
-export([ init/1
69
        , handle_call/3
70
        , handle_cast/2
71
        , handle_info/2
72
        , terminate/2
73
        , code_change/3
74
        ]).
75

76
-record(state, {monitors, events}).
77

78
-type(event_type() :: partition | membership).
79

80
-define(SERVER, ?MODULE).
81
-define(LOG(Level, Format, Args),
82
        logger:Level("Ekka(Membership): " ++ Format, Args)).
83

84
%% @doc Start the membership server as a linked gen_server registered
%% locally under ?SERVER.
-spec(start_link() -> {ok, pid()} | {error, term()}).
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).
87

88
%% @doc Stop the locally registered membership server with reason
%% `normal', waiting without a timeout for it to terminate.
-spec(stop() -> ok).
stop() ->
    gen_server:stop(?SERVER, normal, infinity).
91

92
%%--------------------------------------------------------------------
93
%% API
94
%%--------------------------------------------------------------------
95

96
%% @doc Return every known member, sorted by the `hash' record field.
-spec(ring() -> [member()]).
ring() ->
    All = members(),
    lists:keysort(#member.hash, All).
99

100
%% @doc Return the members whose status equals `Status', sorted by the
%% `hash' record field.
-spec(ring(up | down) -> [member()]).
ring(Status) ->
    Selected = members(Status),
    lists:keysort(#member.hash, Selected).
103

104
%% @doc Look up the membership entry of the local node.
-spec(local_member() -> member()).
local_member() ->
    Self = node(),
    lookup_member(Self).
107

108
%% @doc Fetch the entry stored for `Node' in the `membership' table.
%% Returns `false' when the table holds no entry for that node.
-spec(lookup_member(node()) -> member() | false).
lookup_member(Node) ->
    case ets:lookup(membership, Node) of
        [] ->
            false;
        [Member] ->
            Member
    end.
111

112
%% @doc Tell whether the `membership' table has an entry keyed by `Node'.
-spec(is_member(node()) -> boolean()).
is_member(Node) ->
    Table = membership,
    ets:member(Table, Node).
115

116
%% @doc Dump the whole `membership' table as a list.
-spec(members() -> [member()]).
members() ->
    Table = membership,
    ets:tab2list(Table).
119

120
%% @doc Return only the members whose `status' field is exactly `Status'.
-spec(members(up | down) -> [member()]).
members(Status) ->
    HasStatus = fun(#member{status = St}) -> St =:= Status;
                   (_Other) -> false
                end,
    lists:filter(HasStatus, members()).
123

124
%% @doc Leader node: the node of the oldest member among all known
%% members, whatever their status.
-spec(leader() -> node()).
leader() ->
    (oldest(members()))#member.node.
128

129
%% @doc Coordinator node: the node of the oldest member among those whose
%% status is `up'.
-spec(coordinator() -> node()).
coordinator() ->
    (oldest(members(up)))#member.node.
133

134
%% @doc Coordinator among the given nodes: the node of the oldest member
%% found for `Nodes'. Nodes with no membership entry are left out.
-spec(coordinator(list(node())) -> node()).
coordinator(Nodes) ->
    Lookups = lists:map(fun lookup_member/1, Nodes),
    Known = lists:filter(fun(M) -> M =/= false end, Lookups),
    (oldest(Known))#member.node.
139

140
%% @doc Return the oldest member, i.e. the one that sorts first when
%% members are ordered by ascending `guid' (see compare/2).
%%
%% `Members' must be non-empty: hd/1 raises `badarg' on an empty list,
%% so callers are expected to pass at least one member.
-spec(oldest([member(), ...]) -> member()).
oldest(Members) ->
    hd(lists:sort(fun compare/2, Members)).
143

144
%% @private
%% Ordering fun for lists:sort/2: true when the first member's guid is
%% strictly smaller than the second's.
compare(M1, M2) ->
    Guid1 = M1#member.guid,
    Guid2 = M2#member.guid,
    Guid1 < Guid2.
147

148
%% @doc Node names of all known members.
-spec(nodelist() -> [node()]).
nodelist() ->
    [M#member.node || M <- members(), is_record(M, member)].
151

152
%% @doc Node names of the members whose status equals `Status'.
-spec(nodelist(up | down) -> [node()]).
nodelist(Status) ->
    [M#member.node || M <- members(Status), is_record(M, member)].
155

156
%% @doc True when every node returned by ekka_mnesia:cluster_nodes(all)
%% is either the local node or one of the nodes listed by nodes().
-spec(is_all_alive() -> boolean()).
is_all_alive() ->
    Unreachable = ekka_mnesia:cluster_nodes(all) -- [node() | nodes()],
    %% Match against [] instead of counting: length/1 walks the whole list.
    Unreachable =:= [].
159

160
%% @doc Subscribe (`OnOff' = true) or unsubscribe (`OnOff' = false) a pid
%% or fun for events of the given type. The request is a synchronous call
%% to the membership server.
-spec(monitor(event_type(), pid() | function(), boolean()) -> ok).
monitor(Type, PidOrFun, OnOff) ->
    Request = {monitor, {Type, PidOrFun, OnOff}},
    call(Request).
163

164
%% @doc Ask the local membership server to announce `Action' to the other
%% known members. The request is a synchronous call.
-spec(announce(join | leave | heal | {force_leave, node()}) -> ok).
announce(Action) ->
    Request = {announce, Action},
    call(Request).
167

168
%% @doc Send a `ping' carrying `Member' to the membership server on `Node'.
%%
%% When ekka_node:is_aliving/1 returns false, nothing is sent and `ignore'
%% is returned. Otherwise the ping is attempted with up to 5 tries
%% (see ping/3) and the result is `ok'.
-spec(ping(node(), member()) -> ok | ignore).
ping(Node, Member) ->
    case ekka_node:is_aliving(Node) of
        true  -> ping(Node, Member, 5);
        false -> ignore
    end.
174

175
%% @private
%% Ping with a retry budget. While ekka_node:is_running(Node, ekka) is
%% false, wait one second and retry with one attempt fewer; once it is
%% true, cast the ping. Logs an error when the budget reaches 0.
ping(Node, _Member, 0) ->
    ?LOG(error, "Failed to ping ~s~n", [Node]);
ping(Node, Member, Retries) ->
    case ekka_node:is_running(Node, ekka) of
        false ->
            timer:sleep(1000),
            ping(Node, Member, Retries - 1);
        true ->
            cast(Node, {ping, Member})
    end.
183

184
%% @doc Send a `pong' carrying `Member' to the membership server on `Node'.
%% The message is cast asynchronously, so no reply is awaited.
-spec(pong(node(), member()) -> ok).
pong(Node, Member) ->
    cast(Node, {pong, Member}).
186

187
%% @doc Tell the local membership server that `Node' came up.
-spec(node_up(node()) -> ok).
node_up(Node) ->
    Event = {node_up, Node},
    cast(Event).
190

191
%% @doc Tell the local membership server that `Node' went down.
-spec(node_down(node()) -> ok).
node_down(Node) ->
    Event = {node_down, Node},
    cast(Event).
194

195
%% @doc Tell the local membership server that Mnesia came up on `Node'.
-spec(mnesia_up(node()) -> ok).
mnesia_up(Node) ->
    Event = {mnesia_up, Node},
    cast(Event).
198

199
%% @doc Tell the local membership server that Mnesia went down on `Node'.
-spec(mnesia_down(node()) -> ok).
mnesia_down(Node) ->
    Event = {mnesia_down, Node},
    cast(Event).
202

203
%% @doc Tell the local membership server that a network partition
%% involving `Node' occurred; monitors of `partition' events are notified.
-spec partition_occurred(node()) -> ok.
partition_occurred(Node) ->
    Event = {partition_occurred, Node},
    cast(Event).
206

207
%% @doc Tell the local membership server that a partition has healed;
%% monitors of `partition' events receive `{healed, Nodes}'.
%%
%% NOTE(review): the handling cast clause names its payload `Nodes', so
%% callers presumably pass a list of nodes. The spec accepts both a single
%% node and a list so existing callers keep type-checking; confirm against
%% the call sites.
-spec partition_healed(node() | [node()]) -> ok.
partition_healed(Nodes) ->
    cast({partition_healed, Nodes}).
210

211
%% @private
%% Asynchronous message to the membership server on the local node.
cast(Message) ->
    gen_server:cast(?SERVER, Message).
214

215
%% @private
%% Asynchronous message to the membership server registered on `Node'.
cast(Node, Message) ->
    Remote = {?SERVER, Node},
    gen_server:cast(Remote, Message).
218

219
%% @private
%% Synchronous request to the local membership server, using
%% gen_server:call/2 and therefore its default timeout.
call(Request) ->
    gen_server:call(?SERVER, Request).
222

223
%%--------------------------------------------------------------------
224
%% gen_server Callbacks
225
%%--------------------------------------------------------------------
226

227
%% gen_server init callback.
%% Creates the named `membership' table, inserts the local member, and
%% spawns one ping process for every other node of the Mnesia cluster.
init([]) ->
    %% keypos 2 is the first field of the stored record; the rest of this
    %% module looks entries up by node name.
    TableOpts = [ordered_set, protected, named_table, {keypos, 2}],
    _ = ets:new(membership, TableOpts),
    MnesiaStatus = case lists:member(node(), ekka_mnesia:running_nodes()) of
                       false -> stopped;
                       true  -> running
                   end,
    LocalMember = with_hash(#member{node   = node(),
                                    guid   = ekka_guid:gen(),
                                    status = up,
                                    mnesia = MnesiaStatus,
                                    ltime  = erlang:timestamp()}),
    true = ets:insert(membership, LocalMember),
    Peers = ekka_mnesia:cluster_nodes(all) -- [node()],
    %% Ping from separate processes: ping/2 may sleep between retries and
    %% must not block init.
    _ = [spawn(?MODULE, ping, [Peer, LocalMember]) || Peer <- Peers],
    {ok, #state{monitors = [], events = []}}.
242

243
%% @private
%% Fill in the member's `hash' field with erlang:phash2/2 of {Node, Guid},
%% using a range of 2^32 - 1.
with_hash(#member{node = Node, guid = Guid} = Member) ->
    Range = (1 bsl 32) - 1,
    Hash = erlang:phash2({Node, Guid}, Range),
    Member#member{hash = Hash}.
245

246
%% gen_server call handler.
%%   {monitor, {Type, PidOrFun, true | false}} - add or remove a monitor.
%%   {announce, join | leave | heal}  - cast joining/leaving/healing for the
%%                                      local node to every other member.
%%   {announce, {force_leave, Node}}  - cast {leaving, Node} to every member
%%                                      except Node itself.
%% Any other request is logged and answered with `ignore'.
handle_call({monitor, {Type, PidOrFun, true}}, _From, State) ->
    {reply, ok, add_monitor({Type, PidOrFun}, State)};

handle_call({monitor, {Type, PidOrFun, false}}, _From, State) ->
    {reply, ok, del_monitor({Type, PidOrFun}, State)};

handle_call({announce, Action}, _From, State)
    when Action =:= join; Action =:= leave; Action =:= heal ->
    Status = case Action of
                 leave -> leaving;
                 heal  -> healing;
                 join  -> joining
             end,
    Peers = [N || N <- nodelist(), N =/= node()],
    lists:foreach(fun(Peer) -> cast(Peer, {Status, node()}) end, Peers),
    {reply, ok, State};

handle_call({announce, {force_leave, Node}}, _From, State) ->
    Others = [N || N <- nodelist(), N =/= Node],
    lists:foreach(fun(Other) -> cast(Other, {leaving, Node}) end, Others),
    {reply, ok, State};

handle_call(Req, _From, State) ->
    ?LOG(error, "Unexpected call: ~p", [Req]),
    reply(ignore, State).
269

270
%% gen_server cast handler. Each clause applies one event to the
%% `membership' table and, for node/mnesia/partition events, notifies the
%% registered monitors. The server state itself is never modified here.

%% Node connected: only nodes that are part of the Mnesia cluster get a
%% membership entry created or refreshed.
handle_cast({node_up, Node}, State) ->
    ?LOG(info, "Node ~s up", [Node]),
    case ekka_mnesia:is_node_in_cluster(Node) of
        false ->
            ignore;
        true ->
            Current = case lookup(Node) of
                          []         -> #member{node = Node, status = up};
                          [Existing] -> Existing#member{status = up}
                      end,
            MnesiaStatus = ekka_mnesia:cluster_status(Node),
            insert(Current#member{mnesia = MnesiaStatus})
    end,
    notify({node, up, Node}, State),
    {noreply, State};

%% Node disconnected: a member that was already leaving is removed,
%% any other known member is marked down.
handle_cast({node_down, Node}, State) ->
    ?LOG(info, "Node ~s down", [Node]),
    case lookup(Node) of
        [] ->
            ignore;
        [#member{status = leaving}] ->
            ets:delete(membership, Node);
        [Existing] ->
            insert(Existing#member{status = down})
    end,
    notify({node, down, Node}, State),
    {noreply, State};

%% A node announced that it is joining: create or update its entry.
handle_cast({joining, Node}, State) ->
    ?LOG(info, "Node ~s joining", [Node]),
    Joining = case lookup(Node) of
                  []         -> #member{node = Node, status = joining};
                  [Existing] -> Existing#member{status = joining}
              end,
    insert(Joining),
    notify({node, joining, Node}, State),
    {noreply, State};

%% A node announced that it is healing: only known members are updated.
handle_cast({healing, Node}, State) ->
    ?LOG(info, "Node ~s healing", [Node]),
    case lookup(Node) of
        []         -> ignore;
        [Existing] -> insert(Existing#member{status = healing})
    end,
    notify({node, healing, Node}, State),
    {noreply, State};

%% Ping from a remote member: answer with the local member record, then
%% store the sender's record with its current Mnesia status.
handle_cast({ping, #member{node = Node} = Member}, State) ->
    Local = local_member(),
    pong(Node, Local),
    MnesiaStatus = ekka_mnesia:cluster_status(Node),
    insert(Member#member{mnesia = MnesiaStatus}),
    {noreply, State};

%% Pong from a remote member: store the sender's record with its current
%% Mnesia status.
handle_cast({pong, #member{node = Node} = Member}, State) ->
    MnesiaStatus = ekka_mnesia:cluster_status(Node),
    insert(Member#member{mnesia = MnesiaStatus}),
    {noreply, State};

%% A node is leaving: a member that is already down is removed, any other
%% known member is marked leaving.
handle_cast({leaving, Node}, State) ->
    ?LOG(info, "Node ~s leaving", [Node]),
    case lookup(Node) of
        [] ->
            ignore;
        [#member{status = down}] ->
            ets:delete(membership, Node);
        [Existing] ->
            insert(Existing#member{status = leaving})
    end,
    notify({node, leaving, Node}, State),
    {noreply, State};

%% Mnesia started on a node: mark it up and running, and send it a pong
%% with the local member record from a separate process.
handle_cast({mnesia_up, Node}, State) ->
    ?LOG(info, "Mnesia ~s up", [Node]),
    Running = case lookup(Node) of
                  [] ->
                      #member{node = Node, status = up, mnesia = running};
                  [Existing] ->
                      Existing#member{status = up, mnesia = running}
              end,
    insert(Running),
    spawn(?MODULE, pong, [Node, local_member()]),
    notify({mnesia, up, Node}, State),
    {noreply, State};

%% Mnesia stopped on a node: a leaving member is removed, any other known
%% member gets its Mnesia status set to stopped.
handle_cast({mnesia_down, Node}, State) ->
    ?LOG(info, "Mnesia ~s down", [Node]),
    case lookup(Node) of
        [] ->
            ignore;
        [#member{status = leaving}] ->
            ets:delete(membership, Node);
        [Existing] ->
            insert(Existing#member{mnesia = stopped})
    end,
    notify({mnesia, down, Node}, State),
    {noreply, State};

%% Partition events are only forwarded to `partition' monitors.
handle_cast({partition_occurred, Node}, State) ->
    notify(partition, {occurred, Node}, State),
    {noreply, State};

handle_cast({partition_healed, Nodes}, State) ->
    notify(partition, {healed, Nodes}, State),
    {noreply, State};

handle_cast(Msg, State) ->
    ?LOG(error, "Unexpected cast: ~p", [Msg]),
    {noreply, State}.
370

371
%% gen_server info handler.
%% On a process 'DOWN' message, drop every monitor entry registered for
%% the dead pid (whatever its event type). Anything else is logged.
handle_info({'DOWN', _MRef, process, DownPid, _Reason},
            #state{monitors = Monitors} = State) ->
    StillAlive = fun({{_Type, Pid}, _Ref}) -> Pid =/= DownPid;
                    (_Other) -> false
                 end,
    {noreply, State#state{monitors = lists:filter(StillAlive, Monitors)}};

handle_info(Info, State) ->
    ?LOG(error, "Unexpected info: ~p", [Info]),
    {noreply, State}.
379

380
%% gen_server terminate callback: performs no cleanup and returns ok.
terminate(_Why, _ServerState) ->
    ok.
382

383
%% gen_server code_change callback: the state is carried over unchanged.
code_change(_FromVsn, ServerState, _Extra) ->
    {ok, ServerState}.
385

386
%%--------------------------------------------------------------------
387
%% Internal functions
388
%%--------------------------------------------------------------------
389

390
%% @private
%% Raw lookup in the `membership' table: the list of objects stored under
%% `Node', which is [] when there is none.
lookup(Node) ->
    Table = membership,
    ets:lookup(Table, Node).
392

393
%% @private
%% Store a member in the `membership' table, stamping its `ltime' field
%% with the current erlang:timestamp/0.
insert(Member) ->
    Stamped = Member#member{ltime = erlang:timestamp()},
    ets:insert(membership, Stamped).
395

396
%% @private
%% Build the gen_server `{reply, Reply, State}' return tuple.
reply(Answer, ServerState) ->
    {reply, Answer, ServerState}.
398

399
%% @private
%% Notify the monitors of `membership' events.
notify(Event, State) ->
    EventType = membership,
    notify(EventType, Event, State).
401

402
%% @private
%% Deliver {Type, Event} to every monitor registered for `Type': pids get
%% it as a message, funs are called with it inside the server process.
notify(Type, Event, #state{monitors = Monitors}) ->
    Message = {Type, Event},
    Targets = [Target || {{T, Target}, _MRef} <- Monitors, T == Type],
    Deliver = fun(Pid) when is_pid(Pid) ->
                      Pid ! Message;
                 (Fun) when is_function(Fun) ->
                      Fun(Message)
              end,
    lists:map(Deliver, Targets).
409

410
%% @private
%% Register a monitor unless the same {Type, PidOrFun} pair is already
%% present. Pids are monitored so that their entry can be dropped on
%% 'DOWN'; funs are stored with `undefined' in place of a monitor ref.
add_monitor({_Type, PidOrFun} = Key, S = #state{monitors = Monitors}) ->
    case lists:keymember(Key, 1, Monitors) of
        false ->
            MRef = if
                       is_pid(PidOrFun) -> erlang:monitor(process, PidOrFun);
                       true             -> undefined
                   end,
            S#state{monitors = [{Key, MRef} | Monitors]};
        true ->
            S
    end.
420

421
%% @private
%% Unregister the {Type, PidOrFun} monitor if it is present. For pids the
%% process monitor is removed too, flushing any pending 'DOWN' message.
del_monitor({_Type, PidOrFun} = Key, S = #state{monitors = Monitors}) ->
    case lists:keyfind(Key, 1, Monitors) of
        {_, MRef} ->
            is_pid(PidOrFun) andalso erlang:demonitor(MRef, [flush]),
            Remaining = lists:delete({Key, MRef}, Monitors),
            S#state{monitors = Remaining};
        false ->
            S
    end.
428

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc