• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 850

18 Dec 2024 02:02PM UTC coverage: 62.397% (-0.5%) from 62.861%
850

push

github

web-flow
Merge pull request #240 from keynslug/fix/EEC-112/autoheal-asymm

fix(autoheal): attempt healing complex asymmetric partitions

2 of 49 new or added lines in 2 files covered. (4.08%)

94 existing lines in 8 files now uncovered.

682 of 1093 relevant lines covered (62.4%)

49.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.43
/src/ekka_cluster_etcd.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(ekka_cluster_etcd).
18

19
-behaviour(ekka_cluster_strategy).
20

21
-export([ discover/1
22
        , lock/1
23
        , unlock/1
24
        , register/1
25
        , unregister/1
26
        ]).
27

28
%% TTL callback
29
-export([etcd_set_node_key/1]).
30

31
-define(LOG(Level, Format, Args), logger:Level("Ekka(etcd): " ++ Format, Args)).
32

33
%%--------------------------------------------------------------------
34
%% ekka_cluster_strategy callbacks
35
%%--------------------------------------------------------------------
36

37
%% @doc Discovery callback: list the cluster nodes registered in etcd.
%% Returns `{ok, Nodes}' on success. When the lookup fails with
%% `{error, {404, _}}' the nodes directory is created via
%% ensure_nodes_path/1 and the lookup is repeated; any other error is
%% returned to the caller unchanged.
discover(Options) ->
    case etcd_get_nodes_key(Options) of
        {error, {404, _}} ->
            %% Presumably the nodes directory does not exist yet: create it
            %% and look again. If creation fails, hand back its result as is.
            case ensure_nodes_path(Options) of
                {ok, _} -> discover(Options);
                CreateFailure -> CreateFailure
            end;
        {error, _} = Failure ->
            Failure;
        {ok, Response} ->
            {ok, extract_nodes(Response)}
    end.
49

50
%% @doc Lock callback: acquire the cluster-wide lock key in etcd.
%% Makes up to 10 attempts with a one second pause between attempts.
%% Returns `ok' once the key has been set, `{error, failed}' when every
%% attempt was rejected with a 412, and `{error, Reason}' as soon as any
%% other error is reported.
lock(Options) ->
    lock(Options, 10).

%% Try to set the lock key while `Retries' attempts remain.
lock(_Options, 0) ->
    {error, failed};
lock(Options, Retries) ->
    case etcd_set_lock_key(Options) of
        {ok, _Response} ->
            ok;
        {error, {412, _}} when Retries > 1 ->
            %% Presumably the key is currently held by another node
            %% (the write is conditional, see etcd_set_lock_key/1):
            %% wait a second and try again.
            timer:sleep(1000),
            lock(Options, Retries - 1);
        {error, {412, _}} ->
            %% That was the last attempt. Give up right away instead of
            %% sleeping for a second that no further attempt would follow.
            {error, failed};
        {error, Reason} ->
            {error, Reason}
    end.
65

66
%% @doc Unlock callback: delete the cluster-wide lock key from etcd.
%% Returns `ok' when the delete succeeds, otherwise the `{error, Reason}'
%% reported by etcd_del_lock_key/1.
unlock(Options) ->
    case etcd_del_lock_key(Options) of
        {error, _} = Failure ->
            Failure;
        {ok, _} ->
            ok
    end.
72

73
%% @doc Register callback: write this node's key to etcd and then start
%% the TTL child through ensure_node_ttl/1.
%% Returns whatever ensure_node_ttl/1 returns when the key was written,
%% or the `{error, Reason}' from the write.
register(Options) ->
    case etcd_set_node_key(Options) of
        {error, _} = Failure ->
            Failure;
        {ok, _} ->
            ensure_node_ttl(Options)
    end.
80

81
%% @doc Unregister callback: stop the `ekka_node_ttl' child and delete
%% this node's key from etcd.
%% Crashes with `badmatch' if stopping the child does not return `ok'.
%% Returns `ok' when the key was deleted, otherwise `{error, Reason}'.
unregister(Options) ->
    %% Stop the TTL child first; the match asserts that this succeeded.
    ok = ekka_cluster_sup:stop_child(ekka_node_ttl),
    case etcd_del_node_key(Options) of
        {error, _} = Failure ->
            Failure;
        {ok, _} ->
            ok
    end.
88

89
%%--------------------------------------------------------------------
90
%% Internal functions
91
%%--------------------------------------------------------------------
92

93
%% Turn a nodes-directory response into a list of node names.
%% An empty list yields an empty list. Otherwise the response must be a
%% map with a <<"node">> entry; the optional <<"nodes">> list inside it
%% (default: empty) is mapped through extract_node/1.
extract_nodes([]) ->
    [];
extract_nodes(Response) ->
    Directory = maps:get(<<"node">>, Response),
    Entries = maps:get(<<"nodes">>, Directory, []),
    [extract_node(Entry) || Entry <- Entries].
3✔
97

98
%% Start the `ekka_node_ttl' child under ekka_cluster_sup, passing it the
%% configured `node_ttl' and an MFA that calls etcd_set_node_key/1 with
%% these options.
%% Returns `ok' when the child was started or is already running, and the
%% `{error, _}' tuple for any other start failure.
ensure_node_ttl(Options) ->
    Refresh = {?MODULE, etcd_set_node_key, [Options]},
    ChildArgs = [proplists:get_value(node_ttl, Options), Refresh],
    case ekka_cluster_sup:start_child(ekka_node_ttl, ChildArgs) of
        {ok, _} ->
            ok;
        {error, {already_started, _}} ->
            ok;
        {error, _} = Failure ->
            Failure
    end.
106

107
%% Derive a node name (atom) from one directory entry: take the entry's
%% <<"key">> binary and convert its last "/"-separated segment to an atom.
%% NOTE(review): this creates atoms from data read from etcd; presumably
%% the key space is trusted - confirm.
extract_node(V) ->
    Key = maps:get(<<"key">>, V),
    Segments = binary:split(Key, <<"/">>, [global]),
    NodeName = lists:last(Segments),
    list_to_atom(binary_to_list(NodeName)).
7✔
109

110
%% Write the nodes path to etcd with the `{dir, true}' parameter.
ensure_nodes_path(Options) ->
    Servers = server(Options),
    Path = nodes_path(Options),
    etcd_set(Servers, Path, [{dir, true}], ssl_options(Options)).
×
112

113
%% Read the nodes path from etcd with the `{recursive, true}' parameter.
etcd_get_nodes_key(Options) ->
    Servers = server(Options),
    Path = nodes_path(Options),
    etcd_get(Servers, Path, [{recursive, true}], ssl_options(Options)).
3✔
115

116
%% TTL callback: write this node's key to etcd with a `ttl' parameter of
%% the configured `node_ttl' integer-divided by 1000 (presumably a
%% milliseconds-to-seconds conversion - confirm against the config docs).
etcd_set_node_key(Options) ->
    NodeTtl = config(node_ttl, Options),
    Params = [{ttl, NodeTtl div 1000}],
    etcd_set(server(Options), node_path(Options), Params, ssl_options(Options)).
423✔
119

120
%% Delete this node's key from etcd (no extra request parameters).
etcd_del_node_key(Options) ->
    Servers = server(Options),
    Path = node_path(Options),
    etcd_del(Servers, Path, [], ssl_options(Options)).
1✔
122

123
%% Write the lock key with this node's name as its value, a `ttl' of 30
%% and `prevExist' set to false (presumably so that the write is refused
%% while another holder's key still exists - confirm against the etcd API).
etcd_set_lock_key(Options) ->
    Holder = node(),
    Params = [{ttl, 30}, {'prevExist', false}, {value, Holder}],
    etcd_set(server(Options), lock_path(Options), Params, ssl_options(Options)).
3✔
126

127
%% Delete the lock key, sending `prevExist' = true and `prevValue' = this
%% node's name (presumably so that only the holder's own lock is removed -
%% confirm against the etcd API).
etcd_del_lock_key(Options) ->
    Holder = node(),
    Conditions = [{'prevExist', true}, {'prevValue', Holder}],
    etcd_del(server(Options), lock_path(Options), Conditions, ssl_options(Options)).
3✔
130

131
%% The `server' entry of the options, as returned by config/2. It is passed
%% to rand_addr/1 by the etcd_* helpers, so it is expected to be a list of
%% addresses.
server(Options) ->
    Servers = config(server, Options),
    Servers.
433✔
133

134
%% HTTP client options derived from the `ssl_options' entry of the options:
%% an empty list when the entry is absent or `[]', otherwise
%% `[{ssl, SSLOptions}]'.
ssl_options(Options) ->
    wrap_ssl_options(proplists:get_value(ssl_options, Options, [])).

%% Wrap non-empty SSL options in an `ssl' tuple; leave "none" as [].
wrap_ssl_options([]) ->
    [];
wrap_ssl_options(SSLOptions) ->
    [{ssl, SSLOptions}].
139

140
%% Look up `Key' in the options proplist; `undefined' when it is absent.
config(Key, Options) ->
    proplists:get_value(Key, Options, undefined).
1,289✔
142

143
%% Issue a GET through ekka_httpc against one address chosen by rand_addr/1.
etcd_get(Servers, Key, Params, HttpOpts) ->
    Addr = rand_addr(Servers),
    ekka_httpc:get(Addr, Key, Params, HttpOpts).
3✔
145

146
%% Issue a PUT through ekka_httpc against one address chosen by rand_addr/1.
etcd_set(Servers, Key, Params, HttpOpts) ->
    Addr = rand_addr(Servers),
    ekka_httpc:put(Addr, Key, Params, HttpOpts).
426✔
148

149
%% Issue a DELETE through ekka_httpc against one address chosen by
%% rand_addr/1.
etcd_del(Servers, Key, Params, HttpOpts) ->
    Addr = rand_addr(Servers),
    ekka_httpc:delete(Addr, Key, Params, HttpOpts).
4✔
151

152
%% Key path of the directory under which the node keys are stored.
nodes_path(Options) ->
    Prefix = config(prefix, Options),
    with_prefix(Prefix, "/nodes").
3✔
154

155
%% Key path of the local node: "/nodes/" followed by the name of node().
node_path(Options) ->
    Prefix = config(prefix, Options),
    Suffix = "/nodes/" ++ atom_to_list(node()),
    with_prefix(Prefix, Suffix).
424✔
157

158
%% Key path of the cluster-wide lock key.
lock_path(Options) ->
    Prefix = config(prefix, Options),
    with_prefix(Prefix, "/lock").
6✔
160

161
%% Build the request path "v2/keys/<Prefix>/<Cluster><Path>", where
%% <Cluster> is the `cluster_name' value from ekka:env/2 (default `ekka').
with_prefix(Prefix, Path) ->
    ClusterName = ekka:env(cluster_name, ekka),
    Segments = ["v2/keys/", Prefix, "/", atom_to_list(ClusterName), Path],
    lists:concat(Segments).
433✔
164

165
%% Pick one address from the list: the only element of a single-element
%% list, otherwise an element at a random position.
rand_addr([Only]) ->
    Only;
rand_addr(AddrList) ->
    Count = length(AddrList),
    Position = rand:uniform(Count),
    lists:nth(Position, AddrList).
6✔
169

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc