• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 773

29 Nov 2024 08:57PM UTC coverage: 71.844% (-2.0%) from 73.807%
773

push

github

web-flow
Merge pull request #68 from thalesmg/fix-lookup-redirect-mkII

fix(client): handle `Redirect` in `LookupTopicResponse`

189 of 261 new or added lines in 8 files covered. (72.41%)

29 existing lines in 3 files now uncovered.

939 of 1307 relevant lines covered (71.84%)

313.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.69
/src/pulsar_client_manager.erl
1
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_client_manager).
16

17
-feature(maybe_expr, enable).
18

19
-behaviour(gen_server).
20

21
-export([start_link/3]).
22

23
%% gen_server Callbacks
24
-export([ init/1
25
        , handle_call/3
26
        , handle_cast/2
27
        , handle_info/2
28
        , terminate/2
29
        ]).
30

31
-export([ get_topic_metadata/3
32
        , lookup_topic/3
33
        , lookup_topic_async/2
34
        ]).
35

36
-export([ get_status/2
37
        , get_alive_pulsar_url/2
38
        ]).
39

40
%% For tests/introspection.
41
-export([get_workers/2]).
42

43
-export_type([topic/0]).
44

45
%%--------------------------------------------------------------------
46
%% Type definitions
47
%%--------------------------------------------------------------------
48

49
-type client_id() :: atom().
50
-type url() :: string().
51
-type topic() :: string().
52
-type partition_topic() :: string().
53

54
-define(client_id, client_id).
55
-define(initial_opts, initial_opts).
56
-define(seed_urls, seed_urls).
57
-define(workers, workers).
58
-define(no_servers_available, no_servers_available).
59

60
%% Internal state of the manager process.
-type state() :: #{
    ?client_id := client_id(),
    ?initial_opts := map(),
    ?seed_urls := [url()],
    %% Keys are worker URLs converted to binaries (see `bin/1'); values are the
    %% pids of the workers started for them.  Only this direction is ever stored.
    ?workers := #{binary() => pid()}
}.
66

67
%% calls/casts/infos
68
-record(get_topic_metadata, {topic :: topic()}).
69
-record(lookup_topic, {deadline :: infinity | integer(), partition_topic :: partition_topic()}).
70
-record(lookup_topic_async, {from, partition_topic :: partition_topic()}).
71
-record(get_status, {}).
72
-record(get_alive_pulsar_url, {}).
73
-record(get_workers, {}).
74

75
%%--------------------------------------------------------------------
76
%% API
77
%%--------------------------------------------------------------------
78

79
%% Starts the manager process, linked to the caller and registered locally as `ClientId'.
%% `Servers' are used as the seed URLs and `Opts' as the initial options of the state
%% built in `init/1'.
start_link(ClientId, Servers, Opts) ->
    InitArgs = #{
        initial_opts => Opts,
        seed_urls => Servers,
        client_id => ClientId
    },
    ServerName = {local, ClientId},
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
86

87
%% Synchronously asks the manager `ClientId' for the metadata of `Topic'.
%% Exits like any `gen_server:call/3' if no reply arrives within `Timeout'.
get_topic_metadata(ClientId, Topic, Timeout) ->
    Request = #get_topic_metadata{topic = Topic},
    gen_server:call(ClientId, Request, Timeout).
89

90
%% Synchronously looks up `PartitionTopic' through the manager `ClientId'.
%%
%% The request carries a deadline derived from `Timeout': `pulsar_producers' and
%% `pulsar_consumers' call this with a timeout, and the manager skips the reply once
%% the deadline has passed, so stale responses do not pollute their mailboxes and logs.
lookup_topic(ClientId, PartitionTopic, Timeout) ->
    Request = #lookup_topic{
        partition_topic = PartitionTopic,
        deadline = deadline(Timeout)
    },
    gen_server:call(ClientId, Request, Timeout).
99

100
%% Asynchronously looks up `PartitionTopic'.
%%
%% Monitors the manager (with the `reply_demonitor' alias option) and casts the request
%% with `{self(), Ref}' as the reply address; the manager answers it with
%% `gen_server:reply/2'.  Returns the monitor reference so the caller can correlate
%% the reply.
-spec lookup_topic_async(gen_server:server_ref(), string()) ->
          {ok, reference()} | {error, ?no_servers_available}.
lookup_topic_async(ClientId, PartitionTopic) ->
    MonitorRef = monitor(process, ClientId, [{alias, reply_demonitor}]),
    Request = #lookup_topic_async{
        from = {self(), MonitorRef},
        partition_topic = PartitionTopic
    },
    gen_server:cast(ClientId, Request),
    {ok, MonitorRef}.
107

108
%% Asks the manager whether it has (or can obtain) a live worker.
%% Returns `false' instead of exiting when the manager process does not exist or the
%% call times out; any other exit is propagated.
get_status(ClientId, Timeout) ->
    Request = #get_status{},
    try
        gen_server:call(ClientId, Request, Timeout)
    catch
        exit:{ExitKind, _} when ExitKind =:= noproc; ExitKind =:= timeout ->
            false
    end.
117

118
%% Synchronously asks the manager `ClientId' for the URL reported by one of its workers.
get_alive_pulsar_url(ClientId, Timeout) ->
    Request = #get_alive_pulsar_url{},
    gen_server:call(ClientId, Request, Timeout).
120

121
%% Returns the manager's current workers map.  Intended for tests/introspection.
get_workers(ClientId, Timeout) ->
    Request = #get_workers{},
    gen_server:call(ClientId, Request, Timeout).
123

124
%%--------------------------------------------------------------------
125
%% `gen_server' API
126
%%--------------------------------------------------------------------
127

128
%% gen_server `init/1' callback.
%%
%% Builds the initial state from `Args' (which must contain `client_id', `seed_urls'
%% and `initial_opts'), starts trapping exits and tries to start one worker against the
%% seed URLs.  Returns `{ok, State}' when a worker was started, otherwise
%% `{stop, {error, Reason}}' with the error of the failed attempt.
-spec init(map()) -> {ok, state()} | {stop, {error, term()}}.
init(Args) ->
    #{
      client_id := ClientId,
      seed_urls := SeedURLs,
      initial_opts := Opts
    } = Args,
    %% Workers are started linked to this process; trapping exits turns their
    %% termination into `{'EXIT', Pid, Reason}' messages handled in `handle_info/2'.
    process_flag(trap_exit, true),
    State0 = #{
        ?client_id => ClientId,
        ?initial_opts => Opts,
        ?seed_urls => SeedURLs,
        ?workers => #{}
    },
    case spawn_any_and_wait_connected(State0) of
        {{ok, _}, State} ->
            {ok, State};
        {{error, _} = Error, _State} ->
            {stop, Error}
    end.
148

149
%% gen_server `handle_call/3' callback.
%%
%% Each known request is delegated to its `handle_*' helper, which returns
%% `{Reply, NewState}'.  `#lookup_topic{}' is answered explicitly and only while its
%% deadline has not passed; otherwise no reply is sent at all.  Unknown requests get
%% `{error, unknown_call}' and hibernate the process.
handle_call(#get_workers{}, _From, State) ->
    #{?workers := CurrentWorkers} = State,
    {reply, CurrentWorkers, State};
handle_call(#get_status{}, _From, State) ->
    {IsAlive, NewState} = handle_get_status(State),
    {reply, IsAlive, NewState};
handle_call(#get_alive_pulsar_url{}, _From, State) ->
    {Result, NewState} = handle_get_alive_pulsar_url(State),
    {reply, Result, NewState};
handle_call(#get_topic_metadata{topic = Topic}, _From, State) ->
    {Result, NewState} = handle_get_topic_metadata(State, Topic),
    {reply, Result, NewState};
handle_call(#lookup_topic{partition_topic = PartitionTopic, deadline = Deadline}, Caller, State) ->
    {Result, NewState} = handle_lookup_topic_async(State, PartitionTopic),
    case is_within_deadline(Deadline) of
        true ->
            gen_server:reply(Caller, Result);
        false ->
            %% The caller has already given up; do not send a stale response.
            ok
    end,
    {noreply, NewState};
handle_call(_Req, _From, State) ->
    {reply, {error, unknown_call}, State, hibernate}.
170

171
%% gen_server `handle_cast/2' callback.
%%
%% `#lookup_topic_async{}' performs the lookup and sends the result to the `from'
%% address carried in the request.  Any other cast is ignored and hibernates the process.
handle_cast(#lookup_topic_async{partition_topic = PartitionTopic, from = ReplyTo}, State) ->
    {Result, NewState} = handle_lookup_topic_async(State, PartitionTopic),
    gen_server:reply(ReplyTo, Result),
    {noreply, NewState};
handle_cast(_Req, State) ->
    {noreply, State, hibernate}.
177

178
%% gen_server `handle_info/2' callback.
%%
%% `'EXIT'' messages are passed to `handle_worker_down/3', which forgets the worker if
%% it is tracked.  Anything else is logged as an error and hibernates the process.
handle_info({'EXIT', WorkerPid, ExitReason}, State) ->
    {noreply, handle_worker_down(State, WorkerPid, ExitReason)};
handle_info(Unexpected, State) ->
    log_error("received unknown message: ~p", [Unexpected]),
    {noreply, State, hibernate}.
184

185
%% gen_server `terminate/2' callback: sends a `shutdown' exit signal to every tracked
%% worker.
terminate(_Reason, #{?workers := Workers}) ->
    lists:foreach(
      fun(WorkerPid) -> exit(WorkerPid, shutdown) end,
      maps:values(Workers)),
    ok.
192

193
%%--------------------------------------------------------------------
194
%% Internal fns
195
%%--------------------------------------------------------------------
196

197
%% Returns `{IsUp, State}'.
%%
%% With at least one live worker the status is `true' right away (clients shut
%% themselves down if no pong is received after a timeout, so a live worker counts as
%% healthy).  Without any, one attempt is made to start a worker from the seed URLs
%% and the status reflects whether that attempt succeeded.
handle_get_status(State0) ->
    case alive_workers(State0) of
        [_ | _] ->
            {true, State0};
        [] ->
            {Outcome, State} = spawn_any_and_wait_connected(State0),
            IsUp =
                case Outcome of
                    {ok, _} -> true;
                    {error, _} -> false
                end,
            {IsUp, State}
    end.
210

211
%% Returns `{Reply, State}' where `Reply' is the first successful answer of
%% `pulsar_client:get_alive_pulsar_url/1' among the live workers.
%%
%% When no worker answers, a fresh worker is started from the seed URLs and asked
%% once; if it cannot be started, the result of the spawn attempt is returned as is.
handle_get_alive_pulsar_url(State0) ->
    Query = fun pulsar_client:get_alive_pulsar_url/1,
    ExtraArgs = [],
    LivePids = alive_workers(State0),
    case get_first_successful_call(LivePids, Query, ExtraArgs) of
        {ok, URI} ->
            {{ok, URI}, State0};
        {error, ?no_servers_available} ->
            case spawn_any_and_wait_connected(State0) of
                {{ok, FreshPid}, State} ->
                    {get_first_successful_call([FreshPid], Query, ExtraArgs), State};
                SpawnFailure ->
                    SpawnFailure
            end;
        {error, _} = Failure ->
            {Failure, State0}
    end.
227

228
%% Returns `{Reply, State}' where `Reply' is the first successful answer of
%% `pulsar_client:get_topic_metadata/2' for `Topic' among the live workers.
%%
%% When no worker answers, a fresh worker is started from the seed URLs and asked
%% once; if it cannot be started, the result of the spawn attempt is returned as is.
handle_get_topic_metadata(State0, Topic) ->
    Query = fun pulsar_client:get_topic_metadata/2,
    ExtraArgs = [Topic],
    LivePids = alive_workers(State0),
    case get_first_successful_call(LivePids, Query, ExtraArgs) of
        {ok, _} = Success ->
            {Success, State0};
        {error, ?no_servers_available} ->
            case spawn_any_and_wait_connected(State0) of
                {{ok, FreshPid}, State} ->
                    {get_first_successful_call([FreshPid], Query, ExtraArgs), State};
                SpawnFailure ->
                    SpawnFailure
            end;
        {error, _} = Failure ->
            {Failure, State0}
    end.
244

245
%% Looks up `PartitionTopic' on the live workers and returns `{Reply, State}'.
%%
%% A `redirect' answer is followed via `handle_redirect_lookup/4'.  When no worker
%% answers, the lookup is retried on a freshly started worker.
handle_lookup_topic_async(State0, PartitionTopic) ->
    Lookup = fun pulsar_client:lookup_topic/2,
    LivePids = alive_workers(State0),
    case get_first_successful_call(LivePids, Lookup, [PartitionTopic]) of
        {ok, _} = Success ->
            {Success, State0};
        {redirect, ServiceURL, Opts} ->
            handle_redirect_lookup(State0, ServiceURL, Opts, PartitionTopic);
        {error, ?no_servers_available} ->
            handle_lookup_topic_async_fresh_worker(State0, PartitionTopic);
        {error, _} = Failure ->
            {Failure, State0}
    end.
259

260
%% Starts a fresh worker from the seed URLs and retries the lookup of `PartitionTopic'
%% on it.  Returns `{Reply, State}'.
%%
%% If no worker can be started, the `{Error, State}' pair produced by the spawn
%% attempt is returned unchanged.
handle_lookup_topic_async_fresh_worker(State0, PartitionTopic) ->
    maybe
        {{ok, Pid}, State} ?= spawn_any_and_wait_connected(State0),
        Fun = fun pulsar_client:lookup_topic/2,
        Args = [PartitionTopic],
        case get_first_successful_call([Pid], Fun, Args) of
            {redirect, ServiceURL, Opts} ->
                %% Continue with `State', not `State0': the worker started above is
                %% linked to this process and must stay in the workers map, otherwise
                %% it is never reused and is not shut down in `terminate/2'.
                handle_redirect_lookup(State, ServiceURL, Opts, PartitionTopic);
            Res ->
                {Res, State}
        end
    else
        {Error, State1} ->
            {Error, State1}
    end.
275

276
%% Follows a lookup redirect to `ServiceURL' and returns `{Reply, State}'.
%%
%% Uses the live worker tracked for `ServiceURL' if there is one; otherwise starts a
%% worker for exactly that URL and tries again.  If that worker cannot be started, the
%% result of the spawn attempt is returned as is.
%%
%% NOTE(review): nothing bounds the number of redirect hops or `noproc' retries, and
%% the retry sleeps inside the manager process — confirm this is acceptable.
handle_redirect_lookup(State0, ServiceURL, Opts, PartitionTopic) ->
    case find_alive_worker(State0, ServiceURL) of
        error ->
            case spawn_any_and_wait_connected(State0, [ServiceURL]) of
                {{ok, _NewPid}, State} ->
                    handle_redirect_lookup(State, ServiceURL, Opts, PartitionTopic);
                SpawnFailure ->
                    SpawnFailure
            end;
        {ok, WorkerPid} ->
            try pulsar_client:lookup_topic(WorkerPid, PartitionTopic, Opts) of
                {redirect, NextURL, _NextOpts} when NextURL =:= ServiceURL ->
                    %% The server redirected to itself although we have just used it;
                    %% answer with an error rather than looping.
                    {{error, pulsar_unstable}, State0};
                {redirect, NextURL, NextOpts} ->
                    handle_redirect_lookup(State0, NextURL, NextOpts, PartitionTopic);
                Reply ->
                    {Reply, State0}
            catch
                exit:{noproc, _} ->
                    %% The worker died between the liveness check and the call; retry.
                    timer:sleep(10),
                    handle_redirect_lookup(State0, ServiceURL, Opts, PartitionTopic);
                exit:{timeout, _} ->
                    {{error, timeout}, State0}
            end
    end.
302

303
%% Forgets the worker `Pid' after it has exited with `Reason' and returns the new state.
%% Exits of pids that are not tracked in the workers map leave the state untouched.
handle_worker_down(State0, Pid, Reason) ->
    #{?workers := Tracked} = State0,
    case find_url_by_worker_pid(Tracked, Pid) of
        {ok, URL} ->
            log_info("worker for ~0s down: ~0p", [URL, Reason]),
            State0#{?workers := maps:remove(URL, Tracked)};
        error ->
            State0
    end.
313

314
%% Reverse lookup in the workers map: returns `{ok, URL}' for the key whose value is
%% exactly `Pid', or `error' when no entry holds that pid.
find_url_by_worker_pid(Workers, Pid) ->
    Owned = maps:filter(fun(_URL, WorkerPid) -> WorkerPid =:= Pid end, Workers),
    case maps:keys(Owned) of
        [URL] ->
            {ok, URL};
        [] ->
            error
    end.
322

323
%% Returns `{ok, Pid}' when a worker is tracked for `URL' (looked up by its binary
%% form) and that process is still alive; `error' otherwise.
find_alive_worker(State, URL) ->
    #{?workers := Workers} = State,
    case maps:find(bin(URL), Workers) of
        {ok, Pid} ->
            case is_process_alive(Pid) of
                true -> {ok, Pid};
                false -> error
            end;
        error ->
            error
    end.
333

334
%% Returns the pids of all tracked workers whose process is currently alive.
alive_workers(State) ->
    #{?workers := Workers} = State,
    [Pid || Pid <- maps:values(Workers), is_process_alive(Pid)].
338

339
%% Applies `Fun' to each worker pid in turn (with `Args' appended to the argument list)
%% and returns the result of the first call that does not raise.
%% Any exception moves on to the next worker; if every call raises, or there are no
%% workers, the result is `{error, no_servers_available}'.
get_first_successful_call(WorkerPids, Fun, Args) ->
    TryWorker =
        fun(WorkerPid, Fallback) ->
            try apply(Fun, [WorkerPid | Args]) of
                Result ->
                    {halt, Result}
            catch
                _:_ ->
                    {cont, Fallback}
            end
        end,
    pulsar_utils:foldl_while(TryWorker, {error, ?no_servers_available}, WorkerPids).
351

352
%% Walks the seed URLs stored in the state and stops at the first one for which a
%% worker could be started, so at most one new worker results from a call.
%% See `spawn_any_and_wait_connected/2' for the return value.
-spec spawn_any_and_wait_connected(state()) -> {{ok, pid()} | {error, term()}, state()}.
spawn_any_and_wait_connected(State0) ->
    #{?seed_urls := Candidates} = State0,
    spawn_any_and_wait_connected(State0, Candidates).
358

359
%% Tries `pulsar_client:start_link/4' for each URL in `SeedURLs' in order, stopping at
%% the first success.
%%
%% Returns `{{ok, Pid}, State}' with the new worker stored in the workers map under the
%% binary form of its URL, or `{{error, Reason}, State0}' with the state untouched.
%% `Reason' is the error of the last failed attempt, or `no_servers_available' when
%% `SeedURLs' is empty.
%%
%% NOTE(review): an entry already stored under the same URL is overwritten without
%% stopping the previous worker — confirm callers only get here when it is dead.
spawn_any_and_wait_connected(State0, SeedURLs) ->
    #{ ?client_id := ClientId
     , ?initial_opts := Opts
     , ?workers := Workers0
     } = State0,
    Res =
        pulsar_utils:foldl_while(
          fun(URL, _Acc) ->
            case pulsar_client:start_link(ClientId, URL, Opts, self()) of
                {error, _} = Error ->
                    {cont, Error};
                {ok, Pid} ->
                    {halt, {ok, {URL, Pid}}}
            end
          end,
          {error, ?no_servers_available},
          SeedURLs),
    %% We currently don't use `ignore'.
    case Res of
        {ok, {URL, Pid}} ->
            Workers = Workers0#{bin(URL) => Pid},
            %% Use the `?workers' key macro like every other access to the state.
            State = State0#{?workers := Workers},
            {{ok, Pid}, State};
        {error, Reason} ->
            {{error, Reason}, State0}
    end.
385

386
%% Logs a formatted message at `error' level through `do_log/3'.
log_error(Fmt, Args) ->
    Level = error,
    do_log(Level, Fmt, Args).
388

389
%% Logs a formatted message at `info' level through `do_log/3'.
log_info(Fmt, Args) ->
    Level = info,
    do_log(Level, Fmt, Args).
391

392
%% Emits a log event at `Level' with the manager prefix prepended to the format string
%% and the `[pulsar, client_manager]' logger domain attached as metadata.
do_log(Level, Fmt, Args) ->
    PrefixedFmt = "[pulsar-client-manager] " ++ Fmt,
    Metadata = #{domain => [pulsar, client_manager]},
    logger:log(Level, PrefixedFmt, Args, Metadata).
394

395
%% Normalises a URL given as a string or a binary to a binary.
bin(Value) when is_binary(Value) ->
    Value;
bin(Value) when is_list(Value) ->
    list_to_binary(Value).
397

NEW
398
%% Converts a call timeout into an absolute deadline on the `now_ts/0' clock.
%% `infinity' stays `infinity'; a non-negative number of milliseconds is added to the
%% current timestamp.  `0' is accepted, as it is a valid `gen_server:call/3' timeout.
%% Anything else fails with `function_clause'.
deadline(infinity) -> infinity;
deadline(Timeout) when is_integer(Timeout), Timeout >= 0 -> now_ts() + Timeout.
400

NEW
401
%% Tells whether a deadline produced by `deadline/1' is still in the future.
%% An `infinity' deadline never expires.
is_within_deadline(infinity) ->
    true;
is_within_deadline(Deadline) ->
    Deadline > now_ts().
403

404
%% Current timestamp in milliseconds, used only to compute and check deadlines.
%% Monotonic time is used because deadlines measure elapsed time and must not be
%% affected by wall-clock adjustments.  The value is only meaningful on the local
%% node — assumes callers run on the same node as the manager (TODO confirm).
now_ts() -> erlang:monotonic_time(millisecond).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc