• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 850

18 Dec 2024 02:02PM UTC coverage: 62.397% (-0.5%) from 62.861%
850

push

github

web-flow
Merge pull request #240 from keynslug/fix/EEC-112/autoheal-asymm

fix(autoheal): attempt healing complex asymmetric partitions

2 of 49 new or added lines in 2 files covered. (4.08%)

94 existing lines in 8 files now uncovered.

682 of 1093 relevant lines covered (62.4%)

49.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.22
/src/ekka_dist.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_dist).
18

19
-export([listen/1,
20
         listen/2,
21
         select/1,
22
         accept/1,
23
         accept_connection/5,
24
         setup/5,
25
         close/1,
26
         childspecs/0]).
27

28
-export([port/1]).
29

30
-define(DEFAULT_PORT, 4370).
31
-define(MIN_RAND_PORT, 10000).
32
-define(MAX_PORT_LIMIT, 60000).
33

34
listen(Name) ->
UNCOV
35
    listen(Name, undefined).
×
36

37
listen(Name, Host) ->
38
    %% Here we figure out what port we want to listen on.
UNCOV
39
    Port = port(Name),
×
UNCOV
40
    set_port_env(Port),
×
41
    %% Finally run the real function!
UNCOV
42
    with_module(fun(M) -> do_listen(M, Name, Host, Port) end).
×
43

44
select(Node) ->
UNCOV
45
    with_module(fun(M) -> M:select(Node) end).
×
46

47
accept(Listen) ->
48
    with_module(fun(M) -> M:accept(Listen) end).
×
49

50
accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) ->
51
    with_module(fun(M) ->
×
UNCOV
52
                    M:accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime)
×
53
                end).
54

55
setup(Node, Type, MyNode, LongOrShortNames, SetupTime) ->
UNCOV
56
    with_module(fun(M) ->
×
57
                    M:setup(Node, Type, MyNode, LongOrShortNames, SetupTime)
×
58
                end).
59

60
close(Listen) ->
61
    with_module(fun(M) -> M:close(Listen) end).
×
62

63
childspecs() ->
UNCOV
64
    with_module(fun(M) -> M:childspecs() end).
×
65

66
%% Internal functions
67

68
do_listen(M, Name, Host, Port) ->
69
    case try_listen(M, Name, Host) of
×
70
        {error, eaddrinuse} when Port > 0 ->
UNCOV
71
            {error, "port " ++ integer_to_list(Port) ++ " is in use"};
×
72
        Other ->
UNCOV
73
            Other
×
74
    end.
75

76
%% The `undefined` in the first clause is from listen/1
UNCOV
77
try_listen(M, Name, undefined) -> M:listen(Name);
×
UNCOV
78
try_listen(M, Name, Host) -> M:listen(Name, Host).
×
79

80
set_port_env(Port) ->
81
    case Port > 0 of
×
82
        true ->
83
            %% Set both "min" and "max" variables, to force the port number to
84
            %% this one.
85
            ok = application:set_env(kernel, inet_dist_listen_min, Port),
×
UNCOV
86
            ok = application:set_env(kernel, inet_dist_listen_max, Port);
×
87
        false ->
88
            ok = application:set_env(kernel, inet_dist_listen_min, ?MIN_RAND_PORT),
×
UNCOV
89
            ok = application:set_env(kernel, inet_dist_listen_max, ?MAX_PORT_LIMIT)
×
90
    end.
91

92
with_module(Fun) ->
93
    try
×
94
        Proto = resolve_proto(),
×
UNCOV
95
        Module = list_to_atom(Proto ++ "_dist"),
×
UNCOV
96
        Fun(Module)
×
97
    catch
98
        C:E->
99
            %% this exception is caught by net_kernel
UNCOV
100
            error({failed_to_call_ekka_dist_module, C, E})
×
101
    end.
102

103
resolve_proto() ->
UNCOV
104
    Fallback = atom_to_list(application:get_env(ekka, proto_dist, inet_tcp)),
×
105
    %% the -proto_dist boot arg is 'ekka'
106
    %% and there is a lack of a 'ekka_tls' module.
107
    %% Also when starting remote console etc, there is no application env to
108
    %% read from, so we have to find another way to pass the module name (prefix)
109
    Mod =
×
110
        case os:getenv("EKKA_PROTO_DIST_MOD") of
UNCOV
111
            false -> Fallback;
×
112
            "" -> Fallback;
×
113
            M -> M
×
114
        end,
115
    case Mod of
×
116
        "inet_tcp" -> ok;
×
117
        "inet_tls" -> ok;
×
UNCOV
118
        "inet6_tcp" -> ok;
×
119
        Other -> error({unsupported_proto_dist, Other})
×
120
    end,
UNCOV
121
    Mod.
×
122

123
%% @doc Figure out dist port from node's name.
124
-spec(port(node() | string()) -> inet:port_number()).
125
port(Name) when is_atom(Name) ->
126
    port(atom_to_list(Name));
7✔
127
port("remsh" ++ _) ->
128
    %% outgoing port for remsh,
129
    %% it should never accept incoming connections
130
    %% i.e. no one else should need to know the actual port
UNCOV
131
    0;
×
132
port(Name) when is_list(Name) ->
133
    %% Figure out the base port.  If not specified using the
134
    %% inet_dist_base_port kernel environment variable, default to
135
    %% 4370, one above the epmd port.
136
    BasePort = application:get_env(kernel, inet_dist_base_port, ?DEFAULT_PORT),
7✔
137

138
    %% Now, figure out our "offset" on top of the base port.  The
139
    %% offset is the integer just to the left of the @ sign in our node
140
    %% name.  If there is no such number, the offset is 0.
141
    %%
142
    %% Also handle the case when no hostname was specified.
143
    BasePort + offset(Name).
7✔
144

145
%% @doc Figure out the offset by node's name
146
offset(NodeName) ->
147
    ShortName = re:replace(NodeName, "@.*$", ""),
7✔
148
    case re:run(ShortName, "[0-9]+$", [{capture, first, list}]) of
7✔
149
        nomatch ->
150
            0;
1✔
151
        {match, [OffsetAsString]} ->
152
            (list_to_integer(OffsetAsString) rem ?MAX_PORT_LIMIT)
6✔
153
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc