• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 845

13 Dec 2024 05:50PM UTC coverage: 62.977%. First build
845

Pull #240

github

web-flow
Merge f93e5e2e2 into 81cff1c78
Pull Request #240: fix(autoheal): attempt healing complex asymmetric partitions

2 of 24 new or added lines in 2 files covered. (8.33%)

677 of 1075 relevant lines covered (62.98%)

46.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.2
/src/ekka_node_monitor.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_node_monitor).
18

19
-behaviour(gen_server).
20

21
-include("ekka.hrl").
22

23
%% API
24
-export([start_link/0, stop/0]).
25

26
-export([partitions/0]).
27

28
%% Internal Exports
29
-export([cast/2, run_after/2]).
30

31
%% gen_server Callbacks
32
-export([ init/1
33
        , handle_call/3
34
        , handle_cast/2
35
        , handle_info/2
36
        , terminate/2
37
        , code_change/3
38
        ]).
39

40
%% gen_server state of the node monitor.
-record(state, {
          %% Nodes reported by mnesia as `inconsistent_database' that have not
          %% yet come back with `mnesia_up'.
          partitions :: list(node()),
          %% Reference of the pending heartbeat timer, or undefined when none
          %% is armed.
          heartbeat = undefined :: undefined | reference(),
          %% Opaque autoheal state, driven through ekka_autoheal:handle_msg/2.
          autoheal   :: ekka_autoheal:autoheal(),
          %% Opaque autoclean state, advanced by ekka_autoclean:check/1.
          autoclean  :: ekka_autoclean:autoclean()
         }).
46

47
%% The monitor is registered locally under its own module name.
-define(SERVER, ?MODULE).

%% Log through OTP logger with a fixed "Ekka(Monitor): " prefix on every message.
-define(LOG(Level, Format, Args), logger:Level("Ekka(Monitor): " ++ Format, Args)).
50

51
%% @doc Start the node monitor and register it locally as ?SERVER.
-spec(start_link() -> {ok, pid()} | {error, term()}).
start_link() ->
    Name = {local, ?SERVER},
    gen_server:start_link(Name, ?MODULE, [], []).
55

56
%% @doc Stop the locally registered node monitor.
-spec(stop() -> ok).
stop() ->
    gen_server:stop(?SERVER).
57

58
%% @doc Get partitions: the nodes this monitor currently records as
%% partitioned from the local node.
-spec(partitions() -> list(node())).
partitions() ->
    gen_server:call(?SERVER, partitions).
61

62
%% @private
%% Asynchronously send Msg to the node monitor registered on Node.
cast(Node, Msg) ->
    Dest = {?SERVER, Node},
    gen_server:cast(Dest, Msg).
65

66
%% @private
%% Deliver Msg to the local node monitor after Delay milliseconds.
%% Returns the timer reference produced by erlang:send_after/3.
run_after(Delay, Msg) ->
    TimerRef = erlang:send_after(Delay, ?SERVER, Msg),
    TimerRef.
69

70
%%--------------------------------------------------------------------
71
%% gen_server Callbacks
72
%%--------------------------------------------------------------------
73

74
%% Initialise the monitor.
%% Steps: trap exits (EXIT messages from the autoheal process are handled in
%% handle_info/2), subscribe to visible-node and mnesia system events, replay a
%% nodeup for every node that was already connected, then arm the first
%% heartbeat.
init([]) ->
    process_flag(trap_exit, true),
    rand:seed(exsplus, erlang:timestamp()),
    net_kernel:monitor_nodes(true, [{node_type, visible}, nodedown_reason]),
    {ok, _} = mnesia:subscribe(system),
    %% monitor_nodes only reports changes from now on, so nodes connected
    %% before this point get a synthetic nodeup message.
    _ = [self() ! {nodeup, Peer, []} || Peer <- nodes() -- [node()]],
    Autoheal  = ekka_autoheal:init(),
    Autoclean = ekka_autoclean:init(),
    Initial = #state{partitions = [],
                     autoheal   = Autoheal,
                     autoclean  = Autoclean},
    {ok, ensure_heartbeat(Initial)}.
85

86
%% Synchronous requests. Only `partitions' is understood: it replies with the
%% list of partitioned nodes kept in the state. Any other request is logged and
%% answered with `ignore'.
handle_call(partitions, _From, #state{partitions = Partitions} = State) ->
    {reply, Partitions, State};
handle_call(Unknown, _From, State) ->
    ?LOG(error, "Unexpected call: ~p", [Unknown]),
    {reply, ignore, State}.
92

93
%% Asynchronous messages from node monitors on other nodes.
%%  - {heartbeat, Node}: accepted and otherwise ignored.
%%  - {suspect, From, Target}: ping Target from a separate process and report
%%    the outcome back to From as {confirm, Target, up | down}.
%%  - {confirm, Target, Status}: logged only.
%%  - {report_partition, _} / {heal_partition, _}: forwarded to autoheal.
handle_cast({heartbeat, _FromNode}, State) ->
    {noreply, State};

handle_cast({suspect, FromNode, TargetNode}, State) ->
    ?LOG(info, "Suspect from ~s: ~s~n", [FromNode, TargetNode]),
    %% The ping runs outside the gen_server process so the monitor is not held
    %% up while it completes.
    spawn(fun() ->
              Verdict = ping_status(net_adm:ping(TargetNode)),
              cast(FromNode, {confirm, TargetNode, Verdict})
          end),
    {noreply, State};

handle_cast({confirm, TargetNode, Status}, State) ->
    ?LOG(info, "Confirm ~s ~s", [TargetNode, Status]),
    {noreply, State};

handle_cast({Tag, _} = Msg, State) when Tag =:= report_partition;
                                        Tag =:= heal_partition ->
    {noreply, autoheal_handle_msg(Msg, State)};

handle_cast(Unknown, State) ->
    ?LOG(error, "Unexpected cast: ~p", [Unknown]),
    {noreply, State}.

%% Translate a net_adm:ping/1 answer into the node status reported in `confirm'.
ping_status(pong) -> up;
ping_status(pang) -> down.
120

121
%% Raw messages: net_kernel node events, mnesia system events, timers
%% scheduled through run_after/2, and EXIT signals (exits are trapped in init/1).
handle_info({nodeup, Node, _Info}, State) ->
    ekka_membership:node_up(Node),
    {noreply, State};

handle_info({nodedown, Node, _Info}, State) ->
    ekka_membership:node_down(Node),
    %% After 3 s, ask another running node whether Node is really down.
    run_after(3000, {suspect, Node}),
    {noreply, State};

handle_info({suspect, Node}, State) ->
    case ekka_mnesia:running_nodes() -- [node(), Node] of
        []              -> ignore;
        [ProxyNode | _] -> cast(ProxyNode, {suspect, node(), Node})
    end,
    {noreply, State};

handle_info({mnesia_system_event, {mnesia_up, Node}},
            State = #state{partitions = Partitions}) ->
    ekka_membership:mnesia_up(Node),
    case lists:member(Node, Partitions) of
        true  -> ekka_membership:partition_healed(Node);
        false -> ok
    end,
    %% An asymmetric partition may need several autoheal rounds, so every
    %% mnesia_up triggers a fresh partition check.
    maybe_confirm_partition(),
    {noreply, State#state{partitions = lists:delete(Node, Partitions)}};

handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
    ekka_membership:mnesia_down(Node),
    {noreply, State};

handle_info({mnesia_system_event, {inconsistent_database, Context, Node}},
            State = #state{partitions = Partitions}) ->
    ?LOG(critical, "Network partition detected from node ~s: ~p", [Node, Context]),
    ekka_membership:partition_occurred(Node),
    maybe_confirm_partition(),
    {noreply, State#state{partitions = lists:usort([Node | Partitions])}};

handle_info({mnesia_system_event, {mnesia_overload, Details}}, State) ->
    ?LOG(warning, "Mnesia overload: ~p", [Details]),
    {noreply, State};

handle_info({mnesia_system_event, Event}, State) ->
    ?LOG(error, "Mnesia system event: ~p", [Event]),
    {noreply, State};

%% Partition check: with no recorded partitions there is nothing to report.
%% Otherwise, report this node to the leader if ekka is running there.
handle_info(confirm_partition, State = #state{partitions = []}) ->
    {noreply, State};

handle_info(confirm_partition, State = #state{partitions = Partitions}) ->
    Leader = ekka_membership:leader(),
    case ekka_node:is_running(Leader, ekka) of
        true  -> cast(Leader, {report_partition, node()});
        false -> ?LOG(critical, "Leader is down, cannot autoheal the partitions: ~p", [Partitions])
    end,
    {noreply, State};

handle_info({autoheal, Msg}, State) ->
    {noreply, autoheal_handle_msg(Msg, State)};

%% Send a heartbeat cast to every other cluster node, then re-arm the timer.
handle_info(heartbeat, State) ->
    Peers = [Peer || Peer <- ekka_mnesia:cluster_nodes(all), Peer =/= node()],
    lists:foreach(fun(Peer) -> cast(Peer, {heartbeat, node()}) end, Peers),
    {noreply, ensure_heartbeat(State#state{heartbeat = undefined})};

%% Only exits from the current autoheal process are handed to autoheal;
%% any other EXIT is dropped.
handle_info(Msg = {'EXIT', Pid, _Reason}, State = #state{autoheal = Autoheal}) ->
    case ekka_autoheal:proc(Autoheal) =:= Pid of
        true  -> {noreply, autoheal_handle_msg(Msg, State)};
        false -> {noreply, State}
    end;

%% Autoclean Event.
handle_info(autoclean, State = #state{autoclean = AutoClean}) ->
    {noreply, State#state{autoclean = ekka_autoclean:check(AutoClean)}};

handle_info(Unknown, State) ->
    ?LOG(error, "Unexpected info: ~p", [Unknown]),
    {noreply, State}.

%% When autoheal is enabled, schedule a `confirm_partition' check in 3 s.
%% The return value is not used by callers.
maybe_confirm_partition() ->
    case ekka_autoheal:enabled() of
        {true, _} -> run_after(3000, confirm_partition);
        false     -> ignore
    end.
211

212
%% No resources to release on shutdown.
terminate(_Reason, _State) -> ok.
214

215
%% Hot code upgrade: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
217

218
%%--------------------------------------------------------------------
219
%% Internal functions
220
%%--------------------------------------------------------------------
221

222
%% TODO: dialyzer misreports this function (it loses track of some record
%% fields), so warnings are suppressed for it.
-dialyzer({nowarn_function, [ensure_heartbeat/1]}).
%% Arm the heartbeat timer if none is pending. The delay is random in the range
%% 2001..4000 ms. A state that already holds a timer is returned unchanged.
ensure_heartbeat(State = #state{heartbeat = undefined}) ->
    Delay = 2000 + rand:uniform(2000),
    TimerRef = run_after(Delay, heartbeat),
    State#state{heartbeat = TimerRef};
ensure_heartbeat(State) ->
    State.
230

231
%% Pass Msg to the autoheal state machine and store the state it returns.
autoheal_handle_msg(Msg, State = #state{autoheal = Autoheal}) ->
    NextAutoheal = ekka_autoheal:handle_msg(Msg, Autoheal),
    State#state{autoheal = NextAutoheal}.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc