• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8628139215

10 Apr 2024 08:18AM UTC coverage: 62.44% (-0.05%) from 62.489%
8628139215

push

github

web-flow
Merge pull request #12851 from zmstone/0327-feat-add-emqx_variform

emqx_variform for string substitution and transform

206 of 238 new or added lines in 3 files covered. (86.55%)

28 existing lines in 16 files now uncovered.

34895 of 55886 relevant lines covered (62.44%)

6585.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.57
/apps/emqx_conf/src/emqx_conf_app.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_conf_app).
18

19
-behaviour(application).
20

21
-export([start/2, stop/1]).
22
-export([get_override_config_file/0]).
23
-export([sync_data_from_node/0]).
24
-export([unset_config_loaded/0]).
25

26
-include_lib("emqx/include/logger.hrl").
27
-include("emqx_conf.hrl").
28

29
%% @doc Application start callback.
%% Waits for the cluster-rpc tables, initialises the config (on failure the
%% error is printed to stderr and init:stop(1) is requested), calls
%% emqx_config_logger:refresh_config/0, then starts emqx_conf_sup.
start(_StartType, _StartArgs) ->
    RpcTables = emqx_cluster_rpc:create_tables(),
    ok = mria:wait_for_tables(RpcTables),
    try
        ok = init_conf()
    catch
        Class:Reason:Stacktrace ->
            %% The logger is not quite ready at this point, so report straight
            %% to stderr.
            io:format(
                standard_error,
                "Failed to load config~n~p~n~p~n~p~n",
                [Class, Reason, Stacktrace]
            ),
            %% NOTE(review): init:stop/1 initiates an orderly shutdown and
            %% returns immediately, so the steps below still run after a failed
            %% config load — confirm this is intended.
            init:stop(1)
    end,
    ok = emqx_config_logger:refresh_config(),
    emqx_conf_sup:start_link().
41

42
%% @doc Application stop callback; nothing to tear down, always returns ok.
stop(_AppState) ->
    ok.
44

45
%% @doc Clear the "config loaded" flag kept by emqx_app.
%% emqx_conf relies on this flag to synchronize configuration between nodes,
%% so it must be cleared when the emqx application is restarted by mria.
%% Whatever emqx_app:unset_config_loaded/0 returns is passed through unchanged.
unset_config_loaded() -> emqx_app:unset_config_loaded().
49

50
%% @doc Read the cluster config from the local node.
%% This function is named 'override' due to historical reasons.
%%
%% Every reply carries `node', `release' and `wall_clock' metadata.
%% Returns `{ok, Info}' where Info additionally holds `conf', `tnx_id' and
%% `has_deprecated_file', or `{error, Info}' where `msg' says why this node
%% cannot serve its config (init load not done, config handler not running,
%% or the read-only transaction aborted).
-spec get_override_config_file() -> {ok, map()} | {error, map()}.
get_override_config_file() ->
    Node = node(),
    Data = #{
        wall_clock => erlang:statistics(wall_clock),
        node => Node,
        release => emqx_release:version_with_prefix()
    },
    case emqx_app:init_load_done() of
        false ->
            {error, Data#{msg => "init_conf_load_not_done"}};
        true ->
            read_override_conf(Node, Data)
    end.

%% @private Read the raw cluster override config and this node's tnx_id inside a
%% read-only transaction, provided emqx_config_handler is registered.
read_override_conf(Node, Data) ->
    case erlang:whereis(emqx_config_handler) of
        undefined ->
            {error, Data#{msg => "emqx_config_handler_not_ready"}};
        _Pid ->
            Fun = fun() ->
                TnxId = emqx_cluster_rpc:get_node_tnx_id(Node),
                Conf = emqx_config_handler:get_raw_cluster_override_conf(),
                HasDeprecateFile = emqx_config:has_deprecated_file(),
                Data#{
                    conf => Conf,
                    tnx_id => TnxId,
                    has_deprecated_file => HasDeprecateFile
                }
            end,
            case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of
                {atomic, Res} -> {ok, Res};
                {aborted, Reason} -> {error, Data#{msg => Reason}}
            end
    end.
83

84
%% @doc Pack the local `authz' and `certs' data directories (only those that
%% exist) into an in-memory zip archive named "data.zip" and return its bytes.
%% Presumably invoked remotely by peers through
%% emqx_conf_proto_v3:sync_data_from_node/1 while they boot.
-spec sync_data_from_node() -> {ok, binary()} | {error, term()}.
sync_data_from_node() ->
    Dir = emqx:data_dir(),
    TargetDirs = [
        Type
     || Type <- ["authz", "certs"],
        filelib:is_dir(filename:join(Dir, Type))
    ],
    Name = "data.zip",
    %% Entries are stored relative to Dir thanks to {cwd, Dir}.
    case zip:zip(Name, TargetDirs, [memory, {cwd, Dir}]) of
        {ok, {Name, Bin}} -> {ok, Bin};
        {error, _Reason} = Error -> Error
    end.
94

95
%% ------------------------------------------------------------------------------
96
%% Internal functions
97
%% ------------------------------------------------------------------------------
98

99
%% @private Load the local config unless another application has registered
%% itself as the config loader.
%% When the current loader is `emqx' or `emqx_conf', the config is loaded with
%% emqx_conf's schema, TnxId is recorded via
%% emqx_cluster_rpc:maybe_init_tnx_id/2, and emqx_conf claims the loader role.
%% Always returns `ok' (crashes on unexpected results from the callees).
init_load(TnxId) ->
    case emqx_app:get_config_loader() of
        Loader when Loader =:= emqx; Loader =:= emqx_conf ->
            ok = emqx_config:init_load(emqx_conf:schema_module()),
            %% Set load config done after update(init) tnx_id.
            ok = emqx_cluster_rpc:maybe_init_tnx_id(node(), TnxId),
            ok = emqx_app:set_config_loader(emqx_conf),
            ok;
        Loader ->
            ?SLOG(info, #{
                msg => "skip_init_config_load",
                reason => "Some application has set another config loader",
                loader => Loader
            }),
            %% Return ok explicitly rather than relying on the value of the
            %% ?SLOG macro, because the caller asserts `ok = init_load(TnxId)'.
            ok
    end.
114

115
%% @private Boot-time config initialisation: wait for cluster RPC, try to copy
%% the cluster config from a peer core node, then load it locally using the
%% transaction id obtained from the sync.
init_conf() ->
    emqx_cluster_rpc:wait_for_cluster_rpc(),
    {ok, SyncedTnxId} = sync_cluster_conf(),
    ok = init_load(SyncedTnxId).
120

121
%% @private Core nodes known to mria, with this node removed.
cluster_nodes() ->
    lists:delete(node(), mria:cluster_nodes(cores)).
123

124
%% @doc Try to sync the cluster config from other core nodes.
%% Returns `{ok, TnxId}'. When no other core node exists, nothing is copied and
%% ?DEFAULT_INIT_TXN_ID is returned.
sync_cluster_conf() ->
    case cluster_nodes() of
        [_ | _] = PeerCores ->
            sync_cluster_conf2(PeerCores);
        [] ->
            %% The first core node is self: there is nobody to copy from.
            ?SLOG(info, #{
                msg => "skip_sync_cluster_conf",
                reason => "This is a single node, or the first node in the cluster"
            }),
            {ok, ?DEFAULT_INIT_TXN_ID}
    end.
137

138
%% @private Some core nodes are running, try to sync the cluster config from them.
%% Unreachable peers (presumably the multicall's bad-node list) and peers that
%% replied with an error are logged as a warning. Then:
%%   - at least one ready peer: copy from the best one (sync_cluster_conf3/1);
%%   - no ready peer and this node is a replicant: delay and retry;
%%   - no ready peer otherwise: decide from the local commit table status.
sync_cluster_conf2(Nodes) ->
    LogData = #{peer_nodes => Nodes, self_node => node()},
    {Replies, FailedNodes} = emqx_conf_proto_v3:get_override_config_file(Nodes),
    IsReady = fun(Reply) -> element(1, Reply) =:= ok end,
    {Ready, NotReady} = lists:partition(IsReady, Replies),
    case {FailedNodes, NotReady} of
        {[], []} ->
            ok;
        _ ->
            ?SLOG(
                warning,
                LogData#{
                    msg => "cluster_config_fetch_failures",
                    failed_nodes => FailedNodes,
                    booting_nodes => NotReady
                }
            )
    end,
    case {Ready, mria_rlog:role()} of
        {[], replicant} ->
            %% A replicant should never boot without copying from a core node.
            delay_and_retry(LogData#{role => replicant});
        {[], _OtherRole} ->
            %% No peer is ready: either delay-and-retry or boot without waiting.
            sync_cluster_conf5(tx_commit_table_status(), LogData);
        {_, _} ->
            %% Copy the config from the best node in the Ready list.
            sync_cluster_conf3(Ready)
    end.
169

170
%% None of the peer nodes are responsive, so we have to make a decision
%% based on the commit lag (if the commit table is loaded).
%%
%% It could be that the peer nodes are also booting up,
%% however we cannot always wait because that may run into a deadlock.
%%
%% Giving up waiting here implies that some changes made to a peer node outside
%% of cluster-rpc MFAs will be lost.
%% E.g. stop all nodes, manually change cluster.hocon on one node,
%% then boot all nodes around the same time: the changed cluster.hocon may
%% get lost if that node happens to copy the config from others.
sync_cluster_conf5({waiting, Waiting}, LogData) ->
    %% Presumably unreachable: start/2 already waits for the cluster-rpc tables.
    delay_and_retry(LogData#{table_pending => Waiting});
sync_cluster_conf5({loaded, local}, LogData) ->
    ?SLOG(info, LogData#{
        msg => "skip_copy_cluster_config_from_peer_nodes",
        explain => "Commit table loaded locally from disk, assuming that I have the latest config"
    }),
    {ok, ?DEFAULT_INIT_TXN_ID};
sync_cluster_conf5({loaded, From}, LogData) ->
    Lagging = get_commit_lag(),
    case Lagging of
        #{my_id := MyId, latest := Latest} when Latest =:= 0 orelse MyId >= Latest ->
            ?SLOG(info, LogData#{
                msg => "skip_copy_cluster_config_from_peer_nodes",
                explain => "I have the latest cluster config commit",
                commit_loaded_from => From,
                lagging_info => Lagging
            }),
            {ok, ?DEFAULT_INIT_TXN_ID};
        #{my_id := _, latest := _} ->
            %% This node is behind: booting alone could serve stale config.
            delay_and_retry(LogData#{lagging_info => Lagging, commit_loaded_from => From})
    end.
203

204
%% @private Thin wrapper over emqx_cluster_rpc:get_commit_lag/0; the result is
%% matched in sync_cluster_conf5/2 as a map with `my_id' and `latest' keys.
get_commit_lag() -> emqx_cluster_rpc:get_commit_lag().
206

207
%% @private Log a warning, sleep for sync_delay_timeout/0 milliseconds, then
%% start the whole sync attempt again via sync_cluster_conf/0.
%% There is no retry counter: it keeps retrying until an attempt returns.
delay_and_retry(LogData) ->
    DelayMs = sync_delay_timeout(),
    ?SLOG(warning, LogData#{
        msg => "sync_cluster_conf_retry",
        explain =>
            "Cannot boot alone due to potentially stale data. "
            "Will try sync cluster config again after delay",
        delay => DelayMs
    }),
    ok = timer:sleep(DelayMs),
    sync_cluster_conf().
218

219
%% Delay (ms) before retrying a cluster config sync: a fixed base plus a random
%% jitter from rand:uniform/1 (1..N). Test builds use a much shorter delay.
-ifdef(TEST).
sync_delay_timeout() -> 1_000 + rand:uniform(200).
-else.
sync_delay_timeout() -> 10_000 + rand:uniform(2000).
-endif.
228

229
%% @private Filter out the nodes which are running a newer version than this node.
%% If every ready node is newer, boot without copying their config and return
%% ?DEFAULT_INIT_TXN_ID; otherwise continue with sync_cluster_conf4/1.
sync_cluster_conf3(Ready) ->
    case [Reply || Reply <- Ready, is_older_or_same_version(Reply)] of
        [_ | _] = Compatible ->
            sync_cluster_conf4(Compatible);
        [] ->
            %% All available core nodes are running a newer version than this node.
            %% This is likely a restart of an older version node during cluster upgrade.
            ToNodeVersion = fun({ok, #{node := Node, release := Release}}) ->
                #{node => Node, version => Release}
            end,
            NodesAndVersions = lists:map(ToNodeVersion, Ready),
            ?SLOG(warning, #{
                msg => "all_available_nodes_running_newer_version",
                explain =>
                    "Booting this node without syncing cluster config from core nodes "
                    "because other nodes are running a newer version",
                versions => NodesAndVersions
            }),
            {ok, ?DEFAULT_INIT_TXN_ID}
    end.
253

254
%% @private True unless the peer's reply reports a release that
%% emqx_release:vsn_compare/1 classifies as `newer'.
%% Replies without a `release' field come from older versions and count as
%% older; so does any release string that vsn_compare/1 fails on.
is_older_or_same_version({ok, #{release := RemoteRelease}}) ->
    try emqx_release:vsn_compare(RemoteRelease) of
        newer -> false;
        _NotNewer -> true
    catch
        _:_ ->
            %% An invalid version (without v or e prefix) means the peer is
            %% older than v5.1.0/e5.1.0.
            true
    end;
is_older_or_same_version(_OlderReply) ->
    true.
266

267
%% @private Some core nodes are running and replied with their configs successfully.
%% Pick the first reply after sorting with conf_sort/2 (highest tnx_id, ties
%% broken by wall_clock), save its raw config as the cluster override config,
%% copy that peer's data files, and return `{ok, TnxId}' with the peer's tnx_id.
sync_cluster_conf4(Ready) ->
    [{ok, Best} | _] = lists:sort(fun conf_sort/2, Ready),
    #{node := SourceNode, conf := RawConf, tnx_id := SourceTnxId} = Best,
    HasDeprecated = has_deprecated_file(Best),
    ?SLOG(info, #{
        msg => "sync_cluster_conf_success",
        synced_from_node => SourceNode,
        has_deprecated_file => HasDeprecated,
        local_release => emqx_release:version_with_prefix(),
        remote_release => maps:get(release, Best, "before_v5.0.24|e5.0.3"),
        data_dir => emqx:data_dir(),
        tnx_id => SourceTnxId
    }),
    SaveOpts = #{override_to => cluster},
    ok = emqx_config:save_to_override_conf(HasDeprecated, RawConf, SaveOpts),
    ok = sync_data_from_node(SourceNode),
    {ok, SourceTnxId}.
289

290
%% @private Status of the ?CLUSTER_COMMIT table taken from
%% emqx_cluster_rpc:get_tables_status/0. sync_cluster_conf5/2 expects
%% `{loaded, local}', `{loaded, From}' or `{waiting, Waiting}'.
tx_commit_table_status() ->
    maps:get(?CLUSTER_COMMIT, emqx_cluster_rpc:get_tables_status()).
293

294
%% @private Ordering fun for lists:sort/2 that puts the preferred reply first:
%% the higher tnx_id wins; on an equal tnx_id, the larger wall_clock wins.
%% Any other pair of `{ok, _}' replies compares as false.
conf_sort(
    {ok, #{tnx_id := SameId, wall_clock := ClockA}},
    {ok, #{tnx_id := SameId, wall_clock := ClockB}}
) ->
    ClockA > ClockB;
conf_sort({ok, #{tnx_id := IdA}}, {ok, #{tnx_id := IdB}}) ->
    IdA > IdB;
conf_sort({ok, _}, {ok, _}) ->
    false.
299

300
%% @private Fetch the zipped data files from Node and unpack them into the local
%% data dir. This is presumably the archive built by sync_data_from_node/0 on
%% that node.
%% Returns `ok'. If the remote call or the unzip fails, it logs an emergency and
%% raises an error.
sync_data_from_node(Node) ->
    case emqx_conf_proto_v3:sync_data_from_node(Node) of
        {ok, DataBin} ->
            case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of
                {ok, []} ->
                    ?SLOG(debug, #{node => Node, msg => "sync_data_from_node_empty_response"});
                {ok, Files} ->
                    ?SLOG(debug, #{
                        node => Node,
                        msg => "sync_data_from_node_non_empty_response",
                        files => Files
                    });
                {error, Reason} ->
                    %% Previously this fell through as an opaque case_clause;
                    %% report it like the RPC failure below.
                    ?SLOG(emergency, #{
                        node => Node,
                        msg => "sync_data_from_node_unzip_failed",
                        reason => Reason
                    }),
                    error({sync_data_unzip_failed, Node, Reason})
            end,
            ok;
        Error ->
            ?SLOG(emergency, #{node => Node, msg => "sync_data_from_node_failed", reason => Error}),
            error(Error)
    end.
318

319
%% @private Whether the peer that produced Info has a deprecated config file.
%% Newer peers report it in `has_deprecated_file'. Older versions lack
%% emqx_config:has_deprecated_file/0, so for them a non-empty `conf' is taken
%% to mean a deprecated file was found.
has_deprecated_file(#{conf := _, has_deprecated_file := HasDeprecatedFile}) ->
    HasDeprecatedFile;
has_deprecated_file(#{conf := Conf}) ->
    Conf =/= #{}.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc