• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 789

06 Dec 2023 01:58PM UTC coverage: 62.836% (-0.5%) from 63.315%
789

push

github

web-flow
Merge pull request #223 from zmstone/1206-delete-mcast

1206 delete mcast

421 of 670 relevant lines covered (62.84%)

295.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.52
/src/ekka_autocluster.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_autocluster).
18

19
-behavior(gen_server).
20

21
-include_lib("mria/include/mria.hrl").
22
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
23

24
%% API
25
-export([ enabled/0
26
        , run/1
27
        , unregister_node/0
28
        , core_node_discovery_callback/0
29
        ]).
30

31
%% gen_server callbacks
32
-export([ init/1
33
        , handle_call/3
34
        , handle_cast/2
35
        , handle_info/2
36
        ]).
37

38
-define(SERVER, ?MODULE).
39

40
-define(LOG(Level, Format, Args),
41
        logger:Level("Ekka(AutoCluster): " ++ Format, Args)).
42

43
%% ms
44
-define(DISCOVER_AND_JOIN_RETRY_INTERVAL, 5000).
45

46
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
47
%% API
48
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
49

50
%% @doc Return `true' when automatic cluster discovery should run on this
%% node: a non-manual `cluster_discovery' strategy is configured and the
%% node's mria role is `core'. Returns `false' otherwise.
-spec enabled() -> boolean().
enabled() ->
    case ekka:env(cluster_discovery) of
        undefined ->
            false;
        {ok, {manual, _Options}} ->
            false;
        {ok, _NonManual} ->
            core =:= mria_config:role()
    end.
57

58
%% @doc Start the (unlinked, locally registered) autocluster server for `App'
%% when autoclustering is enabled. Returns `ok' when a new server was
%% started, and `ignore' when autoclustering is disabled or a server is
%% already registered under ?SERVER.
-spec run(atom()) -> ok | ignore.
run(App) ->
    ?tp(ekka_autocluster_run, #{app => App}),
    StartResult =
        enabled() andalso
            gen_server:start({local, ?SERVER}, ?MODULE, [App], []),
    case StartResult of
        false                         -> ignore;
        {error, {already_started, _}} -> ignore;
        {ok, _Pid}                    -> ok
    end.
69

70
%% @doc Ask the configured discovery strategy to unregister this node.
%% An `{error, Reason}' result is only logged, not raised. Returns `ignore'
%% when discovery is manual or not configured.
-spec unregister_node() -> ok | ignore.
unregister_node() ->
    Unregister =
        fun(Mod, Options) ->
                Result = ekka_cluster_strategy:unregister(Mod, Options),
                log_error("Unregister", Result)
        end,
    with_strategy(Unregister).
76

77
%% @doc Core node discovery used by mria on replicant nodes to find the
%% core nodes (also used by this module to check registration).
%% Returns the nodes reported by the configured discovery strategy, or `[]'
%% when discovery is manual, not configured, or fails (failures are logged).
-spec core_node_discovery_callback() -> [node()].
core_node_discovery_callback() ->
    case ekka:env(cluster_discovery) of
        undefined ->
            [];
        {ok, {manual, _}} ->
            [];
        {ok, {Strategy, Options}} ->
            discover_core_nodes(strategy_module(Strategy), Options)
    end.

%% Run the strategy's discover/2, mapping an error result or any exception
%% raised by it to `[]' after logging.
discover_core_nodes(Mod, Options) ->
    try ekka_cluster_strategy:discover(Mod, Options) of
        {ok, Nodes} ->
            Nodes;
        {error, Reason} ->
            ?LOG(error, "Core node discovery error: ~p", [Reason]),
            []
    catch
        _:Err:Stack ->
            ?LOG(error, "Core node discovery error ~p: ~p", [Err, Stack]),
            []
    end.
99

100
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
101
%% gen_server callbacks
102
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
103

104
%% State of the autocluster gen_server. `application' is the application
%% whose readiness is awaited before each discovery round.
-record(s, {application :: atom()}).
107

108
%% gen_server init: make the `init' process this server's group leader
%% (presumably so the server does not depend on the starter's group
%% leader), then schedule the first discovery round by sending ourselves
%% `loop'.
init([App]) ->
    Self = self(),
    group_leader(whereis(init), Self),
    Self ! loop,
    State = #s{application = App},
    {ok, State}.
113

114
%% Each `loop' message runs one discovery round:
%% 1. Wait (up to 10 polls, 1s apart) for `App' to be running.
%% 2. Try to discover and join.
%% 3. Either stop normally once discovery is complete, or schedule the next
%%    round after ?DISCOVER_AND_JOIN_RETRY_INTERVAL ms.
%% Any other message is ignored.
handle_info(loop, S = #s{application = App}) ->
    %% The wait result is intentionally not checked: on `timeout' the
    %% discovery round is still attempted.
    _ = wait_application_ready(App, 10),
    JoinResult = discover_and_join(),
    case is_discovery_complete(JoinResult) of
        true ->
            ?tp(ekka_autocluster_complete, #{app => App}),
            {stop, normal, S};
        false ->
            %% erlang:send_after/3 creates the timer directly; older OTP
            %% releases route timer:send_after/2 through the `timer' server.
            _ = erlang:send_after(?DISCOVER_AND_JOIN_RETRY_INTERVAL, self(), loop),
            {noreply, S}
    end;
handle_info(_Unexpected, S) ->
    {noreply, S}.
127

128
%% Decide whether the autocluster server may stop after a round.
%% - `ignore' (no join was performed this round): complete once this node
%%   is registered with the discovery backend.
%% - Any other result: complete only when all of these hold:
%%   * this node is registered;
%%   * the mria cluster contains more than just this node;
%%   * the round returned `ok' (every discovered node was alive and the
%%     join succeeded).
%%   Otherwise keep retrying.
is_discovery_complete(ignore) ->
    is_node_registered();
is_discovery_complete(JoinResult) ->
    ClusterNodes = mria:cluster_nodes(all),
    NodeInCluster = ClusterNodes =/= [node()],
    NoNodesOutside = (ok =:= JoinResult),
    Registered = is_node_registered(),
    ?tp(ekka_maybe_run_app_again,
        #{ node_in_cluster  => NodeInCluster
         , node_registered  => Registered
         , no_nodes_outside => NoNodesOutside
         }),
    case {Registered, NodeInCluster, NoNodesOutside} of
        {true, true, true} -> true;
        _NotYet            -> false
    end.
143

144
%% Synchronous requests are not part of this server's protocol; reply with
%% an error and keep the state unchanged.
handle_call(_Request, _From, State) ->
    Reply = {error, unknown_call},
    {reply, Reply, State}.
146

147
%% Casts are not used by this server; drop them.
handle_cast(_Msg, State) ->
    {noreply, State}.
149

150
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
151
%% Internal functions
152
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
153

154
%% Poll ekka_node:is_running(App) at most `Retries' times, sleeping one
%% second after every negative answer. Returns `ok' as soon as the
%% application is reported running, `timeout' once the retries run out.
wait_application_ready(_App, 0) ->
    timeout;
wait_application_ready(App, RetriesLeft) ->
    case ekka_node:is_running(App) of
        false ->
            timer:sleep(1000),
            wait_application_ready(App, RetriesLeft - 1);
        true ->
            ok
    end.
162

163
%% @doc Run one discovery-and-join round under the strategy's cluster lock.
%% Returns:
%% - `ignore' when discovery is manual or not configured;
%% - `error' when the lock cannot be taken;
%% - otherwise the result of discover_and_join/2.
-spec discover_and_join() -> ok | ignore | error.
discover_and_join() ->
    with_strategy(fun locked_discover_and_join/2).

%% Take the strategy lock, then run discovery.
%% The lock is released only when lock/2 did not fail. Previously it was
%% also "released" after a failed or crashed lock attempt. Depending on the
%% strategy, that could release a lock held by another node.
locked_discover_and_join(Mod, Options) ->
    case ekka_cluster_strategy:lock(Mod, Options) of
        ok ->
            with_unlock(Mod, Options, fun() -> discover_and_join(Mod, Options) end);
        ignore ->
            %% No lock was taken; presumably the random delay spreads out
            %% nodes that start at the same time.
            with_unlock(Mod, Options,
                        fun() ->
                                timer:sleep(rand:uniform(3000)),
                                discover_and_join(Mod, Options)
                        end);
        {error, Reason} ->
            ?LOG(error, "AutoCluster stopped for lock error: ~p", [Reason]),
            error
    end.

%% Run Fun and always call the strategy's unlock/2 afterwards (even if Fun
%% raises). An unlock error is only logged.
with_unlock(Mod, Options, Fun) ->
    try
        Fun()
    after
        log_error("Unlock", ekka_cluster_strategy:unlock(Mod, Options))
    end.
180

181
%% Read the `cluster_discovery' setting and, for any strategy other than
%% `manual', return Fun(StrategyModule, Options). Returns `ignore' when
%% discovery is manual or not configured.
with_strategy(Fun) ->
    case ekka:env(cluster_discovery) of
        undefined ->
            ignore;
        {ok, {manual, _}} ->
            ignore;
        {ok, {Strategy, Options}} ->
            Mod = strategy_module(Strategy),
            Fun(Mod, Options)
    end.
190

191
%% Map a configured strategy name to its implementation module.
%% - A name that is already a loaded module is used as-is (presumably a
%%   custom provider).
%% - Otherwise the built-in module name `ekka_cluster_<Strategy>' is used.
%% The atom is created from configuration, which is assumed to be trusted.
strategy_module(Strategy) ->
    case code:is_loaded(Strategy) of
        false ->
            list_to_atom(lists:concat(["ekka_cluster_", Strategy]));
        {file, _Loaded} ->
            Strategy
    end.
196

197
%% @doc Discover nodes with strategy `Mod', join the cluster through the
%% alive ones, and register this node with the strategy.
%% Returns `error' in any of these cases:
%% - discovery fails or raises;
%% - any discovered node is not alive;
%% - the join returned `{error, _}'.
%% Otherwise returns the `ok'/`ignore' result of maybe_join/1.
-spec discover_and_join(module(), ekka_cluster_strategy:options()) -> ok | ignore | error.
discover_and_join(Mod, Options) ->
    ?tp(ekka_autocluster_discover_and_join, #{mod => Mod}),
    %% Only the discover/2 call itself is guarded; exceptions raised while
    %% joining (in the `of' branch) propagate to the caller.
    try ekka_cluster_strategy:discover(Mod, Options) of
        {error, Reason} ->
            ?LOG(error, "Discovery error: ~p", [Reason]),
            error;
        {ok, Nodes} ->
            ?tp(ekka_autocluster_discover_and_join_ok, #{mod => Mod, nodes => Nodes}),
            join_discovered(Mod, Options, Nodes)
    catch
        _:Error:Stacktrace ->
            ?LOG(error, "Discover error: ~p~n~p", [Error, Stacktrace]),
            error
    end.

%% Split Nodes by liveness, try to join via the alive ones, register this
%% node (whatever maybe_join/1 returned), then compute the round result.
join_discovered(Mod, Options, Nodes) ->
    {Alive, Dead} = lists:partition(fun ekka_node:is_aliving/1, Nodes),
    JoinResult = maybe_join(Alive),
    ?LOG(debug, "join result: ~p", [JoinResult]),
    log_error("Register", ekka_cluster_strategy:register(Mod, Options)),
    round_result(JoinResult, Dead).

%% Any unreachable discovered node turns the round into `error', so that
%% discovery is retried.
round_result(JoinResult, []) ->
    ?LOG(info, "all discovered nodes are alive", []),
    case JoinResult of
        {error, _} -> error;
        ok         -> ok;
        ignore     -> ignore
    end;
round_result(_JoinResult, [_ | _] = Dead) ->
    ?LOG(info, "discovered nodes are not responding: ~p", [Dead]),
    error.
229

230
%% Join the cluster through the oldest of the given nodes (the caller
%% passes the alive discovered nodes). Returns `ignore' in either case:
%% - the list is empty;
%% - its deduplicated, sorted form equals the current mria cluster.
-spec maybe_join([node()]) -> ignore | ok | {error, _}.
maybe_join([]) ->
    ignore;
maybe_join(Discovered) ->
    Nodes = lists:usort(Discovered),
    %% Matching against the already-bound Nodes is an exact (=:=) comparison.
    case lists:usort(mria:cluster_nodes(all)) of
        Nodes ->
            ?LOG(info, "all discovered nodes already in cluster; ignoring", []),
            ignore;
        _Different ->
            Oldest = find_oldest_node(Nodes),
            ?LOG(info, "joining with ~p", [Oldest]),
            join_with(Oldest)
    end.
245

246
%% Join the cluster via Node and return ekka_cluster:join/1's result.
%% Returns `ignore' without joining in two cases:
%% - Node is `false' (no seed node could be chosen);
%% - Node is this node.
join_with(Node) when Node =:= false; Node =:= node() ->
    ignore;
join_with(Node) ->
    JoinResult = ekka_cluster:join(Node),
    %% Wait for ekka to be restarted after the join. This avoids a noproc
    %% error when the cluster strategy (e.g. ekka_cluster_etcd) relies on
    %% processes started under ekka's supervision tree.
    _ = wait_application_ready(ekka, 10),
    JoinResult.
257

258
%% Return the node of the oldest mria member among Nodes. A single-node list
%% is returned directly without RPC. Returns `false' in two cases:
%% - any node fails the RPC;
%% - no node returns a valid `#member{}' record.
find_oldest_node([Node]) ->
    Node;
find_oldest_node(Nodes) ->
    {Replies, BadNodes} =
        rpc:multicall(Nodes, mria_membership, local_member, [], 30000),
    oldest_member_node(Nodes, Replies, BadNodes).

%% Pick the oldest member from the RPC replies, or log and return `false'.
oldest_member_node(Nodes, Replies, []) ->
    case [M || M <- Replies, is_record(M, member)] of
        [] ->
            ?LOG(error, "bad_members_found, all_nodes: ~p~n"
                        "normal_rpc_results:~p", [Nodes, Replies]),
            false;
        Members ->
            Oldest = mria_membership:oldest(Members),
            Oldest#member.node
    end;
oldest_member_node(_Nodes, Replies, BadNodes) ->
    ?LOG(error, "bad_nodes_found, failed_nodes: ~p~n"
                "normal_rpc_results: ~p", [BadNodes, Replies]),
    false.
276

277
%% True when this node appears in the node list reported by the configured
%% discovery strategy.
is_node_registered() ->
    lists:member(node(), core_node_discovery_callback()).
280

281
%% Log an `{error, Reason}' result of a strategy call as
%% "<Format> error: <Reason>". Any other result is accepted silently and
%% `ok' is returned.
log_error(Format, Result) ->
    case Result of
        {error, Reason} ->
            ?LOG(error, Format ++ " error: ~p", [Reason]);
        _Ok ->
            ok
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc