• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 850

18 Dec 2024 02:02PM UTC coverage: 62.397% (-0.5%) from 62.861%
850

push

github

web-flow
Merge pull request #240 from keynslug/fix/EEC-112/autoheal-asymm

fix(autoheal): attempt healing complex asymmetric partitions

2 of 49 new or added lines in 2 files covered. (4.08%)

94 existing lines in 8 files now uncovered.

682 of 1093 relevant lines covered (62.4%)

49.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.92
/src/ekka_autocluster.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_autocluster).
18

19
-behavior(gen_server).
20

21
-include("ekka.hrl").
22
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
23

24
%% API
25
-export([ enabled/0
26
        , run/1
27
        , unregister_node/0
28
        ]).
29

30
%% gen_server callbacks
31
-export([ init/1
32
        , handle_call/3
33
        , handle_cast/2
34
        , handle_info/2
35
        ]).
36

37
-define(SERVER, ?MODULE).
38

39
-define(LOG(Level, Format, Args),
40
        logger:Level("Ekka(AutoCluster): " ++ Format, Args)).
41

42
%% ms
43
-define(DISCOVER_AND_JOIN_RETRY_INTERVAL, 5000).
44

45
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
46
%% API
47
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
48

49
%% @doc Tell whether automatic cluster discovery is configured.
%% An unset `cluster_discovery' and the `{manual, _}' strategy both mean
%% "no autocluster"; any other configured strategy enables it.
-spec enabled() -> boolean().
enabled() ->
    case ekka:env(cluster_discovery) of
        undefined         -> false;
        {ok, {manual, _}} -> false;
        {ok, _}           -> true
    end.
56

57
%% @doc Start the (unlinked) autocluster server for App, if discovery is
%% enabled. Returns `ok' when a new server was started, and `ignore' when
%% discovery is disabled or a server is already registered.
-spec run(atom()) -> ok | ignore.
run(App) ->
    ?tp(ekka_autocluster_run, #{app => App}),
    %% `andalso' short-circuits: gen_server:start/4 runs only when enabled.
    Started = enabled() andalso gen_server:start({local, ?SERVER}, ?MODULE, [App], []),
    case Started of
        false                         -> ignore;
        {error, {already_started, _}} -> ignore;
        {ok, _Pid}                    -> ok
    end.
68

69
%% @doc Remove this node's registration from the configured discovery
%% backend via ekka_cluster_strategy:unregister/2. An `{error, Reason}'
%% from the strategy is logged, not raised. When discovery is manual or
%% not configured, nothing is done.
%% Always returns `ok', as the spec promises.
-spec unregister_node() -> ok.
unregister_node() ->
    %% with_strategy/1 yields `ignore' for manual/unconfigured discovery.
    %% That is not a failure for callers, and returning it would break the
    %% `-> ok' contract, so the result is dropped and `ok' returned.
    _ = with_strategy(
          fun(Mod, Options) ->
                  log_error("Unregister", ekka_cluster_strategy:unregister(Mod, Options))
          end),
    ok.
75

76
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
77
%% gen_server callbacks
78
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
79

80
%% Server state. `application' is the app name given to run/1; every
%% discovery round first waits (bounded) for it to be running.
-record(s, {application :: atom()}).
83

84
%% gen_server init: adopt the `init' process as group leader, then post the
%% first `loop' message to ourselves. Discovery therefore begins in
%% handle_info/2 right after init/1 returns, which keeps init/1 non-blocking.
%% NOTE(review): the group-leader switch presumably detaches this process
%% from the calling application's master; confirm the intent.
init([App]) ->
    group_leader(whereis(init), self()),
    State = #s{application = App},
    self() ! loop,
    {ok, State}.
89

90
%% Drives the discovery loop. Each `loop' message runs one round:
%%   1. wait (up to ~10 s, polling once per second) for App to be running;
%%   2. attempt discover + join;
%%   3. stop normally when the round is complete, otherwise schedule another
%%      `loop' message after ?DISCOVER_AND_JOIN_RETRY_INTERVAL ms.
%% Any other message is dropped.
handle_info(loop, #s{application = App} = State) ->
    ?tp(ekka_autocluster_loop, #{}),
    %% The readiness result is not checked: discovery is attempted even
    %% when App did not come up within the retry budget.
    wait_application_ready(App, 10),
    case is_discovery_complete(discover_and_join()) of
        false ->
            timer:send_after(?DISCOVER_AND_JOIN_RETRY_INTERVAL, loop),
            {noreply, State};
        true ->
            ?tp(ekka_autocluster_complete, #{app => App}),
            {stop, normal, State}
    end;
handle_info(_Msg, State) ->
    {noreply, State}.
104

105
%% Decide whether autocluster is finished after a discovery round.
%% `ignore' (nothing to do) ends immediately. Any other result counts as
%% complete only when this node is in a cluster AND the round returned
%% `ok'. A non-`ok' result can mean discovered nodes were left outside the
%% cluster, so another attempt is needed.
is_discovery_complete(ignore) ->
    true;
is_discovery_complete(JoinResult) ->
    InCluster = ekka_mnesia:is_node_in_cluster(),
    AllInside = (JoinResult =:= ok),
    ?tp(ekka_maybe_run_app_again,
        #{ node_in_cluster  => InCluster
         , no_nodes_outside => AllInside
         }),
    InCluster andalso AllInside.
118

119
%% No synchronous API is exposed: every call gets an error reply and the
%% state is left untouched.
handle_call(_Request, _From, State) ->
    Reply = {error, unknown_call},
    {reply, Reply, State}.
121

122
%% Casts are not part of this server's protocol; they are dropped and the
%% state is returned unchanged.
handle_cast(_Msg, State) ->
    {noreply, State}.
124

125
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
126
%% Internal functions
127
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
128

129
%% Poll ekka_node:is_running(App) up to Retries times, sleeping one second
%% after each negative answer. Returns `ok' as soon as App is running, or
%% `timeout' once the retry budget reaches zero.
wait_application_ready(_App, 0) ->
    timeout;
wait_application_ready(App, RetriesLeft) ->
    case ekka_node:is_running(App) of
        false ->
            %% Not up yet: back off for a second and spend one retry.
            timer:sleep(1000),
            wait_application_ready(App, RetriesLeft - 1);
        true ->
            ok
    end.
137

138
%% Run one discover + join round for the configured strategy, guarded by
%% the strategy's cluster lock. Returns `ignore' when discovery is manual
%% or unset (see with_strategy/1); otherwise the round's result.
-spec discover_and_join() -> ok | ignore | error.
discover_and_join() ->
    with_strategy(fun locked_discover_and_join/2).

%% Acquire the strategy lock, run discovery, and always attempt unlock
%% afterwards (the `after' clause), whatever lock/2 returned.
%%  - `ok': proceed directly.
%%  - `ignore': sleep a random 1..3000 ms, then proceed anyway.
%%    NOTE(review): presumably staggers nodes when no lock was taken; confirm.
%%  - `{error, Reason}': log and give up this round with `error'.
%% NOTE(review): unlock/2 also runs when lock/2 returned `ignore' or an error.
locked_discover_and_join(Mod, Options) ->
    try ekka_cluster_strategy:lock(Mod, Options) of
        ok ->
            discover_and_join(Mod, Options);
        ignore ->
            timer:sleep(rand:uniform(3000)),
            discover_and_join(Mod, Options);
        {error, Reason} ->
            ?LOG(error, "AutoCluster stopped for lock error: ~p", [Reason]),
            error
    after
        log_error("Unlock", ekka_cluster_strategy:unlock(Mod, Options))
    end.
155

156
%% Resolve the configured discovery strategy and return Fun(Module, Options).
%% Returns `ignore' without calling Fun when discovery is unset or manual.
with_strategy(Fun) ->
    case ekka:env(cluster_discovery) of
        undefined ->
            ignore;
        {ok, {manual, _}} ->
            ignore;
        {ok, {Strategy, Options}} ->
            Mod = strategy_module(Strategy),
            Fun(Mod, Options)
    end.
165

166
%% Map a strategy name to its implementation module. A module that is
%% already loaded under exactly that name is used as-is (custom provider).
%% Otherwise the built-in `ekka_cluster_<Strategy>' module name is built.
%% NOTE(review): a custom provider that exists but is not loaded yet is not
%% detected here and falls through to the built-in name.
strategy_module(Strategy) ->
    case code:is_loaded(Strategy) of
        false ->
            Prefix = "ekka_cluster_",
            list_to_atom(Prefix ++ atom_to_list(Strategy));
        {file, _} ->
            Strategy
    end.
171

172
%% Ask strategy Mod for candidate nodes and try to join them.
%% Only the discover/2 call is guarded by the try: discovery failures
%% (an `{error, _}' result or any exception) are logged and reported as
%% `error'. Exceptions raised while joining or registering propagate.
-spec discover_and_join(module(), ekka_cluster_strategy:options()) -> ok | ignore | error.
discover_and_join(Mod, Options) ->
    ?tp(ekka_autocluster_discover_and_join, #{mod => Mod}),
    try ekka_cluster_strategy:discover(Mod, Options) of
        {ok, Discovered} ->
            join_discovered(Mod, Options, Discovered);
        {error, Reason} ->
            ?LOG(error, "Discovery error: ~p", [Reason]),
            error
    catch
        _:Error:Stacktrace ->
            ?LOG(error, "Discover error: ~p~n~p", [Error, Stacktrace]),
            error
    end.

%% Join the alive subset of Discovered, (re-)register this node with the
%% strategy, and fold the outcome:
%%  - any discovered node not alive -> `error' (retry later);
%%  - otherwise the join result mapped to ok | ignore | error.
join_discovered(Mod, Options, Discovered) ->
    {Alive, Unreachable} = lists:partition(fun ekka_node:is_aliving/1, Discovered),
    JoinRes = maybe_join(Alive),
    ?LOG(debug, "join result: ~p", [JoinRes]),
    log_error("Register", ekka_cluster_strategy:register(Mod, Options)),
    case Unreachable of
        [_ | _] ->
            ?LOG(warning, "discovered nodes outside cluster: ~p", [Unreachable]),
            error;
        [] ->
            ?LOG(info, "no discovered nodes outside cluster", []),
            case JoinRes of
                ok         -> ok;
                ignore     -> ignore;
                {error, _} -> error
            end
    end.
203

204
%% Join the cluster of the oldest discovered node, unless there is nothing
%% new: `ignore' for an empty list, or when the discovered set equals the
%% set of nodes mnesia already knows about (order and duplicates ignored).
-spec maybe_join([node()]) -> ignore | ok | {error, _}.
maybe_join([]) ->
    ignore;
maybe_join(Discovered) ->
    Candidates = lists:usort(Discovered),
    Members = lists:usort(ekka_mnesia:cluster_nodes(all)),
    case Candidates =/= Members of
        true ->
            Target = find_oldest_node(Candidates),
            ?LOG(info, "joining with ~p", [Target]),
            join_with(Target);
        false ->
            ?LOG(info, "all discovered nodes already in cluster; ignoring", []),
            ignore
    end.
219

220
%% Join Node's cluster through ekka_cluster:join/1. Returns `ignore' when
%% there is nothing to join: `false' (no suitable node found) or this node.
join_with(Node) when Node =:= false; Node =:= node() ->
    ignore;
join_with(Node) ->
    ekka_cluster:join(Node).
226

227
%% Pick the oldest node among Nodes, as judged by ekka_membership:oldest/1
%% over each node's `member' record (fetched via RPC, 30 s timeout).
%% A single candidate is returned directly, without any RPC.
%% Returns the node name, or `false' when it cannot be determined: some
%% node did not answer, or no valid `member' record came back.
find_oldest_node([Node]) ->
    Node;
find_oldest_node(Nodes) ->
    case rpc:multicall(Nodes, ekka_membership, local_member, [], 30000) of
        {ResL, []} ->
            case [M || M <- ResL, is_record(M, member)] of
                [] ->
                    ?LOG(error, "Bad members found on nodes ~p: ~p", [Nodes, ResL]),
                    false;
                Members ->
                    (ekka_membership:oldest(Members))#member.node
            end;
        {ResL, BadNodes} ->
            %% One ~p per argument: a mismatched arg count makes logger emit a
            %% FORMAT ERROR report instead of this message.
            ?LOG(error, "Bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]),
            false
    end.
241

242
%% Log an `{error, Reason}' result of a strategy operation, prefixing the
%% message with Operation (e.g. "Register", "Unlock"). Any other result is
%% accepted silently and `ok' is returned.
log_error(Operation, Result) ->
    case Result of
        {error, Reason} -> ?LOG(error, Operation ++ " error: ~p", [Reason]);
        _Ok             -> ok
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc