• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 848

18 Dec 2024 09:46AM UTC coverage: 62.569%. First build
848

Pull #240

github

web-flow
Merge 710df1f66 into 81cff1c78
Pull Request #240: fix(autoheal): attempt healing complex asymmetric partitions

2 of 46 new or added lines in 2 files covered. (4.35%)

682 of 1090 relevant lines covered (62.57%)

52.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.63
/src/ekka_node_monitor.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_node_monitor).
18

19
-behaviour(gen_server).
20

21
-include("ekka.hrl").
22

23
%% API
24
-export([start_link/0, stop/0]).
25

26
-export([partitions/0]).
27

28
%% Internal Exports
29
-export([cast/2, run_after/2]).
30

31
%% gen_server Callbacks
32
-export([ init/1
33
        , handle_call/3
34
        , handle_cast/2
35
        , handle_info/2
36
        , terminate/2
37
        , code_change/3
38
        ]).
39

40
%% gen_server state of the node monitor.
-record(state, {
          %% Nodes reported via mnesia `inconsistent_database'; a node is
          %% removed again when its `mnesia_up' event arrives.
          partitions :: list(node()),
          %% Timer reference of the pending `heartbeat' message, if any.
          heartbeat = undefined :: undefined | reference(),
          %% Created by ekka_autoheal:init/0, updated via ekka_autoheal:handle_msg/2.
          autoheal   :: ekka_autoheal:autoheal(),
          %% Created by ekka_autoclean:init/0, updated via ekka_autoclean:check/1.
          autoclean  :: ekka_autoclean:autoclean()
         }).

%% The monitor is registered locally under its own module name.
-define(SERVER, ?MODULE).

%% Log through OTP logger, tagging every message with the component name.
-define(LOG(Level, Format, Args),
        logger:Level("Ekka(Monitor): " ++ Format, Args)).
50

51
%% @doc Start the node monitor and register it locally as ?SERVER.
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).
55

56
%% @doc Stop the locally registered node monitor.
-spec stop() -> ok.
stop() ->
    gen_server:stop(?SERVER).
57

58
%% @doc Return the nodes this monitor currently records as partitioned.
-spec partitions() -> [node()].
partitions() ->
    gen_server:call(?SERVER, partitions).
61

62
%% @private
%% Send `Msg' asynchronously to the node monitor registered on `Node'.
-spec cast(node(), term()) -> ok.
cast(Node, Msg) ->
    Dest = {?SERVER, Node},
    gen_server:cast(Dest, Msg).
65

66
%% @private
%% Deliver `Msg' to the local node monitor after `Delay' milliseconds.
%% Returns the timer reference created by erlang:send_after/3.
-spec run_after(non_neg_integer(), term()) -> reference().
run_after(Delay, Msg) ->
    erlang:send_after(Delay, ?SERVER, Msg).
69

70
%%--------------------------------------------------------------------
71
%% gen_server Callbacks
72
%%--------------------------------------------------------------------
73

74
%% Initialise the monitor: trap exits, subscribe to node and mnesia system
%% events, replay `nodeup' for already-connected visible nodes, and arm the
%% first heartbeat timer.
init([]) ->
    process_flag(trap_exit, true),
    %% `exsplus' is a deprecated rand algorithm superseded by `exsp'.
    %% In this module the seed feeds the heartbeat jitter.
    rand:seed(exsp, erlang:timestamp()),
    net_kernel:monitor_nodes(true, [{node_type, visible}, nodedown_reason]),
    {ok, _} = mnesia:subscribe(system),
    %% monitor_nodes/2 only reports later changes, so synthesise a `nodeup'
    %% for every node that is already connected. nodes/0 never includes the
    %% local node.
    lists:foreach(fun(N) -> self() ! {nodeup, N, []} end, nodes()),
    State = #state{partitions = [],
                   autoheal   = ekka_autoheal:init(),
                   autoclean  = ekka_autoclean:init()
                  },
    {ok, ensure_heartbeat(State)}.
85

86
%% Synchronous requests. Only `partitions' is understood; any other request
%% is logged and answered with `ignore'.
handle_call(partitions, _From, #state{partitions = Partitions} = State) ->
    {reply, Partitions, State};
handle_call(Req, _From, State) ->
    ?LOG(error, "Unexpected call: ~p", [Req]),
    {reply, ignore, State}.
92

93
%% Asynchronous messages from peer node monitors and the autoheal protocol.
handle_cast({heartbeat, _FromNode}, State) ->
    %% Heartbeats are accepted and otherwise ignored.
    {noreply, State};

handle_cast({suspect, FromNode, TargetNode}, State) ->
    %% FromNode saw TargetNode go down and asks this node to ping it.
    %% No trailing ~n: logger terminates the line itself, as in every other
    %% ?LOG call in this module.
    ?LOG(info, "Suspect from ~s: ~s", [FromNode, TargetNode]),
    %% net_adm:ping/1 is synchronous, so it runs in a separate process to
    %% keep this server responsive; the verdict is cast back to FromNode.
    spawn(fun() ->
              Status = case net_adm:ping(TargetNode) of
                           pong -> up;
                           pang -> down
                       end,
              cast(FromNode, {confirm, TargetNode, Status})
          end),
    {noreply, State};

handle_cast({confirm, TargetNode, Status}, State) ->
    ?LOG(info, "Confirm ~s ~s", [TargetNode, Status]),
    {noreply, State};

%% Autoheal protocol messages are handed over to ekka_autoheal.
handle_cast(Msg = {report_partition, _Node}, State) ->
    {noreply, autoheal_handle_msg(Msg, State)};
handle_cast(Msg = {heal_partition, _SplitView}, State) ->
    {noreply, autoheal_handle_msg(Msg, State)};
handle_cast(Msg = {heal_cluster, _, _}, State) ->
    {noreply, autoheal_handle_msg(Msg, State)};

handle_cast(Msg, State) ->
    ?LOG(error, "Unexpected cast: ~p", [Msg]),
    {noreply, State}.
121

122
%% Node, mnesia system, timer and process-exit events.
handle_info({nodeup, Node, _Info}, State) ->
    ekka_membership:node_up(Node),
    {noreply, State};

handle_info({nodedown, Node, _Info}, State) ->
    ekka_membership:node_down(Node),
    %% After 3 s, ask a third node to double-check the lost node.
    run_after(3000, {suspect, Node}),
    {noreply, State};

handle_info({suspect, Node}, State) ->
    %% Any running mnesia node other than this one and the suspect can act as
    %% proxy; if there is none, the check is skipped.
    case ekka_mnesia:running_nodes() -- [node(), Node] of
        [ProxyNode | _] -> cast(ProxyNode, {suspect, node(), Node});
        []              -> ignore
    end,
    {noreply, State};

handle_info({mnesia_system_event, {mnesia_up, Node}},
            State = #state{partitions = Partitions}) ->
    ekka_membership:mnesia_up(Node),
    case lists:member(Node, Partitions) of
        false -> ok;
        true  -> ekka_membership:partition_healed(Node)
    end,
    %% If there was an asymmetric cluster partition, we might need more
    %% autoheal iterations to completely bring the cluster back to normal.
    maybe_confirm_partition(),
    {noreply, State#state{partitions = lists:delete(Node, Partitions)}};

handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
    ekka_membership:mnesia_down(Node),
    {noreply, State};

handle_info({mnesia_system_event, {inconsistent_database, Context, Node}},
            State = #state{partitions = Partitions}) ->
    ?LOG(critical, "Network partition detected from node ~s: ~p", [Node, Context]),
    ekka_membership:partition_occurred(Node),
    maybe_confirm_partition(),
    {noreply, State#state{partitions = lists:usort([Node | Partitions])}};

handle_info({mnesia_system_event, {mnesia_overload, Details}}, State) ->
    ?LOG(warning, "Mnesia overload: ~p", [Details]),
    {noreply, State};

handle_info({mnesia_system_event, Event}, State) ->
    ?LOG(error, "Mnesia system event: ~p", [Event]),
    {noreply, State};

%% Confirm whether the recorded partitions should be reported to the leader.
handle_info(confirm_partition, State = #state{partitions = []}) ->
    {noreply, State};

handle_info(confirm_partition, State = #state{partitions = Partitions}) ->
    Leader = ekka_membership:leader(),
    case ekka_node:is_running(Leader, ekka) of
        true  -> cast(Leader, {report_partition, node()});
        false -> ?LOG(critical, "Leader is down, cannot autoheal the partitions: ~p", [Partitions])
    end,
    {noreply, State};

handle_info({autoheal, Msg}, State) ->
    {noreply, autoheal_handle_msg(Msg, State)};

handle_info(heartbeat, State) ->
    %% Send a heartbeat to every other cluster node, then re-arm the timer.
    Self = node(),
    Peers = [N || N <- ekka_mnesia:cluster_nodes(all), N =/= Self],
    lists:foreach(fun(Peer) -> cast(Peer, {heartbeat, Self}) end, Peers),
    {noreply, ensure_heartbeat(State#state{heartbeat = undefined})};

handle_info(Msg = {'EXIT', Pid, _Reason}, State = #state{autoheal = Autoheal}) ->
    %% Forward the exit only if Pid is the process reported by
    %% ekka_autoheal:proc/1; other exits are ignored.
    case ekka_autoheal:proc(Autoheal) of
        Pid -> {noreply, autoheal_handle_msg(Msg, State)};
        _   -> {noreply, State}
    end;

%% Autoclean Event.
handle_info(autoclean, State = #state{autoclean = AutoClean}) ->
    {noreply, State#state{autoclean = ekka_autoclean:check(AutoClean)}};

handle_info(Info, State) ->
    ?LOG(error, "Unexpected info: ~p", [Info]),
    {noreply, State}.

%% Schedule a `confirm_partition' check in 3 s when autoheal is enabled.
%% Shared by the `mnesia_up' and `inconsistent_database' handlers.
maybe_confirm_partition() ->
    case ekka_autoheal:enabled() of
        {true, _} -> run_after(3000, confirm_partition);
        false     -> ignore
    end.
212

213
%% Nothing to release on shutdown.
terminate(_Reason, _State) -> ok.
215

216
%% Hot code upgrade: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
218

219
%%--------------------------------------------------------------------
220
%% Internal functions
221
%%--------------------------------------------------------------------
222

223
%% TODO: This function triggers a bug in dialyzer, where it forgets about some record fields.
-dialyzer({nowarn_function, [ensure_heartbeat/1]}).
%% Arm the heartbeat timer unless one is already pending. The delay is a
%% 2000 ms base plus rand:uniform(2000), i.e. 2001..4000 ms.
ensure_heartbeat(#state{heartbeat = undefined} = State) ->
    Delay = 2000 + rand:uniform(2000),
    TRef = run_after(Delay, heartbeat),
    State#state{heartbeat = TRef};
ensure_heartbeat(State) ->
    %% A timer is already scheduled; keep it.
    State.
231

232
%% Pass Msg to ekka_autoheal:handle_msg/2 and store the returned autoheal state.
autoheal_handle_msg(Msg, #state{autoheal = Autoheal} = State) ->
    NewAutoheal = ekka_autoheal:handle_msg(Msg, Autoheal),
    State#state{autoheal = NewAutoheal}.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc