• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8684992748

15 Apr 2024 07:16AM UTC coverage: 67.831% (+5.4%) from 62.388%
8684992748

push

github

web-flow
Merge pull request #12877 from id/0415-sync-release-56

sync release 56

29 of 40 new or added lines in 7 files covered. (72.5%)

129 existing lines in 17 files now uncovered.

37939 of 55932 relevant lines covered (67.83%)

7734.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.87
/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_dashboard_monitor).
18

19
-include("emqx_dashboard.hrl").
20

21
-include_lib("snabbkaffe/include/trace.hrl").
22
-include_lib("emqx/include/logger.hrl").
23

24
-behaviour(gen_server).
25

26
-export([create_tables/0]).
27
-export([start_link/0]).
28

29
-export([
30
    init/1,
31
    handle_call/3,
32
    handle_cast/2,
33
    handle_info/2,
34
    terminate/2,
35
    code_change/3
36
]).
37

38
-export([
39
    samplers/0,
40
    samplers/2,
41
    current_rate/1,
42
    granularity_adapter/1
43
]).
44

45
-ifdef(TEST).
46
-export([current_rate_cluster/0]).
47
-endif.
48

49
%% for rpc
50
-export([do_sample/2]).
51

52
-define(TAB, ?MODULE).
53

54
%% 1 hour = 60 * 60 * 1000 milliseconds
55
-define(CLEAN_EXPIRED_INTERVAL, 60 * 60 * 1000).
56
%% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds
57
-define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000).
58

59
-record(state, {
60
    last
61
}).
62

63
-record(emqx_monit, {
64
    time :: integer(),
65
    data :: map()
66
}).
67

68
%% Creates the table that holds the monitor samples and returns the list of
%% tables owned by this module. The table is a local-content, disc-backed set
%% whose rows are `emqx_monit' records.
create_tables() ->
    TableOpts = [
        {type, set},
        {local_content, true},
        {storage, disc_copies},
        {record_name, emqx_monit},
        {attributes, record_info(fields, emqx_monit)}
    ],
    ok = mria:create_table(?TAB, TableOpts),
    [?TAB].
77

78
%% -------------------------------------------------------------------------------------------------
79
%% API
80

81
%% Returns all stored samples of every running node, formatted by format/1
%% (a time-sorted list, or `{badrpc, Reason}' when a node could not be reached).
samplers() ->
    ClusterSamples = do_sample(all, infinity),
    format(ClusterSamples).
83

84
%% Returns the samples of `NodeOrCluster' taken within the last `Latest'
%% seconds (`infinity' selects everything).
%% The formatted list is passed through granularity_adapter/1; a
%% `{badrpc, Reason}' result is returned unchanged.
samplers(NodeOrCluster, Latest) ->
    Since = latest2time(Latest),
    Formatted = format(do_sample(NodeOrCluster, Since)),
    case Formatted of
        {badrpc, _Reason} = Error ->
            Error;
        Samples when is_list(Samples) ->
            granularity_adapter(Samples)
    end.
92

93
%% Converts a "latest N seconds" window into an absolute lower bound in
%% milliseconds of system time. `infinity' is passed through unchanged.
latest2time(infinity) ->
    infinity;
latest2time(Seconds) ->
    NowMs = erlang:system_time(millisecond),
    NowMs - (Seconds * 1000).
95

96
%% When the number of samples exceeds 1000, it affects the rendering speed of dashboard UI.
97
%% granularity_adapter downsamples the samples:
98
%% adjacent samples are merged pairwise to reduce the data density.
99
%%
100
%% [
101
%%   Data1 = #{time => T1, k1 => 1, k2 => 2},
102
%%   Data2 = #{time => T2, k1 => 3, k2 => 4},
103
%%   ...
104
%% ]
105
%% After granularity_adapter, Merge Data1 Data2
106
%%
107
%% [
108
%%   #{time => T2, k1 => 1 + 3, k2 => 2 + 4},
109
%%   ...
110
%% ]
111
%%
112
%% Lists with more than 1000 samples are handed to granularity_adapter/2,
%% which merges adjacent samples pairwise; anything else is returned as is.
granularity_adapter(List) ->
    if
        %% Evaluated as a guard: a failing length/1 simply selects the
        %% `true' branch instead of raising.
        length(List) > 1000 ->
            granularity_adapter(List, []);
        true ->
            List
    end.
116

117
%% Returns `{ok, RateMap}' for the whole cluster (`all'), for the local node,
%% or for a remote node. A failed remote call yields `{badrpc, {Node, Reason}}'.
current_rate(all) ->
    current_rate_cluster();
current_rate(Node) when Node == node() ->
    try
        {ok, _} = Reply = do_call(current_rate),
        Reply
    catch
        _Class:Reason ->
            ?SLOG(warning, #{msg => "dashboard_monitor_error", reason => Reason}),
            %% Report every gauge and rate as 0 so the API does not crash,
            %% e.g. while the monitor restarts when the node joins a cluster.
            ZeroKeys = ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP),
            ZeroRates = lists:foldl(fun(Key, Acc) -> Acc#{Key => 0} end, #{}, ZeroKeys),
            {ok, maps:merge(ZeroRates, non_rate_value())}
    end;
current_rate(Node) ->
    case emqx_dashboard_proto_v1:current_rate(Node) of
        {ok, _Rate} = Ok ->
            Ok;
        {badrpc, Why} ->
            {badrpc, {Node, Why}}
    end.
141

142
%% Get the current rate. Not the current sampler data.
143
%% Folds current_rate/1 over all running nodes and merges the per-node rate
%% maps with merge_cluster_rate/2.
%% Returns `{ok, ClusterRate}', or `{badrpc, {Node, Reason}}' for the first
%% node that could not be reached; later nodes are then not queried.
current_rate_cluster() ->
    Fun =
        fun
            (Node, Cluster) when is_map(Cluster) ->
                case current_rate(Node) of
                    {ok, CurrentRate} ->
                        merge_cluster_rate(CurrentRate, Cluster);
                    {badrpc, _} = Error ->
                        %% current_rate/1 has already tagged the reason with
                        %% the failing node; do not wrap it a second time.
                        Error
                end;
            (_Node, Error) ->
                %% A previous node failed: keep propagating that error.
                Error
        end,
    case lists:foldl(Fun, #{}, mria:cluster_nodes(running)) of
        {badrpc, Reason} ->
            {badrpc, Reason};
        Rate ->
            {ok, Rate}
    end.
162

163
%% -------------------------------------------------------------------------------------------------
164
%% gen_server functions
165

166
%% Starts the monitor as a gen_server registered locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
168

169
%% Arms the sampling timer and the clean-up timer. No sample has been taken
%% yet, so `last' starts out as `undefined'.
init([]) ->
    _SampleTimer = sample_timer(),
    _CleanTimer = clean_timer(),
    InitialState = #state{last = undefined},
    {ok, InitialState}.
173

174
%% `current_rate': takes a fresh sample, derives the rates against the last
%% stored sample and merges in the non-rate values. The state is not changed.
%% Any other request is answered with `ok'.
handle_call(current_rate, _From, #state{last = Previous} = State) ->
    Current = sample(erlang:system_time(millisecond)),
    Rates = cal_rate(Current, Previous),
    Reply = {ok, maps:merge(Rates, non_rate_value())},
    {reply, Reply, State};
handle_call(_Request, _From, #state{} = State) ->
    {reply, ok, State}.
183

184
%% Casts are not part of this server's protocol; they are ignored.
handle_cast(_Msg, #state{} = State) ->
    {noreply, State}.
186

187
%% `{sample, Time}': takes a sample stamped with `Time', flushes it to the
%% table, re-arms the sampling timer and remembers the sample as `last'.
%% `clean_expired': removes expired rows and re-arms the clean-up timer.
%% Every other message is dropped.
handle_info({sample, Time}, #state{last = Previous} = State) ->
    Current = sample(Time),
    {atomic, ok} = flush(Previous, Current),
    ?tp(dashboard_monitor_flushed, #{}),
    sample_timer(),
    NewState = State#state{last = Current},
    {noreply, NewState};
handle_info(clean_expired, State) ->
    clean(),
    clean_timer(),
    {noreply, State};
handle_info(_Unexpected, #state{} = State) ->
    {noreply, State}.
199

200
%% Nothing to release on shutdown.
terminate(_Reason, #state{}) ->
    ok.
202

203
%% The state layout is kept as is across code upgrades.
code_change(_OldVsn, #state{} = State, _Extra) ->
    {ok, State}.
205

206
%% -------------------------------------------------------------------------------------------------
207
%% Internal functions
208

209
%% Synchronous call to the locally registered monitor with a 5 second timeout.
do_call(Request) ->
    TimeoutMs = 5000,
    gen_server:call(?MODULE, Request, TimeoutMs).
211

212
%% Collects the stored samples taken at or after `Time' (`infinity' = all).
%%  - `all': merged samples of every running node;
%%  - the local node: read from the local table;
%%  - a remote node: fetched via emqx_dashboard_proto_v1, with a failure
%%    reported as `{badrpc, {Node, Reason}}'.
do_sample(all, Time) ->
    RunningNodes = mria:cluster_nodes(running),
    do_sample(RunningNodes, Time, #{});
do_sample(Node, Time) when Node == node() ->
    Rows = ets:select(?TAB, match_spec(Time)),
    internal_format(Rows);
do_sample(Node, Time) ->
    case emqx_dashboard_proto_v1:do_sample(Node, Time) of
        {badrpc, Why} ->
            {badrpc, {Node, Why}};
        Samples ->
            Samples
    end.
224

225
%% Samples the given nodes one after another and merges their results into
%% the accumulator. Stops at the first `{badrpc, Reason}' and returns it.
do_sample([Node | Rest], Time, Acc) ->
    case do_sample(Node, Time) of
        {badrpc, _Reason} = Error ->
            Error;
        NodeSamples ->
            Merged = merge_cluster_samplers(NodeSamples, Acc),
            do_sample(Rest, Time, Merged)
    end;
do_sample([], _Time, Acc) ->
    Acc.
234

235
%% Builds the match specification used to read samples:
%% `infinity' selects every row, otherwise only rows whose second tuple
%% element (the sample time) is greater than or equal to `Time'.
match_spec(infinity) ->
    SelectAll = {'$1', [], ['$1']},
    [SelectAll];
match_spec(Time) ->
    Head = {'_', '$1', '_'},
    Guard = {'>=', '$1', Time},
    [{Head, [Guard], ['$_']}].
239

240
%% Merges one node's samples (a map keyed by time stamp) into the cluster map.
merge_cluster_samplers(Node, Cluster) ->
    MergeOne = fun(TS, NodeData, Acc) -> merge_cluster_samplers(TS, NodeData, Acc) end,
    maps:fold(MergeOne, Cluster, Node).
242

243
%% Adds the data a node recorded at time stamp `TS' to the cluster map.
%% When the cluster already has an entry for `TS', the two are combined with
%% merge_cluster_sampler_map/2; otherwise the node's data is stored as is.
merge_cluster_samplers(TS, NodeData, Cluster) ->
    Merged =
        case maps:get(TS, Cluster, undefined) of
            undefined ->
                NodeData;
            Existing ->
                merge_cluster_sampler_map(NodeData, Existing)
        end,
    Cluster#{TS => Merged}.
250

251
%% Combines two sample maps key by key over ?SAMPLER_LIST.
%% `topics' is taken from `M1' alone; every other key is the sum of both
%% maps, with a missing key counted as 0.
merge_cluster_sampler_map(M1, M2) ->
    MergedValue =
        fun
            (topics) ->
                maps:get(topics, M1);
            (Key) ->
                maps:get(Key, M1, 0) + maps:get(Key, M2, 0)
        end,
    maps:from_list([{Key, MergedValue(Key)} || Key <- ?SAMPLER_LIST]).
260

261
%% Merges one node's rate map into the cluster rate map.
merge_cluster_rate(Node, Cluster) ->
    Merge =
        fun
            %% for cluster sample, ignore node_uptime
            (node_uptime, _Value, Acc) ->
                Acc;
            %% cluster-synced values: overwrite instead of summing
            (Key, Value, Acc) when
                Key =:= topics;
                Key =:= retained_msg_count;
                Key =:= shared_subscriptions;
                Key =:= license_quota
            ->
                Acc#{Key => Value};
            %% everything else is summed across nodes
            (Key, Value, Acc) ->
                Acc#{Key => Value + maps:get(Key, Acc, 0)}
        end,
    maps:fold(Merge, Cluster, Node).
281

282
%% Turns a `#{TimeStamp => Data}' map into a list of data maps, each carrying
%% its `time_stamp', sorted by ascending time stamp.
%% A `{badrpc, Reason}' tuple is returned unchanged.
format({badrpc, _Reason} = Error) ->
    Error;
format(Data) ->
    Unsorted = maps:fold(fun format/3, [], Data),
    ByTimeStamp = fun(#{time_stamp := A}, #{time_stamp := B}) -> A =< B end,
    lists:sort(ByTimeStamp, Unsorted).
288

289
%% Fold step of format/1: stores the time stamp inside the data map and
%% prepends the result to the accumulator.
format(TimeStamp, Data, All) ->
    Stamped = maps:put(time_stamp, TimeStamp, Data),
    [Stamped | All].
291

292
%% Computes the current rate map from the newest sample and the last stored
%% sample.
%%  - Without a previous sample every gauge and rate is reported as 0.
%%  - Otherwise the result holds the gauge values of the newest sample plus
%%    one rate per delta sampler, computed by cal_rate_/2.
cal_rate(_Now, undefined) ->
    ZeroKeys = ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP),
    maps:from_list([{Key, 0} || Key <- ZeroKeys]);
cal_rate(
    #emqx_monit{data = NowData, time = NowTime},
    #emqx_monit{data = LastData, time = LastTime} = Last
) ->
    case NowTime - LastTime of
        0 ->
            %% Same millisecond as the last sample: wait briefly and sample
            %% again so the rate is never divided by zero.
            timer:sleep(5),
            Retried = sample(erlang:system_time(millisecond)),
            cal_rate(Retried, Last);
        ElapsedMs ->
            Gauges = maps:with(?GAUGE_SAMPLER_LIST, NowData),
            Acc0 = {NowData, LastData, ElapsedMs, Gauges},
            {_, _, _, Rates} = lists:foldl(fun cal_rate_/2, Acc0, ?DELTA_SAMPLER_LIST),
            Rates
    end.
316

317
%% Fold step of cal_rate/2: converts the increase of counter `Key' over
%% `TDelta' milliseconds into a per-second rate (integer division) and stores
%% it under the rate key that ?DELTA_SAMPLER_RATE_MAP assigns to `Key'.
cal_rate_(Key, {Now, Last, TDelta, Res}) ->
    Increase = maps:get(Key, Now) - maps:get(Key, Last),
    PerSecond = (Increase * 1000) div TDelta,
    RateKey = maps:get(Key, ?DELTA_SAMPLER_RATE_MAP),
    {Now, Last, TDelta, Res#{RateKey => PerSecond}}.
323

324
%% Merges the samples pairwise, preserving their order. Each merged sample is
%% the second one of the pair with every ?DELTA_SAMPLER_LIST key replaced by
%% the sum of both samples. A trailing unpaired sample is kept unchanged.
granularity_adapter([First, Second | Rest], Acc) ->
    SumKey =
        fun(Key, Sample) ->
            Total = maps:get(Key, First) + maps:get(Key, Second),
            Sample#{Key => Total}
        end,
    Merged = lists:foldl(SumKey, Second, ?DELTA_SAMPLER_LIST),
    granularity_adapter(Rest, [Merged | Acc]);
granularity_adapter([Unpaired], Acc) ->
    lists:reverse([Unpaired | Acc]);
granularity_adapter([], Acc) ->
    lists:reverse(Acc).
336

337
%% -------------------------------------------------------------------------------------------------
338
%% timer
339

340
%% Schedules the next `{sample, Time}' message to this process, where `Time'
%% is the next interval boundary computed by next_interval/0.
sample_timer() ->
    {SampleAt, DelayMs} = next_interval(),
    Msg = {sample, SampleAt},
    erlang:send_after(DelayMs, self(), Msg).
343

344
%% Schedules the next `clean_expired' message to this process.
clean_timer() ->
    DelayMs = ?CLEAN_EXPIRED_INTERVAL,
    erlang:send_after(DelayMs, self(), clean_expired).
346

347
%% Per interval seconds.
348
%% As an example:
349
%%  Interval = 10
350
%%  The monitor will start working at full seconds, such as 00:00:00, 00:00:10, 00:00:20 ...
351
%% Ensure that the monitor data of all nodes in the cluster are aligned in time
352
%% Returns `{NextTime, Remaining}': the next multiple of the configured
%% sample interval (in milliseconds of system time) and the number of
%% milliseconds left until then.
next_interval() ->
    IntervalSec = emqx_conf:get([dashboard, sample_interval], ?DEFAULT_SAMPLE_INTERVAL),
    IntervalMs = IntervalSec * 1000,
    NowMs = erlang:system_time(millisecond),
    %% Round down to the current interval boundary, then step to the next one.
    NextTime = NowMs - (NowMs rem IntervalMs) + IntervalMs,
    {NextTime, NextTime - NowMs}.
358

359
%% -------------------------------------------------------------------------------------------------
360
%% data
361

362
%% Reads the current value of every sampler in ?SAMPLER_LIST and wraps the
%% result in an `emqx_monit' record stamped with `Time'.
sample(Time) ->
    Data = maps:from_list([{Key, getstats(Key)} || Key <- ?SAMPLER_LIST]),
    #emqx_monit{time = Time, data = Data}.
369

370
%% Persists a sample. With a previous sample available, the delta samplers
%% are stored as the increase since that sample (see delta/2); the very
%% first sample is stored unchanged.
flush(undefined, Now) ->
    store(Now);
flush(#emqx_monit{data = LastData}, #emqx_monit{data = NowData} = Now) ->
    Increase = delta(LastData, NowData),
    store(Now#emqx_monit{data = Increase}).
375

376
%% Returns `NowData' with every ?DELTA_SAMPLER_LIST key replaced by its
%% increase since `LastData'. All other keys keep their current value.
delta(LastData, NowData) ->
    Increases = [
        {Key, maps:get(Key, NowData) - maps:get(Key, LastData)}
     || Key <- ?DELTA_SAMPLER_LIST
    ],
    maps:merge(NowData, maps:from_list(Increases)).
383

384
%% Writes one sample record to the table inside a transaction on the
%% local-content shard. Crashes unless the transaction returns `{atomic, ok}'.
store(MonitData) ->
    Shard = mria:local_content_shard(),
    WriteArgs = [?TAB, MonitData, write],
    {atomic, ok} = mria:transaction(Shard, fun mnesia:write/3, WriteArgs).
387

388
%% Deletes every sample older than ?RETENTION_TIME.
%%
%% The table is a Mnesia `disc_copies' table, so the rows are deleted through
%% a Mnesia transaction on the local-content shard (the same path store/1
%% uses for writes). Deleting them with ets:delete_object/2 would only change
%% the in-memory copy and bypass Mnesia, so the deletion would not be
%% persisted to disk.
clean() ->
    Now = erlang:system_time(millisecond),
    ExpiredMS = [{{'_', '$1', '_'}, [{'>', {'-', Now, '$1'}, ?RETENTION_TIME}], ['$_']}],
    Expired = ets:select(?TAB, ExpiredMS),
    DeleteExpired =
        fun() ->
            lists:foreach(
                fun(Data) ->
                    ok = mnesia:delete_object(?TAB, Data, write)
                end,
                Expired
            )
        end,
    {atomic, ok} = mria:transaction(mria:local_content_shard(), DeleteExpired, []),
    ok.
399

400
%% To make it easier to do data aggregation
401
%% Converts `emqx_monit' records into a `#{Time => Data}' map.
%% For a list of records, the first record seen for a given time wins.
internal_format(#emqx_monit{time = Time, data = Data}) ->
    #{Time => Data};
internal_format(Records) when is_list(Records) ->
    Collect =
        fun(Record, Acc) ->
            %% maps:merge/2 prefers its second argument on a key clash, so
            %% entries already collected are never overwritten.
            maps:merge(internal_format(Record), Acc)
        end,
    lists:foldl(Collect, #{}, Records).
409

410
%% Reads one sampler value via stats/1, falling back to 0 on any exception.
getstats(Key) ->
    %% The stats ETS tables may not exist while ekka is joining a cluster.
    try stats(Key) of
        Value -> Value
    catch
        _:_ -> 0
    end.
417

418
%% Maps a sampler key to the emqx_stats / emqx_metrics entry it is read from.
%% Keys without a clause raise `function_clause'.

%% Values read from emqx_stats.
stats(connections) ->
    emqx_stats:getstat('connections.count');
stats(live_connections) ->
    emqx_stats:getstat('live_connections.count');
stats(cluster_sessions) ->
    emqx_stats:getstat('cluster_sessions.count');
stats(topics) ->
    emqx_stats:getstat('topics.count');
stats(subscriptions) ->
    emqx_stats:getstat('subscriptions.count');
stats(shared_subscriptions) ->
    emqx_stats:getstat('subscriptions.shared.count');
stats(retained_msg_count) ->
    emqx_stats:getstat('retained.count');
%% Values read from emqx_metrics.
stats(received) ->
    emqx_metrics:val('messages.received');
stats(received_bytes) ->
    emqx_metrics:val('bytes.received');
stats(sent) ->
    emqx_metrics:val('messages.sent');
stats(sent_bytes) ->
    emqx_metrics:val('bytes.sent');
stats(validation_succeeded) ->
    emqx_metrics:val('messages.validation_succeeded');
stats(validation_failed) ->
    emqx_metrics:val('messages.validation_failed');
stats(dropped) ->
    emqx_metrics:val('messages.dropped');
stats(persisted) ->
    emqx_metrics:val('messages.persisted').
433

434
%% -------------------------------------------------------------------------------------------------
435
%% Retained && License Quota
436

437
%% the non rate values should be same on all nodes
438
%% Values reported next to the rates that are not rates themselves:
%% the license quota (if any), the retained message count, the shared
%% subscription count and the node uptime.
%%
%% The two counters are read through getstats/1 rather than stats/1: this
%% function is also called from the error fallback of current_rate/1, and
%% getstats/1 returns 0 instead of raising when the stats cannot be read,
%% in the same way sample/1 reads its values.
non_rate_value() ->
    (license_quota())#{
        retained_msg_count => getstats(retained_msg_count),
        shared_subscriptions => getstats(shared_subscriptions),
        node_uptime => emqx_sys:uptime()
    }.
444

445
-if(?EMQX_RELEASE_EDITION == ee).
%% Enterprise edition: reports the licensed connection limit as
%% `license_quota', or 0 when no license is installed.
license_quota() ->
    Quota =
        case emqx_license_checker:limits() of
            {ok, #{max_connections := MaxConnections}} ->
                MaxConnections;
            {error, no_license} ->
                0
        end,
    #{license_quota => Quota}.
-else.
%% Other editions: there is no license quota to report.
license_quota() ->
    #{}.
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc