• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8628139215

10 Apr 2024 08:18AM UTC coverage: 62.44% (-0.05%) from 62.489%
8628139215

push

github

web-flow
Merge pull request #12851 from zmstone/0327-feat-add-emqx_variform

emqx_variform for string substitution and transform

206 of 238 new or added lines in 3 files covered. (86.55%)

28 existing lines in 16 files now uncovered.

34895 of 55886 relevant lines covered (62.44%)

6585.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/apps/emqx/src/emqx_broker_helper.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_broker_helper).
18

19
-behaviour(gen_server).
20

21
-include("emqx_router.hrl").
22
-include("emqx_shared_sub.hrl").
23
-include("logger.hrl").
24
-include("types.hrl").
25

26
-export([start_link/0]).
27

28
%% APIs
29
-export([
30
    register_sub/2,
31
    lookup_subid/1,
32
    lookup_subpid/1,
33
    get_sub_shard/2,
34
    create_seq/1,
35
    reclaim_seq/1
36
]).
37

38
%% Stats fun
39
-export([stats_fun/0]).
40

41
%% gen_server callbacks
42
-export([
43
    init/1,
44
    handle_call/3,
45
    handle_cast/2,
46
    handle_info/2,
47
    terminate/2,
48
    code_change/3
49
]).
50

51
-ifdef(TEST).
52
-compile(export_all).
53
-compile(nowarn_export_all).
54
-endif.
55

56
%% Registered name of this gen_server (see start_link/0) and also the name of
%% the helper ETS table created in init/1.
-define(HELPER, ?MODULE).
57
%% ETS table mapping SubId -> SubPid.
-define(SUBID, emqx_subid).
58
%% ETS table mapping SubPid -> SubId.
-define(SUBMON, emqx_submon).
59
%% Name of the emqx_sequence used for per-topic sequence ids (Topic -> SeqId).
-define(SUBSEQ, emqx_subseq).
60
%% Sequence threshold used by get_sub_shard/2: sequence ids up to this value
%% map to shard 0.
-define(SHARD, 1024).
61

62
%% Argument passed to emqx_utils:drain_down/1 when handling 'DOWN' messages.
-define(BATCH_SIZE, 100000).
63

64
%% @doc Start the broker helper gen_server, registered locally as ?HELPER.
-spec start_link() -> startlink_ret().
start_link() ->
    ServerName = {local, ?HELPER},
    gen_server:start_link(ServerName, ?MODULE, [], []).
504✔
67

68
%% @doc Register a subscriber pid together with its subscriber id.
%%
%% The ?SUBMON table is consulted first:
%%   - no entry for the pid: the registration is cast to the helper process;
%%   - an entry with the very same SubId: nothing to do;
%%   - anything else: raises `error:subid_conflict'.
-spec register_sub(pid(), emqx_types:subid()) -> ok.
register_sub(SubPid, SubId) when is_pid(SubPid) ->
    ensure_registered(ets:lookup(?SUBMON, SubPid), SubPid, SubId).

%% Dispatch on the result of the ?SUBMON lookup. The repeated SubId variable
%% in the second clause only matches when the stored id equals the given one.
ensure_registered([], SubPid, SubId) ->
    gen_server:cast(?HELPER, {register_sub, SubPid, SubId});
ensure_registered([{_Pid, SubId}], _SubPid, SubId) ->
    ok;
ensure_registered(_Conflict, _SubPid, _SubId) ->
    error(subid_conflict).
78

79
%% @doc Look up the subscriber id recorded for a subscriber pid in ?SUBMON.
-spec lookup_subid(pid()) -> option(emqx_types:subid()).
lookup_subid(SubPid) when is_pid(SubPid) ->
    Table = ?SUBMON,
    emqx_utils_ets:lookup_value(Table, SubPid).
2✔
82

83
%% @doc Look up the subscriber pid recorded for a subscriber id in ?SUBID.
-spec lookup_subpid(emqx_types:subid()) -> option(pid()).
lookup_subpid(SubId) ->
    Table = ?SUBID,
    emqx_utils_ets:lookup_value(Table, SubId).
17✔
86

87
%% @doc Pick the shard for a subscriber of Topic.
%%
%% Takes the next sequence id for the topic (this advances the sequence).
%% While that id is at most ?SHARD the shard is 0; beyond that the pid is
%% hashed into the range 1..shards_num().
-spec get_sub_shard(pid(), emqx_types:topic()) -> non_neg_integer().
get_sub_shard(SubPid, Topic) ->
    Seq = create_seq(Topic),
    if
        Seq =< ?SHARD ->
            0;
        true ->
            %% phash2/2 yields 0..Range-1, so shift by one to keep 0 reserved.
            1 + erlang:phash2(SubPid, shards_num())
    end.
93

94
%% Read the shard count stored under the `shards' key of the ?HELPER table
%% (written by init/1). Dynamic sharding may replace this later.
-spec shards_num() -> pos_integer().
shards_num() ->
    ValuePos = 2,
    ets:lookup_element(?HELPER, shards, ValuePos).
1✔
98

99
%% @doc Fetch the next value of the ?SUBSEQ sequence for Topic.
-spec create_seq(emqx_types:topic()) -> emqx_sequence:seqid().
create_seq(Topic) ->
    Sequence = ?SUBSEQ,
    emqx_sequence:nextval(Sequence, Topic).
14,448✔
102

103
%% @doc Reclaim a value of the ?SUBSEQ sequence for Topic.
-spec reclaim_seq(emqx_types:topic()) -> emqx_sequence:seqid().
reclaim_seq(Topic) ->
    Sequence = ?SUBSEQ,
    emqx_sequence:reclaim(Sequence, Topic).
14,415✔
106

107
%%--------------------------------------------------------------------
108
%% Stats fun
109
%%--------------------------------------------------------------------
110

111
%% @doc Refresh the subscriber, subscription and suboption statistics.
%% Each figure is read and published in turn; a figure that is `undefined'
%% is skipped by safe_update_stats/3.
stats_fun() ->
    Subscribers = subscriber_val(),
    safe_update_stats(Subscribers, 'subscribers.count', 'subscribers.max'),
    Subscriptions = table_size(?SUBSCRIPTION),
    safe_update_stats(Subscriptions, 'subscriptions.count', 'subscriptions.max'),
    SubOptions = table_size(?SUBOPTION),
    safe_update_stats(SubOptions, 'suboptions.count', 'suboptions.max').
2,660✔
115

116
%% Publish an integer value through emqx_stats:setstat/3; an `undefined'
%% value means there is nothing to report and is silently skipped.
safe_update_stats(Count, Stat, MaxStat) when is_integer(Count) ->
    emqx_stats:setstat(Stat, MaxStat, Count);
safe_update_stats(undefined, _Stat, _MaxStat) ->
    ok.
7,980✔
120

121
%% Combined size of the ?SUBSCRIBER and ?SHARED_SUBSCRIBER tables, or
%% `undefined' when neither size is available.
subscriber_val() ->
    Plain = table_size(?SUBSCRIBER),
    Shared = table_size(?SHARED_SUBSCRIBER),
    sum_subscriber(Plain, Shared).
2,660✔
123

UNCOV
124
%% Add two counts where either may be `undefined'. An undefined side is
%% ignored; the result is `undefined' only when both sides are.
sum_subscriber(Left, Right) when is_integer(Left), is_integer(Right) ->
    Left + Right;
sum_subscriber(Left, undefined) when is_integer(Left) ->
    Left;
sum_subscriber(undefined, Right) when is_integer(Right); Right =:= undefined ->
    Right.
2,660✔
128

129
%% Number of objects in the named ETS table, as reported by ets:info/2
%% (`undefined' when the table does not exist).
table_size(Tab) when is_atom(Tab) ->
    Item = size,
    ets:info(Tab, Item).
10,640✔
130

131
%%--------------------------------------------------------------------
132
%% gen_server callbacks
133
%%--------------------------------------------------------------------
134

135
%% gen_server init callback: creates the tables and the sequence this module
%% relies on, registers the periodic stats callback, and starts with an
%% empty process monitor set as state.
init([]) ->
    %% Helper table holding the shard count (schedulers * 32)
    ok = emqx_utils_ets:new(?HELPER, [{read_concurrency, true}]),
    ShardCount = emqx_vm:schedulers() * 32,
    true = ets:insert(?HELPER, {shards, ShardCount}),
    %% SubSeq: Topic -> SeqId
    ok = emqx_sequence:create(?SUBSEQ),
    %% Both lookup tables are public and tuned for concurrent access
    LookupTabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
    %% SubId: SubId -> SubPid
    ok = emqx_utils_ets:new(?SUBID, LookupTabOpts),
    %% SubMon: SubPid -> SubId
    ok = emqx_utils_ets:new(?SUBMON, LookupTabOpts),
    %% Stats timer
    StatsFun = fun ?MODULE:stats_fun/0,
    ok = emqx_stats:update_interval(broker_stats, StatsFun),
    InitialState = #{pmon => emqx_pmon:new()},
    {ok, InitialState}.
504✔
149

150
%% No synchronous requests are supported: log the call and reply `ignored'.
handle_call(Request, _From, State) ->
    ?SLOG(error, #{msg => "unexpected_call", call => Request}),
    Reply = ignored,
    {reply, Reply, State}.
1✔
153

154
%% Handle a registration cast from register_sub/2: record the SubId -> SubPid
%% mapping (unless SubId is `undefined'), record SubPid -> SubId, and start
%% monitoring the subscriber. Any other cast is logged and dropped.
handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
    case SubId of
        undefined ->
            %% Nothing to index by id for anonymous subscribers.
            true;
        _ ->
            true = ets:insert(?SUBID, {SubId, SubPid})
    end,
    true = ets:insert(?SUBMON, {SubPid, SubId}),
    Monitored = emqx_pmon:monitor(SubPid, PMon),
    {noreply, State#{pmon := Monitored}};
handle_cast(Unexpected, State) ->
    ?SLOG(error, #{msg => "unexpected_cast", cast => Unexpected}),
    {noreply, State}.
1✔
161

162
%% Handle a 'DOWN' message from a monitored subscriber. The triggering pid
%% is combined with whatever emqx_utils:drain_down(?BATCH_SIZE) returns
%% (presumably the pids of further pending 'DOWN' messages - confirm against
%% emqx_utils), clean_down/1 is submitted for the whole batch through
%% emqx_pool:async_submit/2, and the batch is erased from the monitor set.
%% Any other message is logged and dropped.
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) ->
    DownPids = [SubPid | emqx_utils:drain_down(?BATCH_SIZE)],
    CleanupArgs = [fun clean_down/1, DownPids],
    ok = emqx_pool:async_submit(fun lists:foreach/2, CleanupArgs),
    {_Erased, RemainingPMon} = emqx_pmon:erase_all(DownPids, PMon),
    {noreply, State#{pmon := RemainingPMon}};
handle_info(Unexpected, State) ->
    ?SLOG(error, #{msg => "unexpected_info", info => Unexpected}),
    {noreply, State}.
1✔
172

173
%% gen_server terminate callback: delete the ?SUBSEQ sequence (asserting it
%% was deleted), then cancel the `broker_stats' update registered in init/1.
terminate(_Reason, _State) ->
    Deleted = emqx_sequence:delete(?SUBSEQ),
    true = Deleted,
    emqx_stats:cancel_update(broker_stats).
1✔
176

177
%% gen_server code_change callback: the state needs no conversion between
%% versions, so it is returned unchanged.
code_change(_OldVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
×
179

180
%%--------------------------------------------------------------------
181
%% Internal functions
182
%%--------------------------------------------------------------------
183

184
%% Remove every trace of a subscriber that went down: its ?SUBMON entry, its
%% ?SUBID entry (when it had a subscriber id), and finally notify
%% emqx_broker. A pid with no ?SUBMON entry is a no-op.
%%
%% The whole sequence runs inside the try, so an `error:badarg' raised by
%% any step - including emqx_broker:subscriber_down/1 - is swallowed.
clean_down(SubPid) ->
    try
        case ets:lookup(?SUBMON, SubPid) of
            [] ->
                ok;
            [{_Pid, SubId}] ->
                true = ets:delete(?SUBMON, SubPid),
                true = remove_subid(SubId, SubPid),
                emqx_broker:subscriber_down(SubPid)
        end
    catch
        error:badarg -> ok
    end.

%% Drop the exact {SubId, SubPid} object from ?SUBID; subscribers registered
%% with an `undefined' id were never inserted there.
remove_subid(undefined, _SubPid) ->
    true;
remove_subid(SubId, SubPid) ->
    ets:delete_object(?SUBID, {SubId, SubPid}).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc