• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 846

16 Dec 2024 04:58PM UTC coverage: 63.153%. First build
846

Pull #240

github

web-flow
Merge f0caedb17 into 81cff1c78
Pull Request #240: fix(autoheal): attempt healing complex asymmetric partitions

2 of 21 new or added lines in 2 files covered. (9.52%)

677 of 1072 relevant lines covered (63.15%)

49.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.46
/src/ekka_autoheal.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_autoheal).
18

19
-include_lib("snabbkaffe/include/trace.hrl").
20

21
-export([ init/0
22
        , enabled/0
23
        , proc/1
24
        , handle_msg/2
25
        ]).
26

27
-record(autoheal, {delay, role, proc, timer}).
28

29
-type autoheal() :: #autoheal{}.
30

31
-export_type([autoheal/0]).
32

33
-define(DEFAULT_DELAY, 15000).
34
-define(LOG(Level, Format, Args),
35
        logger:Level("Ekka(Autoheal): " ++ Format, Args)).
36

37
%% Build the initial autoheal state.
%% Returns `undefined' when cluster autoheal is switched off, otherwise a
%% fresh #autoheal{} record carrying the configured delay (see enabled/0).
init() ->
    case enabled() of
        false ->
            undefined;
        {true, Delay} ->
            #autoheal{delay = Delay}
    end.
42

43
%% Read the `cluster_autoheal' setting (default `true').
%% Returns `false' when disabled, `{true, ?DEFAULT_DELAY}' for `true', or
%% `{true, Delay}' when an integer delay is configured. Any other value
%% raises a case_clause error.
enabled() ->
    Setting = ekka:env(cluster_autoheal, true),
    case Setting of
        true ->
            {true, ?DEFAULT_DELAY};
        Delay when is_integer(Delay) ->
            {true, Delay};
        false ->
            false
    end.
50

51
%% Pid of the currently running healer process.
%% Returns `undefined' when autoheal is disabled (state is `undefined') or
%% when no healer process is recorded in the state.
proc(#autoheal{proc = Healer}) ->
    Healer;
proc(undefined) ->
    undefined.
54

55
%% Handle one autoheal message and return the new autoheal state.
%%
%% Autoheal is `undefined' when autoheal is disabled (see init/0); any message
%% is then logged as an error and dropped. Otherwise it is an #autoheal{} record.
%%
%% Messages handled:
%%   {report_partition, Node}    - when this node is the ekka_membership leader,
%%                                 cancel any pending timer and arm a new one that
%%                                 delivers {create_splitview, node()} after
%%                                 `delay' ms; ignored while a healer is running.
%%   {create_splitview, Node}    - only when Node is this node: if every member
%%                                 is alive, collect the cluster views and cast
%%                                 {heal_partition, SplitView} to the coordinator
%%                                 (see create_splitview/2); otherwise retry after
%%                                 `delay' ms.
%%   {heal_partition, SplitView} - spawn a linked process running heal_partition/1
%%                                 unless there is nothing to heal or a healer is
%%                                 already running.
%%   {'EXIT', Pid, Reason}       - exit of that linked healer process (presumes
%%                                 the calling process traps exits - TODO confirm);
%%                                 on an abnormal exit `confirm_partition' is
%%                                 scheduled after 1000 ms.
%% Anything else is logged and ignored.
handle_msg(Msg, undefined) ->
    ?LOG(error, "Autoheal not enabled! Unexpected msg: ~p", [Msg]),
    undefined;

handle_msg({report_partition, _Node}, Autoheal = #autoheal{proc = Proc})
  when Proc =/= undefined ->
    %% A healer process is already running; further reports are ignored.
    Autoheal;

handle_msg({report_partition, Node}, Autoheal = #autoheal{delay = Delay, timer = TRef}) ->
    case ekka_membership:leader() =:= node() of
        true ->
            %% Each report cancels the pending timer and re-arms it, so the split
            %% view is built `Delay' ms after the most recent report.
            ensure_cancel_timer(TRef),
            TRef1 = ekka_node_monitor:run_after(Delay, {autoheal, {create_splitview, node()}}),
            Autoheal#autoheal{role = leader, timer = TRef1};
        false ->
            ?LOG(critical, "I am not leader, but received partition report from ~s", [Node]),
            Autoheal
    end;

handle_msg(Msg = {create_splitview, Node}, Autoheal = #autoheal{delay = Delay, timer = TRef})
  when Node =:= node() ->
    ensure_cancel_timer(TRef),
    case ekka_membership:is_all_alive() of
        true ->
            create_splitview(Msg, Autoheal);
        false ->
            Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
    end;

handle_msg(Msg = {create_splitview, _Node}, Autoheal) ->
    ?LOG(critical, "I am not leader, but received : ~p", [Msg]),
    Autoheal;

handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
    case SplitView of
        %% No partitions.
        [] -> Autoheal;
        [{_, []}] -> Autoheal;
        %% Partitions.
        _ ->
            Proc = spawn_link(fun() ->
                ?tp(start_heal_partition, #{split_view => SplitView}),
                heal_partition(SplitView)
            end),
            Autoheal#autoheal{role = coordinator, proc = Proc}
    end;

handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = _Proc}) ->
    %% A healer process is already running for a previous split view.
    ?LOG(critical, "Unexpected heal_partition msg: ~p", [SplitView]),
    Autoheal;

handle_msg({'EXIT', Pid, normal}, Autoheal = #autoheal{proc = Pid}) ->
    Autoheal#autoheal{proc = undefined};
handle_msg({'EXIT', Pid, Reason}, Autoheal = #autoheal{proc = Pid}) ->
    ?LOG(critical, "Autoheal process crashed: ~p", [Reason]),
    _Retry = ekka_node_monitor:run_after(1000, confirm_partition),
    Autoheal#autoheal{proc = undefined};

handle_msg(Msg, Autoheal) ->
    %% The format has a single ~p, so pass exactly one argument; the previous
    %% [Msg, Autoheal] made logger report a format error instead of the message.
    ?LOG(critical, "Unexpected msg: ~p", [Msg]),
    Autoheal.

%% Leader-side work for {create_splitview, node()} once every member is alive.
%% Fetch ekka_mnesia:cluster_view() from all cluster nodes, sort the distinct
%% views with compare_view/2 and cast {heal_partition, SplitView} to the
%% coordinator chosen from the first view. If any node is unreachable, or the
%% call failed on a reachable node, log it and re-schedule Msg after `delay' ms.
create_splitview(Msg, Autoheal = #autoheal{delay = Delay}) ->
    Nodes = ekka_mnesia:cluster_nodes(all),
    {Results, BadNodes} = rpc:multicall(Nodes, ekka_mnesia, cluster_view, [], 30000),
    %% rpc:multicall/5 puts {badrpc, Reason} in the result list for calls that
    %% failed on a reachable node. Such entries are not views: compare_view/2
    %% would crash on them in length/1, so treat them as a failed round.
    BadRpcs = [R || R = {badrpc, _} <- Results],
    case {BadNodes, BadRpcs} of
        {[], []} ->
            SplitView = lists:sort(fun compare_view/2, lists:usort(Results)),
            Coordinator = coordinator(SplitView),
            ekka_node_monitor:cast(Coordinator, {heal_partition, SplitView}),
            Autoheal#autoheal{timer = undefined};
        {[], _} ->
            ?LOG(critical, "Failed to get cluster view when autoheal: ~p", [BadRpcs]),
            Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})};
        {_, _} ->
            ?LOG(critical, "Bad nodes found when autoheal: ~p", [BadNodes]),
            Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
    end.
125

126
%% Ordering function for lists:sort/2 over cluster views {Running, Stopped}.
%% Views with more running nodes sort first. On a tie, a view is placed
%% before the other only if it contains the local node.
compare_view({RunningA, _}, {RunningB, _}) ->
    case length(RunningA) - length(RunningB) of
        Diff when Diff > 0 -> true;
        0 -> lists:member(node(), RunningA);
        _ -> false
    end.
133

134
%% Pick the healing coordinator from the running nodes of the first view
%% in the sorted split view, via ekka_membership:coordinator/1.
coordinator([{FirstRunning, _Stopped} | _OtherViews]) ->
    ekka_membership:coordinator(FirstRunning).
136

137
%% Heal a partition described by a sorted, non-empty split view of
%% {Running, Stopped} pairs. Clauses are tried in order:
%%   1. first view has no stopped nodes: reboot every running node in that
%%      view except this one;
%%   2. exactly two mirrored views: reboot the minority side;
%%   3. otherwise (asymmetric partitions): reboot the stopped nodes known to
%%      the first view.
%% Returns the list of nodes passed to reboot_minority/1.
heal_partition([{Running, []} | _] = View) ->
    %% Symmetric partition.
    ?LOG(info, "Healing partition: ~p", [View]),
    reboot_minority(lists:delete(node(), Running));
heal_partition([{Big, Small}, {Small, Big}] = View) ->
    %% Symmetric partition.
    ?LOG(info, "Healing partition: ~p", [View]),
    reboot_minority(Small);
heal_partition([{_Running, Down} | _Asymmetric] = View) ->
    %% Asymmetric partitions.
    %% Begin by rebooting the stopped nodes we know about. If that is not
    %% enough, the retry mechanism in
    %% `ekka_node_monitor:handle_info({mnesia_system_event, ...}' is expected
    %% to launch another iteration.
    ?LOG(info, "Trying to heal asymmetric partition: ~p", [View]),
    reboot_minority(Down).
152

153
%% Shut down every node in the list, wait a random 101..1100 ms, then
%% reboot them all in the same order. Returns the input node list.
reboot_minority(Nodes) ->
    ForEachNode = fun(Action) -> lists:foreach(Action, Nodes) end,
    ForEachNode(fun shutdown/1),
    timer:sleep(100 + rand:uniform(1000)),
    ForEachNode(fun reboot/1),
    Nodes.
158

159
%% Call ekka_cluster:heal(shutdown) on Node over RPC and log the outcome
%% (the RPC result, which may be {badrpc, Reason}) at critical level.
shutdown(Node) ->
    ?LOG(critical, "Shutdown ~s for autoheal: ~p",
         [Node, rpc:call(Node, ekka_cluster, heal, [shutdown])]).
162

163
%% Call ekka_cluster:heal(reboot) on Node over RPC and log the outcome
%% (the RPC result, which may be {badrpc, Reason}) at critical level.
reboot(Node) ->
    ?LOG(critical, "Reboot ~s for autoheal: ~p",
         [Node, rpc:call(Node, ekka_cluster, heal, [reboot])]).
166

167
%% Cancel a pending timer, if any; `undefined' means no timer is armed and
%% returns `ok'. Otherwise the result of erlang:cancel_timer/1 is returned,
%% and any exception it raises (e.g. badarg for a non-timer value) is caught
%% and returned as {'EXIT', _} instead of propagating.
ensure_cancel_timer(TimerRef) ->
    case TimerRef of
        undefined -> ok;
        _ -> catch erlang:cancel_timer(TimerRef)
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc