• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 845

13 Dec 2024 05:50PM UTC coverage: 62.977%. First build
845

Pull #240

github

web-flow
Merge f93e5e2e2 into 81cff1c78
Pull Request #240: fix(autoheal): attempt healing complex asymmetric partitions

2 of 24 new or added lines in 2 files covered. (8.33%)

677 of 1075 relevant lines covered (62.98%)

46.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.14
/src/ekka_autoheal.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_autoheal).
18

19
-include_lib("snabbkaffe/include/trace.hrl").
20

21
-export([ init/0
22
        , enabled/0
23
        , proc/1
24
        , handle_msg/2
25
        ]).
26

27
-record(autoheal, {delay, role, proc, timer}).
28

29
-type autoheal() :: #autoheal{}.
30

31
-export_type([autoheal/0]).
32

33
-define(DEFAULT_DELAY, 15000).
34
-define(LOG(Level, Format, Args),
35
        logger:Level("Ekka(Autoheal): " ++ Format, Args)).
36

37
%% Build the initial autoheal state.
%% Returns an #autoheal{} record holding the configured delay when autoheal is
%% enabled, or the atom `undefined` when it is disabled.
init() ->
    Enabled = enabled(),
    case Enabled of
        {true, HealDelay} -> #autoheal{delay = HealDelay};
        false -> undefined
    end.
42

43
%% Read the `cluster_autoheal` setting via ekka:env/2 (default: `true`).
%% Returns:
%%   false               when the setting is `false`;
%%   {true, ?DEFAULT_DELAY} when the setting is `true`;
%%   {true, Delay}       when the setting is an integer delay
%%                       (presumably milliseconds, as it is passed to
%%                       ekka_node_monitor:run_after/2 — confirm).
%% Any other value crashes with a case_clause error.
enabled() ->
    case ekka:env(cluster_autoheal, true) of
        true -> {true, ?DEFAULT_DELAY};
        Custom when is_integer(Custom) -> {true, Custom};
        false -> false
    end.
50

51
%% Return the pid of the active heal process.
%% Returns `undefined` when autoheal is disabled (state is `undefined`) or no
%% heal process is currently recorded in the state.
proc(#autoheal{proc = HealPid}) -> HealPid;
proc(undefined) -> undefined.
54

55
%% Handle one autoheal message and return the (possibly updated) state.
%%
%% Messages handled:
%%   {report_partition, Node}    - a node reported a partition. If this node
%%                                 is the leader, it schedules a
%%                                 {create_splitview, node()} check after
%%                                 `delay`.
%%   {create_splitview, Node}    - only acted on when Node =:= node(). Collects
%%                                 every cluster node's view and, if they
%%                                 disagree, casts {heal_partition, SplitView}
%%                                 to the coordinator.
%%   {heal_partition, SplitView} - spawns a linked process running
%%                                 heal_partition/1, unless one is already
%%                                 running.
%%   {'EXIT', Pid, Reason}       - exit of that linked heal process.
%% Any other message is logged and ignored. A state of `undefined` means
%% autoheal is disabled (init/0 returns `undefined` in that case).
handle_msg(Msg, undefined) ->
    ?LOG(error, "Autoheal not enabled! Unexpected msg: ~p", [Msg]), undefined;

%% A heal process is already running; drop further partition reports.
handle_msg({report_partition, _Node}, Autoheal = #autoheal{proc = Proc})
    when Proc =/= undefined ->
    Autoheal;

handle_msg({report_partition, Node}, Autoheal = #autoheal{delay = Delay, timer = TRef}) ->
    case ekka_membership:leader() =:= node() of
        true ->
            %% Cancel any pending check before scheduling a new one, so
            %% repeated reports collapse into a single pending check.
            ensure_cancel_timer(TRef),
            TRef1 = ekka_node_monitor:run_after(Delay, {autoheal, {create_splitview, node()}}),
            Autoheal#autoheal{role = leader, timer = TRef1};
        false ->
            ?LOG(critical, "I am not leader, but received partition report from ~s", [Node]),
            Autoheal
    end;

handle_msg(Msg = {create_splitview, Node}, Autoheal = #autoheal{delay = Delay, timer = TRef})
  when Node =:= node() ->
    ensure_cancel_timer(TRef),
    case collect_views() of
        {ok, Views} ->
            request_heal(Views),
            Autoheal#autoheal{timer = undefined};
        retry ->
            %% Not every node is alive or answered; re-send this same message
            %% after another delay (presumably delivered back here by
            %% ekka_node_monitor — confirm).
            Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
    end;

handle_msg(Msg = {create_splitview, _Node}, Autoheal) ->
    ?LOG(critical, "I am not leader, but received : ~p", [Msg]),
    Autoheal;

handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
    case SplitView of
        %% No partitions.
        [] -> Autoheal;
        [{_, []}] -> Autoheal;
        %% Partitions: heal them in a linked worker so this caller is not
        %% blocked; its exit comes back as an {'EXIT', Pid, _} message.
        _ ->
            Proc = spawn_link(fun() ->
                ?tp(start_heal_partition, #{split_view => SplitView}),
                heal_partition(SplitView)
            end),
            Autoheal#autoheal{role = coordinator, proc = Proc}
    end;

handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = _Proc}) ->
    ?LOG(critical, "Unexpected heal_partition msg: ~p", [SplitView]),
    Autoheal;

handle_msg({'EXIT', Pid, normal}, Autoheal = #autoheal{proc = Pid}) ->
    Autoheal#autoheal{proc = undefined};
handle_msg({'EXIT', Pid, Reason}, Autoheal = #autoheal{proc = Pid}) ->
    ?LOG(critical, "Autoheal process crashed: ~p", [Reason]),
    %% Schedule a `confirm_partition` message in 1000 (presumably ms) so the
    %% partition is re-examined — NOTE(review): handled outside this module.
    _Retry = ekka_node_monitor:run_after(1000, confirm_partition),
    Autoheal#autoheal{proc = undefined};

handle_msg(Msg, Autoheal) ->
    %% The format must have one control sequence per argument; previously
    %% Autoheal was passed without a matching ~p, so logger reported a
    %% format error instead of this message.
    ?LOG(critical, "Unexpected msg: ~p, state: ~p", [Msg, Autoheal]),
    Autoheal.

%% Fetch ekka_mnesia:cluster_view/0 from every cluster node.
%% Returns {ok, Views} when all members are alive and every call succeeded,
%% otherwise `retry`.
collect_views() ->
    case ekka_membership:is_all_alive() of
        true ->
            Nodes = ekka_mnesia:cluster_nodes(all),
            check_views(rpc:multicall(Nodes, ekka_mnesia, cluster_view, [], 30000));
        false ->
            retry
    end.

%% Validate an rpc:multicall/5 result: unreachable nodes, or calls that failed
%% on a reachable node ({badrpc, _} entries), mean the views are incomplete.
check_views({Views, []}) ->
    case [Failed || Failed = {badrpc, _} <- Views] of
        [] ->
            {ok, Views};
        Failures ->
            ?LOG(critical, "Failed to get cluster views when autoheal: ~p", [Failures]),
            retry
    end;
check_views({_Views, BadNodes}) ->
    ?LOG(critical, "Bad nodes found when autoheal: ~p", [BadNodes]),
    retry.

%% Rank the distinct views with compare_view/2 and, unless they show a single
%% view with no stopped nodes, ask the coordinator to heal the partition.
request_heal(Views) ->
    SplitView = lists:sort(fun compare_view/2, lists:usort(Views)),
    case SplitView of
        [] -> ignore;
        [{_, []}] -> ignore;
        _Otherwise ->
            Coordinator = coordinator(SplitView),
            ekka_node_monitor:cast(Coordinator, {heal_partition, SplitView})
    end.
130

131
%% Ordering function for lists:sort/2 over {Running, Stopped} views.
%% The view with more running nodes sorts first. When both have the same
%% number of running nodes, the first view sorts first only if the local node
%% is among its running nodes.
compare_view({RunningA, _}, {RunningB, _}) ->
    case length(RunningA) - length(RunningB) of
        Diff when Diff > 0 -> true;
        0 -> lists:member(node(), RunningA);
        _ -> false
    end.
138

139
%% Choose the heal coordinator from the running nodes of the top-ranked view.
%% The choice itself is delegated to ekka_membership:coordinator/1.
coordinator([{RunningNodes, _StoppedNodes} | _OtherViews]) ->
    ekka_membership:coordinator(RunningNodes).
141

142
%% Heal the partition described by a ranked split view (a list of
%% {Running, Stopped} views sorted with compare_view/2, largest first).
%% Reboots a chosen set of nodes and returns that list of nodes.
%% A split view matching none of the clauses (e.g. []) raises function_clause.
heal_partition([{Running, []} | _] = Views) ->
    %% The top-ranked view reports no stopped nodes: reboot every node it
    %% sees as running, except the local node.
    ?LOG(info, "Healing partition: ~p", [Views]),
    reboot_minority(Running -- [node()]);
heal_partition([{Bigger, Smaller}, {Smaller, Bigger}] = Views) ->
    %% Symmetric partition: two views that mirror each other. Reboot the
    %% side seen as stopped by the top-ranked view.
    ?LOG(info, "Healing partition: ~p", [Views]),
    reboot_minority(Smaller);
heal_partition([{_Running, Stopped} | _Rest] = Views) ->
    %% Asymmetric partitions: begin by rebooting the nodes the top-ranked
    %% view knows to be stopped. If that is not enough, a retry is expected
    %% from `ekka_node_monitor:handle_info({mnesia_system_event, ...})`,
    %% which should launch another iteration.
    ?LOG(info, "Trying to heal asymmetric partition: ~p", [Views]),
    reboot_minority(Stopped).
157

158
%% Shut down every node in Nodes, pause a random 101..1100 ms, then reboot them
%% all. Returns Nodes unchanged.
reboot_minority(Nodes) ->
    lists:foreach(fun(N) -> shutdown(N) end, Nodes),
    Pause = 100 + rand:uniform(1000),
    timer:sleep(Pause),
    lists:foreach(fun(N) -> reboot(N) end, Nodes),
    Nodes.
163

164
%% Ask Node to run ekka_cluster:heal(shutdown) via rpc:call/4, then log the
%% rpc result at critical level. Returns the result of the logger call.
shutdown(Node) ->
    ?LOG(critical, "Shutdown ~s for autoheal: ~p",
         [Node, rpc:call(Node, ekka_cluster, heal, [shutdown])]).
167

168
%% Ask Node to run ekka_cluster:heal(reboot) via rpc:call/4, then log the
%% rpc result at critical level. Returns the result of the logger call.
reboot(Node) ->
    ?LOG(critical, "Reboot ~s for autoheal: ~p",
         [Node, rpc:call(Node, ekka_cluster, heal, [reboot])]).
171

172
%% Cancel a pending timer, if any.
%% Returns `ok` when TRef is `undefined`. Otherwise returns the result of
%% erlang:cancel_timer/1. Any exception, e.g. badarg for a non-reference, is
%% caught and returned as {'EXIT', ...} rather than raised.
ensure_cancel_timer(TRef) ->
    case TRef of
        undefined -> ok;
        _ -> catch erlang:cancel_timer(TRef)
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc