• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 823

28 Mar 2024 09:46AM UTC coverage: 61.618% (-1.2%) from 62.78%
823

push

github

web-flow
Merge pull request #233 from SergeTupchiy/EMQX-11826-prevent-left-node-from-rejoining

EMQX-11826 prevent left node from rejoining

5 of 15 new or added lines in 2 files covered. (33.33%)

2 existing lines in 1 file now uncovered.

419 of 680 relevant lines covered (61.62%)

50.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.27
/src/ekka_autocluster.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_autocluster).
18

19
-behavior(gen_server).
20

21
-include_lib("mria/include/mria.hrl").
22
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
23

24
%% API
25
-export([ enabled/0
26
        , configured/0
27
        , run/1
28
        , unregister_node/0
29
        , core_node_discovery_callback/0
30
        , stop/1
31
        ]).
32

33
%% gen_server callbacks
34
-export([ init/1
35
        , handle_call/3
36
        , handle_cast/2
37
        , handle_info/2
38
        ]).
39

40
-define(SERVER, ?MODULE).
41

42
-define(LOG(Level, Format, Args),
43
        logger:Level("Ekka(AutoCluster): " ++ Format, Args)).
44

45
%% ms
46
-define(DISCOVER_AND_JOIN_RETRY_INTERVAL, 5000).
47

48
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
49
%% API
50
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
51

52
%% @doc Autoclustering is active only when a discovery strategy is
%% configured AND this node runs with the mria `core' role.
-spec enabled() -> boolean().
enabled() ->
    HasStrategy = configured(),
    HasStrategy andalso mria_config:role() =:= core.
55

56
%% @doc True when `cluster_discovery' names a strategy other than `manual';
%% false when it is `manual' or not set at all.
-spec configured() -> boolean().
configured() ->
    Discovery = ekka:env(cluster_discovery),
    case Discovery of
        undefined         -> false;
        {ok, {manual, _}} -> false;
        {ok, _Strategy}   -> true
    end.
63

64
%% @doc Start the locally registered autocluster server for `App'.
%% Returns `ok' when a new server was started, and `ignore' when
%% autoclustering is disabled or a server is already running.
-spec run(atom()) -> ok | ignore.
run(App) ->
    ?tp(ekka_autocluster_run, #{app => App}),
    StartResult = enabled() andalso
        gen_server:start({local, ?SERVER}, ?MODULE, [App], []),
    case StartResult of
        false                         -> ignore;
        {error, {already_started, _}} -> ignore;
        {ok, _Pid}                    -> ok
    end.
75

76
%% @doc Ask the configured discovery strategy to unregister this node.
%% An `{error, _}' result from the strategy is logged rather than returned;
%% `ignore' is returned when discovery is manual or not configured.
-spec unregister_node() -> ok | ignore.
unregister_node() ->
    Unregister =
        fun(Mod, Options) ->
                Result = ekka_cluster_strategy:unregister(Mod, Options),
                log_error("Unregister", Result)
        end,
    with_strategy(Unregister).
82

83
%% @doc Stop the autocluster server, if it is running, with exit reason
%% `{shutdown, Reason}' (waiting at most 5 s).
%%
%% The server may already be gone. It may also finish on its own while this
%% request is in flight, because it stops itself with reason `normal' once
%% discovery completes. Both situations mean "the server is not running",
%% which is what the caller wants, so they are reported as `ok' instead of
%% crashing the caller. Any other exit (e.g. `timeout') still propagates.
-spec stop(term()) -> ok.
stop(Reason) ->
    try
        gen_server:stop(?SERVER, {shutdown, Reason}, 5000)
    catch
        %% Nothing is registered under ?SERVER.
        exit:noproc ->
            ok;
        %% The server completed discovery and exited concurrently;
        %% gen_server:stop/3 re-raises the process's own exit reason.
        exit:normal ->
            ok;
        %% The same two races as reported by older OTP releases, where the
        %% reason is wrapped together with the failed sys:terminate/3 call.
        exit:{noproc, {sys, terminate, _}} ->
            ok;
        exit:{normal, {sys, terminate, _}} ->
            ok
    end.
91

92
%% @doc Core node discovery callback used by mria on replicant nodes to
%% find the core nodes.
%%
%% Returns `[]' when discovery is manual or not configured. It also returns
%% `[]' when the strategy returns `{error, _}' or raises; in both of those
%% cases the failure is logged.
-spec core_node_discovery_callback() -> [node()].
core_node_discovery_callback() ->
    case with_strategy(fun discover_core_nodes/2) of
        ignore -> [];
        Nodes  -> Nodes
    end.

%% Run the strategy's discover callback, turning any failure into `[]'.
discover_core_nodes(Mod, Options) ->
    try ekka_cluster_strategy:discover(Mod, Options) of
        {ok, Nodes} ->
            Nodes;
        {error, Reason} ->
            ?LOG(error, "Core node discovery error: ~p", [Reason]),
            []
    catch
        _:Err:Stack ->
            ?LOG(error, "Core node discovery error ~p: ~p", [Err, Stack]),
            []
    end.
114

115
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
116
%% gen_server callbacks
117
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
118

119
%% Server state: the application whose readiness is awaited before each
%% discovery round.
-record(s, {application :: atom()}).

%% Re-parent the server's group leader to the `init' process (presumably so
%% its I/O does not depend on the group leader of whoever called run/1 —
%% TODO confirm), then queue the first discovery round.
init([App]) ->
    State = #s{application = App},
    group_leader(whereis(init), self()),
    self() ! loop,
    {ok, State}.
128

129
%% Each `loop' message runs one discovery round:
%%   1. wait (at most 10 checks) for the application to be running;
%%   2. discover peers and try to join them;
%%   3. stop normally once the node is clustered and registered, otherwise
%%      schedule another round after ?DISCOVER_AND_JOIN_RETRY_INTERVAL ms.
%% Any other message is dropped.
%%
%% NOTE(review): the round runs inside this server. It can block for a long
%% time (1 s sleeps in wait_application_ready/2, up to 3 s lock back-off, a
%% 30 s rpc timeout in find_oldest_node/1). System messages such as the one
%% sent by stop/1 are only handled between callbacks, so stop/1's 5 s
%% timeout may expire while a round is in progress. Confirm this is
%% acceptable for callers.
handle_info(loop, S = #s{application = App}) ->
    %% The result (`ok' | `timeout') is deliberately ignored: discovery is
    %% attempted even if the application did not report running in time.
    wait_application_ready(App, 10),
    JoinResult = discover_and_join(),
    case is_discovery_complete(JoinResult) of
        true ->
            ?tp(ekka_autocluster_complete, #{app => App}),
            {stop, normal, S};
        false ->
            %% erlang:send_after/3 is the BIF-level timer; the OTP `timer'
            %% docs recommend it over timer:send_after/2 as more efficient.
            _ = erlang:send_after(?DISCOVER_AND_JOIN_RETRY_INTERVAL, self(), loop),
            {noreply, S}
    end;
handle_info(_, S) ->
    {noreply, S}.
142

143
%% Decide whether autoclustering is finished after a discovery round.
%% `ignore' means no join was performed this round; in that case it is
%% enough that this node is visible through discovery. Otherwise the node
%% must be clustered with someone, registered, and every discovered node
%% must have been reachable (JoinResult `ok') — if not, keep retrying.
is_discovery_complete(ignore) ->
    is_node_registered();
is_discovery_complete(JoinResult) ->
    InCluster = mria:cluster_nodes(all) =/= [node()],
    NothingLeftOutside = (JoinResult =:= ok),
    IsRegistered = is_node_registered(),
    ?tp(ekka_maybe_run_app_again,
        #{ node_in_cluster  => InCluster
         , node_registered  => IsRegistered
         , no_nodes_outside => NothingLeftOutside
         }),
    IsRegistered andalso InCluster andalso NothingLeftOutside.
158

159
%% The server exposes no synchronous API; every call is answered with an
%% error and the state is left untouched.
handle_call(_Request, _Caller, State) ->
    Reply = {error, unknown_call},
    {reply, Reply, State}.
161

162
%% Casts are not part of the protocol; they are ignored.
handle_cast(_Message, State) ->
    {noreply, State}.
164

165
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
166
%% Internal functions
167
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
168

169
%% Check ekka_node:is_running(App) up to `Retries' times, sleeping 1 s after
%% each negative answer. Returns `ok' as soon as it reports `true', or
%% `timeout' once the retries are exhausted.
wait_application_ready(_App, 0) ->
    timeout;
wait_application_ready(App, RetriesLeft) ->
    case ekka_node:is_running(App) of
        false ->
            timer:sleep(1000),
            wait_application_ready(App, RetriesLeft - 1);
        true ->
            ok
    end.
177

178
%% Run one discovery/join round under the configured strategy's cluster
%% lock. Returns `ignore' when discovery is manual or not configured.
-spec discover_and_join() -> ok | ignore | error.
discover_and_join() ->
    with_strategy(fun locked_discover_and_join/2).

%% Take the strategy lock, run the round, and always attempt to unlock
%% afterwards (the `after' clause runs whatever lock/2 returned or raised).
locked_discover_and_join(Mod, Options) ->
    try ekka_cluster_strategy:lock(Mod, Options) of
        ok ->
            discover_and_join(Mod, Options);
        ignore ->
            %% No lock was taken (presumably the strategy does not support
            %% locking — TODO confirm); a random delay of up to 3 s before
            %% proceeding.
            timer:sleep(rand:uniform(3000)),
            discover_and_join(Mod, Options);
        {error, Reason} ->
            ?LOG(error, "AutoCluster stopped for lock error: ~p", [Reason]),
            error
    after
        log_error("Unlock", ekka_cluster_strategy:unlock(Mod, Options))
    end.
195

196
%% Apply `Fun(StrategyModule, Options)' for the configured discovery
%% strategy. Returns `ignore' when discovery is manual or not configured.
with_strategy(Fun) ->
    case ekka:env(cluster_discovery) of
        undefined ->
            ignore;
        {ok, {manual, _}} ->
            ignore;
        {ok, {Strategy, Options}} ->
            Mod = strategy_module(Strategy),
            Fun(Mod, Options)
    end.
205

206
%% Resolve a strategy name to its implementation module. If a module with
%% exactly that name is already loaded (a custom provider), it is used
%% as-is; otherwise the built-in `ekka_cluster_<Strategy>' is assumed.
%% NOTE(review): code:is_loaded/1 only sees modules that are already loaded,
%% so a custom provider that has not been loaded yet falls through to the
%% `ekka_cluster_' prefix — confirm providers are loaded beforehand.
strategy_module(Strategy) ->
    case code:is_loaded(Strategy) of
        false ->
            BuiltIn = "ekka_cluster_" ++ atom_to_list(Strategy),
            list_to_atom(BuiltIn);
        {file, _Where} ->
            Strategy
    end.
211

212
%% Discover peers with strategy `Mod', join the alive ones, and register
%% this node. Returns `error' when discovery fails or raises, when any
%% discovered node is unreachable, or when the join failed. Otherwise it
%% returns the join outcome (`ok' or `ignore').
-spec discover_and_join(module(), ekka_cluster_strategy:options()) -> ok | ignore | error.
discover_and_join(Mod, Options) ->
    ?tp(ekka_autocluster_discover_and_join, #{mod => Mod}),
    %% Only the discover/2 call is protected by `catch'; exceptions from the
    %% join/register step below propagate to the caller.
    try ekka_cluster_strategy:discover(Mod, Options) of
        {ok, Nodes} ->
            ?tp(ekka_autocluster_discover_and_join_ok, #{mod => Mod, nodes => Nodes}),
            join_discovered(Mod, Options, Nodes);
        {error, Reason} ->
            ?LOG(error, "Discovery error: ~p", [Reason]),
            error
    catch
        _:Error:Stacktrace ->
            ?LOG(error, "Discover error: ~p~n~p", [Error, Stacktrace]),
            error
    end.

%% Join the reachable discovered nodes, (re-)register this node, and map
%% the outcome to ok | ignore | error.
join_discovered(Mod, Options, Nodes) ->
    {Alive, Dead} = lists:partition(fun ekka_node:is_aliving/1, Nodes),
    JoinRes = maybe_join(Alive),
    ?LOG(debug, "join result: ~p", [JoinRes]),
    log_error("Register", ekka_cluster_strategy:register(Mod, Options)),
    case Dead of
        [_ | _] ->
            ?LOG(info, "discovered nodes are not responding: ~p", [Dead]),
            error;
        [] ->
            ?LOG(info, "all discovered nodes are alive", []),
            case JoinRes of
                ok         -> ok;
                ignore     -> ignore;
                {error, _} -> error
            end
    end.
244

245
%% Join the oldest of the given nodes, unless the set of given nodes is
%% exactly the set of nodes mria already knows about (or is empty), in
%% which case there is nothing to do.
-spec maybe_join([node()]) -> ignore | ok | {error, _}.
maybe_join([]) ->
    ignore;
maybe_join(Candidates) ->
    Sorted = lists:usort(Candidates),
    case lists:usort(mria:cluster_nodes(all)) of
        Sorted ->
            ?LOG(info, "all discovered nodes already in cluster; ignoring", []),
            ignore;
        _Different ->
            Target = find_oldest_node(Sorted),
            ?LOG(info, "joining with ~p", [Target]),
            join_with(Target)
    end.
260

261
%% Join `Node''s cluster via ekka_cluster:join/1.
%% `false' (no usable candidate) and this node itself are ignored, as is an
%% `already_in_cluster' error for that same node.
join_with(Target) when Target =:= false orelse Target =:= node() ->
    ignore;
join_with(Target) ->
    case ekka_cluster:join(Target) of
        {error, {already_in_cluster, Target}} ->
            ignore;
        Result ->
            %% Wait for ekka to be restarted after join to avoid noproc error
            %% that can occur if underlying cluster implementation (e.g. ekka_cluster_etcd)
            %% uses some processes started under ekka supervision tree
            _ = wait_application_ready(ekka, 10),
            Result
    end.
276

277
%% Pick the node to join. A single candidate is returned directly.
%% Otherwise each candidate is asked for its local membership record
%% (30 s rpc timeout) and the oldest member's node is chosen. Returns `false'
%% when some node fails to answer or no valid member record comes back.
find_oldest_node([OnlyNode]) ->
    OnlyNode;
find_oldest_node(Nodes) ->
    case rpc:multicall(Nodes, mria_membership, local_member, [], 30000) of
        {Replies, []} ->
            oldest_member_node(Nodes, Replies);
        {Replies, BadNodes} ->
            ?LOG(error, "bad_nodes_found, failed_nodes: ~p~n"
                        "normal_rpc_results: ~p", [BadNodes, Replies]),
            false
    end.

%% Select the oldest `#member{}' among the rpc replies, ignoring anything
%% that is not a member record (e.g. `{badrpc, _}').
oldest_member_node(Nodes, Replies) ->
    case [Reply || Reply <- Replies, is_record(Reply, member)] of
        [] ->
            ?LOG(error, "bad_members_found, all_nodes: ~p~n"
                        "normal_rpc_results:~p", [Nodes, Replies]),
            false;
        Members ->
            (mria_membership:oldest(Members))#member.node
    end.
295

296
%% True when this node appears in the node list returned by discovery.
is_node_registered() ->
    lists:member(node(), core_node_discovery_callback()).
299

300
%% Log an `{error, Reason}' result of a strategy callback, prefixing the
%% message with `Op' (e.g. "Register"). Any other result is silently
%% ignored and `ok' is returned.
log_error(Op, Result) ->
    case Result of
        {error, Reason} -> ?LOG(error, Op ++ " error: ~p", [Reason]);
        _NotAnError     -> ok
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc