emqx / ekka / build 848

18 Dec 2024 09:46AM UTC · coverage: 62.569% · first build
Pull Request #240: fix(autoheal): attempt healing complex asymmetric partitions
Merge 710df1f66 into 81cff1c78 (via github / web-flow)

2 of 46 new or added lines in 2 files covered (4.35%)
682 of 1090 relevant lines covered (62.57%)
52.3 hits per line

Source File: /src/ekka_autoheal.erl

%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(ekka_autoheal).

-include_lib("snabbkaffe/include/trace.hrl").

-export([ init/0
        , enabled/0
        , proc/1
        , handle_msg/2
        ]).

-record(autoheal, {delay, role, proc, timer}).

-type autoheal() :: #autoheal{}.

-export_type([autoheal/0]).

-define(DEFAULT_DELAY, 15000).
-define(LOG(Level, Format, Args),
        logger:Level("Ekka(Autoheal): " ++ Format, Args)).
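
%% For reference, a mechanical expansion of the macro above:
%% ?LOG(error, "oops: ~p", [X]) becomes
%% logger:error("Ekka(Autoheal): " ++ "oops: ~p", [X]).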

init() ->
    case enabled() of
        {true, Delay} -> #autoheal{delay = Delay};
        false -> undefined
    end.

enabled() ->
    case ekka:env(cluster_autoheal, true) of
        false -> false;
        true  -> {true, ?DEFAULT_DELAY};
        Delay when is_integer(Delay) ->
            {true, Delay}
    end.
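
%% Configuration sketch (assuming ekka:env/2 reads the ekka application
%% environment): a sys.config entry such as
%%     {ekka, [{cluster_autoheal, 60000}]}
%% would delay healing by 60s after a partition report; `true` falls back
%% to ?DEFAULT_DELAY (15000 ms) and `false` disables autoheal.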

proc(undefined) -> undefined;
proc(#autoheal{proc = Proc}) -> Proc.

handle_msg(Msg, undefined) ->
    ?LOG(error, "Autoheal not enabled! Unexpected msg: ~p", [Msg]), undefined;

handle_msg({report_partition, _Node}, Autoheal = #autoheal{proc = Proc})
    when Proc =/= undefined ->
    Autoheal;

handle_msg({report_partition, Node}, Autoheal = #autoheal{delay = Delay, timer = TRef}) ->
    case ekka_membership:leader() =:= node() of
        true ->
            ensure_cancel_timer(TRef),
            TRef1 = ekka_node_monitor:run_after(Delay, {autoheal, {create_splitview, node()}}),
            Autoheal#autoheal{role = leader, timer = TRef1};
        false ->
            ?LOG(critical, "I am not leader, but received partition report from ~s", [Node]),
            Autoheal
    end;
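
%% Flow sketch (as implied by the clauses in this module, assuming
%% ekka_node_monitor feeds {autoheal, Msg} back into handle_msg/2):
%%   report_partition  -> leader schedules create_splitview after Delay
%%   create_splitview  -> leader computes split view, casts heal_cluster
%%   heal_cluster      -> coordinator spawns a proc running reboot_minority/1
%%   'EXIT' from proc  -> coordinator clears #autoheal.proc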

handle_msg(Msg = {create_splitview, Node}, Autoheal = #autoheal{delay = Delay, timer = TRef})
  when Node =:= node() ->
    ensure_cancel_timer(TRef),
    case ekka_membership:is_all_alive() of
        true ->
            Nodes = ekka_mnesia:cluster_nodes(all),
            case rpc:multicall(Nodes, ekka_mnesia, cluster_view, [], 30000) of
                {Views, []} ->
                    SplitView = find_split_view(Nodes, Views),
                    HealPlan = find_heal_plan(SplitView),
                    case HealPlan of
                        {Candidates = [_ | _], Minority} ->
                            %% Non-empty list of candidates, choose a coordinator.
                            CoordNode = ekka_membership:coordinator(Candidates),
                            ekka_node_monitor:cast(CoordNode, {heal_cluster, Minority, SplitView});
                        {[], Cluster} ->
                            %% It's very unlikely but possible to have an empty list of candidates.
                            ekka_node_monitor:cast(node(), {heal_cluster, Cluster, SplitView});
                        {} ->
                            ignore
                    end,
                    Autoheal#autoheal{timer = undefined};
                {_Views, BadNodes} ->
                    ?LOG(critical, "Bad nodes found when autoheal: ~p", [BadNodes]),
                    Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
            end;
        false ->
            Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
    end;

handle_msg(Msg = {create_splitview, _Node}, Autoheal) ->
    ?LOG(critical, "I am not leader, but received: ~p", [Msg]),
    Autoheal;

handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
    %% NOTE: Backward compatibility.
    case SplitView of
        %% No partitions.
        [] -> Autoheal;
        [{_, []}] -> Autoheal;
        %% Partitions.
        SplitView ->
            Proc = spawn_link(fun() -> heal_partition(SplitView) end),
            Autoheal#autoheal{role = coordinator, proc = Proc}
    end;

handle_msg({heal_cluster, Minority, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
    Proc = spawn_link(fun() ->
        ?tp(notice, "Healing cluster partition", #{
            need_reboot => Minority,
            split_view => SplitView
        }),
        reboot_minority(Minority -- [node()])
    end),
    Autoheal#autoheal{role = coordinator, proc = Proc};
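
%% Note: ?tp/3 above is a snabbkaffe trace point; in test builds it emits a
%% structured trace event, otherwise it goes through logger at the given
%% level (assuming standard snabbkaffe semantics).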

handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = _Proc}) ->
    ?LOG(critical, "Unexpected heal_partition msg: ~p", [SplitView]),
    Autoheal;

handle_msg({'EXIT', Pid, normal}, Autoheal = #autoheal{proc = Pid}) ->
    Autoheal#autoheal{proc = undefined};
handle_msg({'EXIT', Pid, Reason}, Autoheal = #autoheal{proc = Pid}) ->
    ?LOG(critical, "Autoheal process crashed: ~p", [Reason]),
    _Retry = ekka_node_monitor:run_after(1000, confirm_partition),
    Autoheal#autoheal{proc = undefined};

handle_msg(Msg, Autoheal) ->
    ?LOG(critical, "Unexpected msg: ~p (state: ~p)", [Msg, Autoheal]),
    Autoheal.

find_split_view(Nodes, Views) ->
    ClusterView = lists:zipwith(
        fun(N, {Running, Stopped}) -> {N, Running, Stopped} end,
        Nodes,
        Views
    ),
    MajorityView = lists:usort(fun compare_node_views/2, ClusterView),
    find_split_view(MajorityView).

compare_node_views({_N1, Running1, _}, {_N2, Running2, _}) ->
    Len1 = length(Running1),
    Len2 = length(Running2),
    case Len1 of
        %% Prefer partitions with a higher number of surviving nodes.
        L when L > Len2 -> true;
        %% If the number of nodes is the same, prefer those where the current
        %% node is a survivor. Otherwise, sort by the list of running nodes.
        %% If the lists happen to be the same, this view will be excluded by
        %% usort.
        Len2 -> lists:member(node(), Running1) orelse Running1 < Running2;
        L when L < Len2 -> false
    end.
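
%% Worked example (hypothetical nodes n1..n3), with node() =:= n1:
%%   V1 = {n1, [n1, n2], [n3]} sorts before V2 = {n3, [n3], [n1, n2]}
%% because V1 has more running nodes, so lists:usort/2 in find_split_view/2
%% places the majority view first.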

find_split_view([{_Node, _Running, []} | Views]) ->
    %% Node observes no partitions, ignore.
    find_split_view(Views);
find_split_view([View = {_Node, _Running, Partitioned} | Views]) ->
    %% Node observes some nodes as partitioned from it.
    %% These nodes need to be rebooted, and as such they should not be part
    %% of the split view.
    Rest = lists:foldl(fun(N, Acc) -> lists:keydelete(N, 1, Acc) end, Views, Partitioned),
    [View | find_split_view(Rest)];
find_split_view([]) ->
    [].
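
%% Worked example (hypothetical nodes n1..n4): given the sorted views
%%   [{n1, [n1, n2, n3], [n4]}, {n2, [n1, n2, n3], [n4]}, {n4, [n4], [n1]}],
%% the first view reports n4 as partitioned, so n4's own view is dropped
%% from the tail and the result keeps the two majority views.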

find_heal_plan([{_Node, R0, P0} | Rest]) ->
    %% If we have more than one partition in the split view, we need to
    %% reboot _all_ of the nodes in each view's partition (i.e. ⋃(Partitions))
    %% for better safety. But then we need to find candidates to do it, as
    %% ⋃(Running) ∖ ⋃(Partitions).
    {_Nodes, Rs, Ps} = lists:unzip3(Rest),
    URunning = ordsets:union(lists:map(fun ordsets:from_list/1, [R0 | Rs])),
    UPartitions = ordsets:union(lists:map(fun ordsets:from_list/1, [P0 | Ps])),
    {ordsets:subtract(URunning, UPartitions), UPartitions};
find_heal_plan([]) ->
    {}.
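
%% Worked example (hypothetical nodes n1..n5): for the split view
%%   [{n1, [n1, n2, n3], [n4]}, {n5, [n4, n5], [n2]}],
%% ⋃(Running) = [n1, n2, n3, n4, n5] and ⋃(Partitions) = [n2, n4], so the
%% plan is {[n1, n3, n5], [n2, n4]}: n1, n3, n5 are coordinator candidates
%% and n2, n4 form the minority to reboot.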

heal_partition([{Nodes, []} | _] = SplitView) ->
    %% Symmetric partition.
    ?LOG(info, "Healing partition: ~p", [SplitView]),
    reboot_minority(Nodes -- [node()]);
heal_partition([{Majority, Minority}, {Minority, Majority}] = SplitView) ->
    %% Symmetric partition.
    ?LOG(info, "Healing partition: ~p", [SplitView]),
    reboot_minority(Minority).
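
%% Worked example (hypothetical nodes): the second clause matches a mutual
%% view such as [{[n1, n2], [n3]}, {[n3], [n1, n2]}], i.e. Majority =
%% [n1, n2] and Minority = [n3], so only n3 gets rebooted.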

reboot_minority(Minority) ->
    %% Shut every minority node down, wait a randomized 101-1100 ms,
    %% then reboot them so they rejoin the healed cluster.
    lists:foreach(fun shutdown/1, Minority),
    timer:sleep(rand:uniform(1000) + 100),
    lists:foreach(fun reboot/1, Minority),
    Minority.

shutdown(Node) ->
    Ret = rpc:call(Node, ekka_cluster, heal, [shutdown]),
    ?LOG(critical, "Shutdown ~s for autoheal: ~p", [Node, Ret]).

reboot(Node) ->
    Ret = rpc:call(Node, ekka_cluster, heal, [reboot]),
    ?LOG(critical, "Reboot ~s for autoheal: ~p", [Node, Ret]).

ensure_cancel_timer(undefined) ->
    ok;
ensure_cancel_timer(TRef) ->
    catch erlang:cancel_timer(TRef).