• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8644497754

11 Apr 2024 09:24AM UTC coverage: 62.388% (-0.05%) from 62.44%
8644497754

push

github

web-flow
Merge pull request #12858 from zmstone/0410-fix-variform-number-handling

fix(variform): allow numbers to be numbers

2 of 3 new or added lines in 1 file covered. (66.67%)

67 existing lines in 12 files now uncovered.

34873 of 55897 relevant lines covered (62.39%)

6476.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.62
/apps/emqx/src/emqx_router_helper.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_router_helper).
18

19
-behaviour(gen_server).
20

21
-include("emqx.hrl").
22
-include("emqx_router.hrl").
23
-include("logger.hrl").
24
-include("types.hrl").
25
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
26

27
%% Mnesia bootstrap
28
-export([create_tables/0]).
29

30
%% API
31
-export([
32
    start_link/0,
33
    monitor/1
34
]).
35

36
%% Internal export
37
-export([stats_fun/0]).
38

39
%% gen_server callbacks
40
-export([
41
    init/1,
42
    handle_call/3,
43
    handle_cast/2,
44
    handle_info/2,
45
    terminate/2,
46
    code_change/3
47
]).
48

49
%% Internal exports (RPC)
50
-export([
51
    cleanup_routes/1
52
]).
53

54
%% Row format of the ?ROUTING_NODE table: `name' holds the node being
%% tracked; `const' always keeps its default value `unused' in this module.
-record(routing_node, {name, const = unused}).
55

56
%% Base of the lock id passed to global:trans/2 when routes of a node that
%% went down are cleaned up.
-define(LOCK, {?MODULE, cleanup_routes}).
57

58
%% Suppress dialyzer warnings for cleanup_routes/1.
-dialyzer({nowarn_function, [cleanup_routes/1]}).
59

60
%%--------------------------------------------------------------------
61
%% Mnesia bootstrap
62
%%--------------------------------------------------------------------
63

64
%% Creates the ?ROUTING_NODE table through mria (a `set' table kept as
%% ram_copies in the ?ROUTE_SHARD shard, holding #routing_node{} records)
%% and returns the list of tables created by this module.
create_tables() ->
    TableOpts = [
        {type, set},
        {rlog_shard, ?ROUTE_SHARD},
        {storage, ram_copies},
        {record_name, routing_node},
        {attributes, record_info(fields, routing_node)},
        {storage_properties, [{ets, [{read_concurrency, true}]}]}
    ],
    ok = mria:create_table(?ROUTING_NODE, TableOpts),
    [?ROUTING_NODE].
74

75
%%--------------------------------------------------------------------
76
%% API
77
%%--------------------------------------------------------------------
78

79
%% @doc Start the router helper as a gen_server registered locally under
%% the module name and linked to the calling process.
-spec start_link() -> startlink_ret().
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
83

84
%% @doc Register a routing node so that it gets monitored.
%% Accepts either a bare node name or a `{Group, Node}' pair, in which case
%% the group part is ignored. Nothing is written when the node is a cluster
%% member (per ekka:is_member/1) or is already present in the ?ROUTING_NODE
%% table; otherwise a #routing_node{} row is dirty-written for it.
-spec monitor(node() | {binary(), node()}) -> ok.
monitor({_Group, Node}) ->
    monitor(Node);
monitor(Node) when is_atom(Node) ->
    AlreadyTracked =
        ekka:is_member(Node) orelse
            ets:member(?ROUTING_NODE, Node),
    case AlreadyTracked of
        false -> mria:dirty_write(?ROUTING_NODE, #routing_node{name = Node});
        true -> ok
    end.
96

97
%%--------------------------------------------------------------------
98
%% gen_server callbacks
99
%%--------------------------------------------------------------------
100

101
%% gen_server init callback.
%% Traps exits, registers for ekka membership events, waits for the
%% ?ROUTING_NODE table and subscribes to its simple mnesia table events.
%% Every node already recorded in the table that is not a cluster member is
%% monitored with erlang:monitor_node/2 and remembered under `nodes' in the
%% state. Finally stats_fun/0 is installed as the `route_stats' updater and
%% the server starts out hibernated.
init([]) ->
    process_flag(trap_exit, true),
    ok = ekka:monitor(membership),
    _ = mria:wait_for_tables([?ROUTING_NODE]),
    {ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}),
    %% Fold step: cluster members are skipped, any other node is monitored
    %% and prepended to the accumulator.
    MonitorUnlessMember = fun(Node, Monitored) ->
        case ekka:is_member(Node) of
            false ->
                true = erlang:monitor_node(Node, true),
                [Node | Monitored];
            true ->
                Monitored
        end
    end,
    RecordedNodes = mnesia:dirty_all_keys(?ROUTING_NODE),
    Nodes = lists:foldl(MonitorUnlessMember, [], RecordedNodes),
    ok = emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0),
    {ok, #{nodes => Nodes}, hibernate}.
121

122
%% No synchronous requests are supported: any call is logged as an error
%% and answered with `ignored', leaving the state untouched.
handle_call(Request, _From, State) ->
    ?SLOG(error, #{msg => "unexpected_call", call => Request}),
    {reply, ignored, State}.
125

126
%% No casts are supported: any cast is logged as an error and dropped,
%% leaving the state untouched.
handle_cast(Cast, State) ->
    ?SLOG(error, #{msg => "unexpected_cast", cast => Cast}),
    {noreply, State}.
129

130
%% gen_server handle_info callback. Handles, in clause order:
%%  * a write to ?ROUTING_NODE: start monitoring the written node unless it
%%    is a cluster member or is already in the `nodes' list;
%%  * a delete from ?ROUTING_NODE: ignored;
%%  * any other mnesia table event: logged at debug level;
%%  * `{nodedown, Node}': on a core node, clean up the routes of Node under
%%    a global lock and delete its ?ROUTING_NODE row; on a replicant do
%%    nothing; in both cases drop Node from `nodes' and hibernate;
%%  * membership `down' events (mnesia or node): handled as `nodedown';
%%  * other membership events: ignored;
%%  * anything else: logged as an error.
handle_info(
    {mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}},
    #{nodes := Nodes} = State
) ->
    AlreadyCovered = ekka:is_member(Node) orelse lists:member(Node, Nodes),
    case AlreadyCovered of
        false ->
            true = erlang:monitor_node(Node, true),
            {noreply, State#{nodes := [Node | Nodes]}};
        true ->
            {noreply, State}
    end;
handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) ->
    %% Deletions need no action here.
    {noreply, State};
handle_info({mnesia_table_event, Event}, State) ->
    ?SLOG(debug, #{msg => "unexpected_mnesia_table_event", event => Event}),
    {noreply, State};
handle_info({nodedown, Node}, #{nodes := Nodes} = State) ->
    case mria_rlog:role() of
        replicant ->
            ok;
        core ->
            % TODO
            % Node may flap, do we need to wait for any pending cleanups in `init/1`
            % on the flapping node?
            LockId = {?LOCK, self()},
            global:trans(LockId, fun() -> cleanup_routes(Node) end),
            ok = mria:dirty_delete(?ROUTING_NODE, Node)
    end,
    %% Trace point emitted once handling of the down node is finished.
    ?tp(emqx_router_helper_cleanup_done, #{node => Node}),
    Remaining = lists:delete(Node, Nodes),
    {noreply, State#{nodes := Remaining}, hibernate};
handle_info({membership, {Kind, down, Node}}, State) when Kind =:= mnesia; Kind =:= node ->
    handle_info({nodedown, Node}, State);
handle_info({membership, _Event}, State) ->
    {noreply, State};
handle_info(Info, State) ->
    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
    {noreply, State}.
172

173
%% gen_server terminate callback.
%% Undoes the registrations made in init/1: the ekka membership monitor,
%% the `route_stats' updater and the mnesia table subscription. If the
%% mria_membership server is already gone (a `noproc' exit from a
%% gen_server call to it), a warning is logged and `ok' is returned.
terminate(_Why, _LastState) ->
    Subscription = {table, ?ROUTING_NODE, simple},
    try
        ok = ekka:unmonitor(membership),
        emqx_stats:cancel_update(route_stats),
        mnesia:unsubscribe(Subscription)
    catch
        exit:{noproc, {gen_server, call, [mria_membership, _]}} ->
            ?SLOG(warning, #{msg => "mria_membership_down"}),
            ok
    end.
183

184
%% gen_server code_change callback: no state migration is performed, the
%% current state is carried over unchanged.
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
186

187
%%--------------------------------------------------------------------
188
%% Internal functions
189
%%--------------------------------------------------------------------
190

191
%% Stats updater installed by init/1: reads the `n_routes' figure from
%% emqx_router and stores it as the 'topics.count' / 'topics.max' stats.
stats_fun() ->
    RouteCount = emqx_router:stats(n_routes),
    emqx_stats:setstat('topics.count', 'topics.max', RouteCount).
193

194
%% Delegates to emqx_router:cleanup_routes/1 for the given node. Exported
%% as an internal (RPC) entry point and run under the global lock when a
%% node goes down.
cleanup_routes(DownNode) ->
    emqx_router:cleanup_routes(DownNode).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc