• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8702269235

16 Apr 2024 08:17AM UTC coverage: 67.82% (-0.01%) from 67.831%
8702269235

push

github

web-flow
Merge pull request #12881 from keynslug/fix/ds-repl-flaky

fix(dsrepl): make replication-related tests more stable

11 of 17 new or added lines in 1 file covered. (64.71%)

32 existing lines in 11 files now uncovered.

37936 of 55936 relevant lines covered (67.82%)

7895.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.11
/apps/emqx_conf/src/emqx_conf_app.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_conf_app).
18

19
-behaviour(application).
20

21
-export([start/2, stop/1]).
22
-export([get_override_config_file/0]).
23
-export([sync_data_from_node/0]).
24
-export([unset_config_loaded/0]).
25

26
-include_lib("emqx/include/logger.hrl").
27
-include("emqx_conf.hrl").
28

29
%% @doc Application start callback.
%% Waits for the cluster-rpc tables, loads the configuration, refreshes the
%% logger configuration and finally starts the top-level supervisor.
%% If loading the configuration raises, the exception is printed to stderr and
%% `init:stop(1)' is called.
start(_StartType, _StartArgs) ->
    Tables = emqx_cluster_rpc:create_tables(),
    ok = mria:wait_for_tables(Tables),
    try
        ok = init_conf()
    catch
        Class:Reason:Stacktrace ->
            %% The logger is not quite ready at this point, so write to stderr.
            io:format(
                standard_error,
                "Failed to load config~n~p~n~p~n~p~n",
                [Class, Reason, Stacktrace]
            ),
            init:stop(1)
    end,
    ok = emqx_config_logger:refresh_config(),
    emqx_conf_sup:start_link().
281✔
41

42
%% @doc Application stop callback. Ignores its argument and returns `ok'.
stop(_AppState) -> ok.
206✔
44

45
%% @doc Clear the "config loaded" flag by delegating to `emqx_app'.
%% emqx_conf uses this flag to synchronize configuration between nodes,
%% so it has to be cleaned up when the emqx application is restarted by mria.
unset_config_loaded() ->
    emqx_app:unset_config_loaded().
×
49

50
%% @doc Read the cluster config from the local node.
%% The function name says 'override' for historical reasons only.
%%
%% Returns `{ok, Info}' where `Info' carries `wall_clock', `node', `release',
%% `conf', `tnx_id' and `has_deprecated_file'; or `{error, Info}' where `Info'
%% carries the first three keys plus a `msg' explaining why the config could
%% not be read.
get_override_config_file() ->
    Self = node(),
    Base = #{
        wall_clock => erlang:statistics(wall_clock),
        node => Self,
        release => emqx_release:version_with_prefix()
    },
    case emqx_app:init_load_done() of
        true -> override_conf_if_handler_alive(Self, Base);
        false -> {error, Base#{msg => "init_conf_load_not_done"}}
    end.

%% Only read the config when the emqx_config_handler process is registered.
override_conf_if_handler_alive(Self, Base) ->
    case erlang:whereis(emqx_config_handler) of
        undefined -> {error, Base#{msg => "emqx_config_handler_not_ready"}};
        _Pid -> read_override_conf(Self, Base)
    end.

%% Collect tnx_id, raw cluster config and the deprecated-file flag inside one
%% read-only transaction on the cluster-rpc shard.
read_override_conf(Self, Base) ->
    Read = fun() ->
        NodeTnxId = emqx_cluster_rpc:get_node_tnx_id(Self),
        RawConf = emqx_config_handler:get_raw_cluster_override_conf(),
        Deprecated = emqx_config:has_deprecated_file(),
        Base#{
            tnx_id => NodeTnxId,
            conf => RawConf,
            has_deprecated_file => Deprecated
        }
    end,
    case mria:ro_transaction(?CLUSTER_RPC_SHARD, Read) of
        {atomic, Info} -> {ok, Info};
        {aborted, Why} -> {error, Base#{msg => Why}}
    end.
83

84
%% @doc Zip the "authz" and "certs" sub-directories of the data directory
%% (only those that exist) into an in-memory archive.
%% Returns `{ok, ZipBinary}' or `{error, Reason}' as reported by `zip:zip/3'.
sync_data_from_node() ->
    DataDir = emqx:data_dir(),
    Present = [
        SubDir
     || SubDir <- ["authz", "certs"],
        filelib:is_dir(filename:join(DataDir, SubDir))
    ],
    ZipName = "data.zip",
    case zip:zip(ZipName, Present, [memory, {cwd, DataDir}]) of
        {ok, {ZipName, Archive}} -> {ok, Archive};
        {error, _} = Failure -> Failure
    end.
94

95
%% ------------------------------------------------------------------------------
96
%% Internal functions
97
%% ------------------------------------------------------------------------------
98

99
%% Load the initial configuration when the current config loader is `emqx' or
%% `emqx_conf'; otherwise only log that the load is skipped.
init_load(TnxId) ->
    Loader = emqx_app:get_config_loader(),
    case lists:member(Loader, [emqx, emqx_conf]) of
        true ->
            ok = emqx_config:init_load(emqx_conf:schema_module()),
            %% The tnx_id is initialised first; only then is the loader
            %% recorded as emqx_conf.
            ok = emqx_cluster_rpc:maybe_init_tnx_id(node(), TnxId),
            ok = emqx_app:set_config_loader(emqx_conf);
        false ->
            ?SLOG(info, #{
                msg => "skip_init_config_load",
                reason => "Some application has set another config loader",
                loader => Loader
            })
    end.
114

115
%% Wait for cluster-rpc, sync the cluster config (which yields the initial
%% transaction id), then run the initial config load with that id.
init_conf() ->
    emqx_cluster_rpc:wait_for_cluster_rpc(),
    {ok, InitTnxId} = sync_cluster_conf(),
    ok = init_load(InitTnxId).
281✔
120

121
%% Core nodes reported by mria, with the local node removed.
cluster_nodes() ->
    lists:delete(node(), mria:cluster_nodes(cores)).
281✔
123

124
%% @doc Try to sync the cluster config from the other core nodes.
%% Returns `{ok, TnxId}'.
sync_cluster_conf() ->
    sync_from_peers(cluster_nodes()).

sync_from_peers([]) ->
    %% No other core node is known: this node is the first (or only) core.
    ?SLOG(info, #{
        msg => "skip_sync_cluster_conf",
        reason => "This is a single node, or the first node in the cluster"
    }),
    {ok, ?DEFAULT_INIT_TXN_ID};
sync_from_peers(Peers) ->
    sync_cluster_conf2(Peers).
137

138
%% @private Other core nodes exist: ask each of them for its cluster config
%% and decide how to proceed based on how many answered successfully.
sync_cluster_conf2(Nodes) ->
    {Replies, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes),
    IsReady = fun(Reply) -> element(1, Reply) =:= ok end,
    {Ready, NotReady} = lists:partition(IsReady, Replies),
    LogData = #{peer_nodes => Nodes, self_node => node()},
    case {Failed, NotReady} of
        {[], []} ->
            ok;
        _ ->
            ?SLOG(
                warning,
                LogData#{
                    msg => "cluster_config_fetch_failures",
                    failed_nodes => Failed,
                    booting_nodes => NotReady
                }
            )
    end,
    case {Ready, mria_rlog:role()} of
        {[], replicant} ->
            %% A replicant should never boot without copying from a core node.
            delay_and_retry(LogData#{role => replicant});
        {[], _Role} ->
            %% No peer is ready: either delay-and-retry or boot without waiting,
            %% depending on the state of the commit table.
            sync_cluster_conf5(tx_commit_table_status(), LogData);
        {_, _} ->
            %% Copy the config from the best node in the Ready list.
            sync_cluster_conf3(Ready)
    end.
169

170
%% None of the peer nodes are responsive, so we have to make a decision
171
%% based on the commit lagging (if the commit table is loaded).
172
%%
173
%% It could be that the peer nodes are also booting up,
174
%% however we cannot always wait because it may run into a dead-lock.
175
%%
176
%% Giving up wait here implies that some changes made to the peer node outside
177
%% of cluster-rpc MFAs will be lost.
178
%% e.g. stop all nodes, manually change cluster.hocon in one node
179
%% then boot all nodes around the same time, the changed cluster.hocon may
180
%% get lost if the node happens to copy config from others.
181
sync_cluster_conf5({loaded, local}, LogData) ->
    %% The commit table was loaded from the local disk: boot without copying.
    ?SLOG(info, LogData#{
        msg => "skip_copy_cluster_config_from_peer_nodes",
        explain => "Commit table loaded locally from disk, assuming that I have the latest config"
    }),
    {ok, ?DEFAULT_INIT_TXN_ID};
sync_cluster_conf5({loaded, Source}, LogData) ->
    %% The commit table was loaded from somewhere else: boot only if this
    %% node's commit id is not behind the latest one (or there is no commit).
    case get_commit_lag() of
        #{my_id := Mine, latest := Newest} = Lag when Mine >= Newest orelse Newest =:= 0 ->
            ?SLOG(info, LogData#{
                msg => "skip_copy_cluster_config_from_peer_nodes",
                explain => "I have the latest cluster config commit",
                commit_loaded_from => Source,
                lagging_info => Lag
            }),
            {ok, ?DEFAULT_INIT_TXN_ID};
        #{my_id := _, latest := _} = Lag ->
            delay_and_retry(LogData#{lagging_info => Lag, commit_loaded_from => Source})
    end;
sync_cluster_conf5({waiting, Pending}, LogData) ->
    %% Possibly unreachable, since the tables were waited for earlier.
    delay_and_retry(LogData#{table_pending => Pending}).
×
203

204
%% Thin wrapper around `emqx_cluster_rpc:get_commit_lag/0'.
get_commit_lag() ->
    emqx_cluster_rpc:get_commit_lag().
8✔
206

207
%% Log a warning, sleep for the sync delay, then retry the cluster config sync
%% from the beginning.
delay_and_retry(LogData) ->
    Delay = sync_delay_timeout(),
    ?SLOG(warning, LogData#{
        msg => "sync_cluster_conf_retry",
        explain =>
            "Cannot boot alone due to potentially stale data. "
            "Will try sync cluster config again after delay",
        delay => Delay
    }),
    timer:sleep(Delay),
    sync_cluster_conf().
×
218

219
%% Delay in milliseconds before retrying the cluster config sync:
%% a fixed base plus random jitter. Test builds use a shorter delay.
-ifdef(TEST).
sync_delay_timeout() ->
    1_000 + rand:uniform(200).
-else.
sync_delay_timeout() ->
    10_000 + rand:uniform(2000).
-endif.
228

229
%% @private Drop the replies from nodes running a newer version than this
%% node, then sync from whatever is left.
sync_cluster_conf3(Ready) ->
    case [Reply || Reply <- Ready, is_older_or_same_version(Reply)] of
        [] ->
            %% Every available core node runs a newer version than this node,
            %% so start without syncing the cluster config from them.
            %% This is likely a restart of an older-version node during a
            %% cluster upgrade.
            ToVersionInfo = fun({ok, #{node := PeerNode, release := PeerRelease}}) ->
                #{node => PeerNode, version => PeerRelease}
            end,
            PeerVersions = lists:map(ToVersionInfo, Ready),
            ?SLOG(warning, #{
                msg => "all_available_nodes_running_newer_version",
                explain =>
                    "Booting this node without syncing cluster config from core nodes "
                    "because other nodes are running a newer version",
                versions => PeerVersions
            }),
            {ok, ?DEFAULT_INIT_TXN_ID};
        Compatible ->
            sync_cluster_conf4(Compatible)
    end.
253

254
%% True unless the peer's reported release compares as `newer' than ours.
is_older_or_same_version({ok, #{release := PeerRelease}}) ->
    try emqx_release:vsn_compare(PeerRelease) of
        newer -> false;
        _NotNewer -> true
    catch
        _:_ ->
            %% An invalid version (no v or e prefix) is known to be older
            %% than v5.1.0/e5.1.0.
            true
    end;
is_older_or_same_version(_ReplyWithoutRelease) ->
    %% Older versions do not report a 'release' field.
    true.
×
266

267
%% @private Some core nodes replied with their configs successfully.
%% Sort the replies, take the first one, save its config as the local cluster
%% config and copy its data files. Returns `{ok, TnxId}' of the chosen reply.
sync_cluster_conf4(Ready) ->
    [{ok, Best} | _] = lists:sort(fun conf_sort/2, Ready),
    #{node := SourceNode, conf := RawConf, tnx_id := TnxId} = Best,
    Deprecated = has_deprecated_file(Best),
    LocalRelease = emqx_release:version_with_prefix(),
    RemoteRelease = maps:get(release, Best, "before_v5.0.24|e5.0.3"),
    DataDir = emqx:data_dir(),
    ?SLOG(info, #{
        msg => "sync_cluster_conf_success",
        synced_from_node => SourceNode,
        has_deprecated_file => Deprecated,
        local_release => LocalRelease,
        remote_release => RemoteRelease,
        data_dir => DataDir,
        tnx_id => TnxId
    }),
    SaveOpts = #{override_to => cluster},
    ok = emqx_config:save_to_override_conf(Deprecated, RawConf, SaveOpts),
    ok = sync_data_from_node(SourceNode),
    {ok, TnxId}.
52✔
289

290
%% Status of the cluster commit table, taken from the cluster-rpc table statuses.
tx_commit_table_status() ->
    maps:get(?CLUSTER_COMMIT, emqx_cluster_rpc:get_tables_status()).
16✔
293

294
%% Ordering fun for `lists:sort/2' over `{ok, Info}' replies:
%% a higher tnx_id sorts first; for an identical tnx_id the larger wall_clock
%% sorts first; anything else yields `false'.
conf_sort({ok, Left}, {ok, Right}) ->
    case {Left, Right} of
        {#{tnx_id := LeftId}, #{tnx_id := RightId}} when LeftId > RightId ->
            true;
        {#{tnx_id := Same, wall_clock := LeftClock}, #{tnx_id := Same, wall_clock := RightClock}} ->
            LeftClock > RightClock;
        _ ->
            false
    end.
×
299

300
%% Fetch the zipped data files from `Node' and unpack them into the local
%% data directory.
%%
%% Returns `ok' once the archive has been unpacked.
%% Raises an error, after logging at `emergency' level, when the RPC does not
%% return `{ok, Bin}' or when the received archive cannot be unpacked.
sync_data_from_node(Node) ->
    case emqx_conf_proto_v3:sync_data_from_node(Node) of
        {ok, DataBin} ->
            case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of
                {ok, []} ->
                    ?SLOG(debug, #{node => Node, msg => "sync_data_from_node_empty_response"});
                {ok, Files} ->
                    ?SLOG(debug, #{
                        node => Node,
                        msg => "sync_data_from_node_non_empty_response",
                        files => Files
                    });
                {error, Reason} ->
                    %% zip:unzip/2 can fail (e.g. on a malformed archive); report it
                    %% like the RPC failure below instead of an opaque case_clause.
                    ?SLOG(emergency, #{
                        node => Node,
                        msg => "sync_data_from_node_unzip_failed",
                        reason => Reason
                    }),
                    error({sync_data_unzip_failed, Reason})
            end,
            ok;
        Error ->
            ?SLOG(emergency, #{node => Node, msg => "sync_data_from_node_failed", reason => Error}),
            error(Error)
    end.
318

319
%% Whether the peer described by the given info map has a deprecated config file.
%% Uses the `has_deprecated_file' flag when the peer reported one.
has_deprecated_file(#{conf := _, has_deprecated_file := Flag}) ->
    Flag;
has_deprecated_file(#{conf := Conf}) ->
    %% Older versions do not have emqx_config:has_deprecated_file/0.
    %% Conf is not empty if a deprecated file was found.
    Conf =/= #{}.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc