• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / ekka / 739

pending completion
739

push

github

web-flow
Merge pull request #209 from SergeTupchiy/EMQX-10631-fix-etcd-discovery

fix etcd lock issues

14 of 14 new or added lines in 2 files covered. (100.0%)

466 of 734 relevant lines covered (63.49%)

294.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.69
/src/ekka_cluster_etcd.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16
-module(ekka_cluster_etcd).
17

18
-behaviour(ekka_cluster_strategy).
19

20
-behaviour(gen_server).
21

22
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
23

24
-export([ discover/1
25
        , lock/1
26
        , unlock/1
27
        , register/1
28
        , unregister/1
29
        ]).
30

31
-export([start_link/1]).
32

33
-export([ init/1
34
        , handle_call/3
35
        , handle_cast/2
36
        , handle_info/2
37
        , terminate/2
38
        , code_change/3
39
        ]).
40

41
-define(SERVER, ?MODULE).
42

43
-record(state, {
44
    prefix,
45
    lease_id
46
}).
47

48
%% TTL callback
49
-export([etcd_set_node_key/1]).
50

51
%% for erlang apply
52
-export([ v2_discover/1
53
        , v2_lock/1
54
        , v2_unlock/1
55
        , v2_register/1
56
        , v2_unregister/1
57
        ]).
58

59
-export([ v3_discover/1
60
        , v3_lock/1
61
        , v3_unlock/1
62
        , v3_register/1
63
        , v3_unregister/1
64
        ]).
65

66
-define(LOG(Level, Format, Args), logger:Level("Ekka(etcd): " ++ Format, Args)).
67

68
start_link(Options) ->
69
    gen_server:start_link({local, ?SERVER}, ?MODULE, Options, []).
×
70

71
%%--------------------------------------------------------------------
72
%% ekka_cluster_strategy callbacks
73
%%--------------------------------------------------------------------
74

75
discover(Options) ->
76
    etcd_apply(?FUNCTION_NAME, Options).
23✔
77

78
lock(Options) ->
79
    etcd_apply(?FUNCTION_NAME, Options).
12✔
80

81
unlock(Options) ->
82
    etcd_apply(?FUNCTION_NAME, Options).
10✔
83

84
register(Options) ->
85
    etcd_apply(?FUNCTION_NAME, Options).
10✔
86

87
unregister(Options) ->
88
    etcd_apply(?FUNCTION_NAME, Options).
1✔
89

90
%%--------------------------------------------------------------------
91
%% Internal functions
92
%%--------------------------------------------------------------------
93
etcd_apply(Action, Options) ->
94
    case proplists:get_value(version, Options, v3) of
56✔
95
        v3 -> etcd_v3(Action);
×
96
        v2 -> etcd_v2(Action, Options)
56✔
97
    end.
98
%%--------------------------------------------------------------------
99
%% v2
100
%%--------------------------------------------------------------------
101
etcd_v2(Action, Options) ->
102
    Function = list_to_atom("v2_" ++ atom_to_list(Action)),
56✔
103
    erlang:apply(?MODULE, Function, [Options]).
56✔
104

105
v2_discover(Options) ->
106
    case etcd_get_nodes_key(Options) of
23✔
107
        {ok, Response} ->
108
            {ok, extract_nodes(Response)};
23✔
109
        {error, {404, _}} ->
110
            case ensure_nodes_path(Options) of
×
111
                {ok, _} -> discover(Options);
×
112
                Error -> Error
×
113
            end;
114
        {error, Reason} ->
115
            {error, Reason}
×
116
    end.
117

118
v2_lock(Options) ->
119
    v2_lock(Options, 10).
12✔
120
v2_lock(_Options, 0) ->
121
    {error, failed};
×
122
v2_lock(Options, Retries) ->
123
    case etcd_set_lock_key(Options) of
12✔
124
        {ok, _Response} -> ok;
12✔
125
        {error, {412, _}} ->
126
            timer:sleep(1000),
×
127
            v2_lock(Options, Retries -1);
×
128
        {error, Reason} ->
129
            {error, Reason}
×
130
    end.
131

132
v2_unlock(Options) ->
133
    case etcd_del_lock_key(Options) of
10✔
134
        {ok, _Response} -> ok;
10✔
135
        {error, Reason} ->
136
            {error, Reason}
×
137
    end.
138

139
v2_register(Options) ->
140
    ?tp(ekka_cluster_etcd_v2_register, #{}),
10✔
141
    case etcd_set_node_key(Options) of
10✔
142
        {ok, _Response} ->
143
            ensure_node_ttl(Options);
10✔
144
        {error, Reason} ->
145
            {error, Reason}
×
146
    end.
147

148
v2_unregister(Options) ->
149
    ok = ekka_cluster_sup:stop_child(ekka_node_ttl),
1✔
150
    case etcd_del_node_key(Options) of
1✔
151
        {ok, _Response} -> ok;
1✔
152
        {error, Reason} ->
153
            {error, Reason}
×
154
    end.
155

156
extract_nodes([]) ->
157
    [];
×
158
extract_nodes(Response) ->
159
    [extract_node(V) || V <- maps:get(<<"nodes">>, maps:get(<<"node">>, Response), [])].
23✔
160

161
ensure_node_ttl(Options) ->
162
    Ttl = proplists:get_value(node_ttl, Options),
10✔
163
    MFA = {?MODULE, etcd_set_node_key, [Options]},
10✔
164
    case ekka_cluster_sup:start_child(ekka_node_ttl, [Ttl, MFA]) of
10✔
165
        {ok, _Pid} -> ok;
6✔
166
        {error, {already_started, _Pid}} -> ok;
4✔
167
        Err = {error, _} -> Err
×
168
    end.
169

170
extract_node(V) ->
171
    list_to_atom(binary_to_list(lists:last(binary:split(maps:get(<<"key">>, V), <<"/">>, [global])))).
63✔
172

173
ensure_nodes_path(Options) ->
174
    etcd_set(server(Options), nodes_path(Options), [{dir, true}], ssl_options(Options)).
×
175

176
etcd_get_nodes_key(Options) ->
177
    etcd_get(server(Options), nodes_path(Options), [{recursive, true}], ssl_options(Options)).
23✔
178

179
etcd_set_node_key(Options) ->
180
    Ttl = config(node_ttl, Options),
829✔
181
    etcd_set(server(Options), node_path(Options), [{ttl, Ttl}], ssl_options(Options)).
829✔
182

183
etcd_del_node_key(Options) ->
184
    etcd_del(server(Options), node_path(Options), [], ssl_options(Options)).
1✔
185

186
etcd_set_lock_key(Options) ->
187
    Values = [{ttl, 30}, {'prevExist', false}, {value, node()}],
12✔
188
    etcd_set(server(Options), lock_path(Options), Values, ssl_options(Options)).
12✔
189

190
etcd_del_lock_key(Options) ->
191
    Values = [{'prevExist', true}, {'prevValue', node()}],
10✔
192
    etcd_del(server(Options), lock_path(Options), Values, ssl_options(Options)).
10✔
193

194
server(Options) ->
195
    config(server, Options).
875✔
196

197
ssl_options(Options) ->
198
    case proplists:get_value(ssl_options, Options, []) of
875✔
199
        [] -> [];
875✔
200
        SSLOptions ->
201
            case proplists:get_value(enable, SSLOptions, true) of
×
202
                true -> [{ssl, proplists:delete(enable, SSLOptions)}];
×
203
                false -> []
×
204
            end
205
    end.
206

207
config(Key, Options) ->
208
    proplists:get_value(Key, Options).
2,579✔
209

210
etcd_get(Servers, Key, Params, HttpOpts) ->
211
    ekka_httpc:get(scheme(rand_addr(Servers)), Key, Params, HttpOpts).
23✔
212

213
etcd_set(Servers, Key, Params, HttpOpts) ->
214
    ekka_httpc:put(scheme(rand_addr(Servers)), Key, Params, HttpOpts).
841✔
215

216
etcd_del(Servers, Key, Params, HttpOpts) ->
217
    ekka_httpc:delete(scheme(rand_addr(Servers)), Key, Params, HttpOpts).
11✔
218

219
nodes_path(Options) ->
220
    with_prefix(config(prefix, Options), "/nodes").
23✔
221

222
node_path(Options) ->
223
    with_prefix(config(prefix, Options), "/nodes/" ++ atom_to_list(node())).
830✔
224

225
lock_path(Options) ->
226
    with_prefix(config(prefix, Options), "/lock").
22✔
227

228
with_prefix(Prefix, Path) ->
229
    Cluster = atom_to_list(ekka:env(cluster_name, ekka)),
875✔
230
    lists:concat(["v2/keys/", Prefix, "/", Cluster, Path]).
875✔
231

232
rand_addr([Addr]) ->
233
    Addr;
875✔
234
rand_addr(AddrList) ->
235
    lists:nth(rand:uniform(length(AddrList)), AddrList).
×
236

237
%%--------------------------------------------------------------------
238
%% v3
239
%%--------------------------------------------------------------------
240
etcd_v3(Action) ->
241
    Timeout = case Action of
×
242
                  %% etcd would keep a dangling lock if we don't wait for it
243
                  lock -> infinity;
×
244
                  %% sligthly higher than the default eetcd timeout
245
                  _ -> 10000
×
246
              end,
247
    gen_server:call(?SERVER, Action, Timeout).
×
248

249
v3_discover(#state{prefix = Prefix}) ->
250
    Context = v3_nodes_context(Prefix),
×
251
    case eetcd_kv:get(Context) of
×
252
        {ok, Response} ->
253
            case maps:get(kvs, Response) of
×
254
                [] ->
255
                    {ok, []};
×
256
                KvsList ->
257
                    Nodes = [
×
258
                        binary_to_atom(maps:get(value, Kvs), utf8) || Kvs <- KvsList],
×
259
                    {ok, Nodes}
×
260
            end;
261
        Error ->
262
            Error
×
263
    end.
264

265
v3_lock(#state{prefix = Prefix, lease_id = ID}) ->
266
    Context = eetcd:with_timeout(eetcd:new(?MODULE), infinity),
×
267
    Name = list_to_binary(v3_lock_key(Prefix)),
×
268
    Context1 = eetcd_lock:with_lease(eetcd_lock:with_name(Context, Name), ID),
×
269
    case eetcd_lock:lock(Context1) of
×
270
        {ok, #{key := LockKey}} ->
271
            persistent_term:put(ekka_cluster_etcd_lock_key, LockKey),
×
272
            ok;
×
273
        Error ->
274
            Error
×
275
    end.
276

277
v3_unlock(_) ->
278
    case persistent_term:get(ekka_cluster_etcd_lock_key, undefined) of
×
279
        undefined ->
280
            {error, lock_lose};
×
281
        LockKey ->
282
            case eetcd_lock:unlock(?MODULE, LockKey) of
×
283
                {ok, _} ->
284
                    persistent_term:erase(ekka_cluster_etcd_lock_key),
×
285
                    ok;
×
286
                Error ->
287
                    Error
×
288
            end
289
    end.
290

291
v3_register(#state{prefix = Prefix ,lease_id = ID}) ->
292
    ?tp(ekka_cluster_etcd_v3_register, #{prefix => Prefix, lease_id => ID}),
×
293
    Context = v3_node_context(Prefix, ID),
×
294
    case eetcd_kv:put(Context) of
×
295
        {ok, _Response} ->
296
            ok;
×
297
        Error ->
298
            Error
×
299
    end.
300

301
v3_unregister(#state{prefix = Prefix}) ->
302
    Context = v3_node_context_only_key(Prefix),
×
303
    case eetcd_kv:delete(Context) of
×
304
        {ok, _} ->
305
            ok;
×
306
        Error ->
307
            Error
×
308
    end.
309

310
v3_nodes_context(Prefix) ->
311
    Ctx = eetcd_kv:new(?MODULE),
×
312
    Ctx1 = eetcd_kv:with_key(Ctx, v3_nodes_key(Prefix)),
×
313
    Ctx2 = eetcd_kv:with_range_end(Ctx1, "\0"),
×
314
    eetcd_kv:with_sort(Ctx2, 'KEY', 'ASCEND').
×
315

316
v3_node_context(Prefix, ID) ->
317
    Ctx = eetcd_kv:new(?MODULE),
×
318
    Ctx1 = eetcd_kv:with_key(Ctx, v3_node_key(Prefix)),
×
319
    Ctx2 = eetcd_kv:with_value(Ctx1, atom_to_binary(node(), utf8)),
×
320
    eetcd_kv:with_lease(Ctx2, ID).
×
321

322
v3_node_context_only_key(Prefix) ->
323
    Ctx = eetcd_kv:new(?MODULE),
×
324
    eetcd_kv:with_key(Ctx, v3_node_key(Prefix)).
×
325

326
v3_lock_key(Prefix) ->
327
    Prefix ++ "/ekkacl/lock/".
×
328

329
v3_nodes_key(Prefix) ->
330
    Prefix ++ "/ekkacl/nodes/".
×
331

332
v3_node_key(Prefix) ->
333
    v3_node_key(Prefix, atom_to_list(node())).
×
334

335
v3_node_key(Prefix, Node) ->
336
    v3_nodes_key(Prefix) ++ Node.
×
337

338
%%--------------------------------------------------------------------
339
%% gen_server callback
340
%%--------------------------------------------------------------------
341
init(Options) ->
342
    process_flag(trap_exit, true),
×
343
    Servers = proplists:get_value(server, Options, []),
×
344
    Prefix = proplists:get_value(prefix, Options),
×
345
    Hosts = [remove_scheme(Server) || Server <- Servers],
×
346
    {Transport, TransportOpts} = case ssl_options(Options) of
×
347
        [] -> {tcp, []};
×
348
        [SSL] -> SSL
×
349
    end,
350
    %% At the time of writing, the etcd connection process does not
351
    %% close when this process dies.  So, when this processes is
352
    %% restarted by its supervisor, the `eetcd:open' call fails with
353
    %% `{error,[{{"localhost",2379},already_started}]}'.  This ensures
354
    %% that no connection with this name exists before opening it
355
    %% (again).
356
    eetcd:close(?MODULE),
×
357
    {ok, _Pid} = eetcd:open(?MODULE, Hosts, Transport, TransportOpts),
×
358
    {ok, #{'ID' := ID}} = eetcd_lease:grant(?MODULE, 5),
×
359
    {ok, Pid2} = eetcd_lease:keep_alive(?MODULE, ID),
×
360
    true = link(Pid2),
×
361
    {ok, #state{prefix = Prefix, lease_id = ID}}.
×
362

363
handle_call(Action, _From, State) when is_atom(Action) ->
364
    Function = list_to_atom("v3_" ++ atom_to_list(Action)),
×
365
    Reply = erlang:apply(?MODULE, Function, [State]),
×
366
    {reply, Reply, State};
×
367

368
handle_call(_Request, _From, State = #state{}) ->
369
    {reply, ok, State}.
×
370

371
handle_cast(_Request, State = #state{}) ->
372
    {noreply, State}.
×
373

374
handle_info({'EXIT', _From, Reason}, State) ->
375
    {stop, Reason, State};
×
376

377
handle_info(_Info, State = #state{}) ->
378
    {noreply, State}.
×
379

380
terminate(_Reason, _State = #state{lease_id = ID}) ->
381
    eetcd_lease:revoke(?MODULE, ID),
×
382
    eetcd:close(?MODULE).
×
383

384
code_change(_OldVsn, State = #state{}, _Extra) ->
385
    {ok, State}.
×
386

387
remove_scheme("http://" ++ Url) ->
388
    Url;
×
389
remove_scheme("https://" ++ Url) ->
390
    Url;
×
391
remove_scheme(Url) ->
392
    Url.
×
393

394
scheme("http://" ++ _ = Url) ->
395
    Url;
875✔
396
scheme("https://" ++ _ = Url) ->
397
    Url;
×
398
scheme(Url) ->
399
    "http://" ++ Url.
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc