• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 773

29 Nov 2024 08:57PM UTC coverage: 71.844% (-2.0%) from 73.807%
773

push

github

web-flow
Merge pull request #68 from thalesmg/fix-lookup-redirect-mkII

fix(client): handle `Redirect` in `LookupTopicResponse`

189 of 261 new or added lines in 8 files covered. (72.41%)

29 existing lines in 3 files now uncovered.

939 of 1307 relevant lines covered (71.84%)

313.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.36
/src/pulsar_client.erl
1
%% Copyright (c) 2013-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_client).
16

17
-behaviour(gen_server).
18

19
-export([start_link/4]).
20

21
%% gen_server Callbacks
22
-export([ init/1
23
        , handle_call/3
24
        , handle_cast/2
25
        , handle_info/2
26
        , terminate/2
27
        , format_status/1
28
        , format_status/2
29
        ]).
30

31
-export([ get_topic_metadata/2
32
        , lookup_topic/2
33
        , lookup_topic/3
34
        , lookup_topic_async/2
35
        ]).
36

37
-export([ get_status/1
38
        , get_alive_pulsar_url/1
39
        ]).
40

41
-export([try_initial_connection/3]).
42

43
%% Internal export for tests
44
-export([handle_response/2]).
45

46
%%--------------------------------------------------------------------
47
%% Type definitions
48
%%--------------------------------------------------------------------
49

50
%% gen_server state of one pulsar client connection.
-record(state, {
    %% first element of the argument list given to init/1
    client_id,
    %% owner process; an 'EXIT' message from it stops this server
    parent,
    %% current broker socket; set to `undefined' when the transport fails
    sock,
    %% broker URI handed to try_connect/2 whenever a (re)connection is needed
    server,
    %% options returned together with the socket by the connection attempt
    opts,
    %% id attached to the next outgoing request
    request_id = 0,
    %% in-flight requests: RequestId => {From, Topic}
    requests = #{},
    %% caller that is answered with `true' when a `connected' frame arrives
    from,
    %% bytes left over from a partially received frame
    last_bin = <<>>,
    %% not read or written by any function visible in this module
    extra = #{}
}).
61

62
-export_type([lookup_topic_response/0]).
63
%% Reply produced for a finished topic lookup.
%% NOTE(review): handle_redirect_lookup_response/3 replies with
%% `{redirect, ServiceURL, ReqOpts}' and handle_call/3 may reply
%% `{error, unavailable}'; neither shape is listed here -- confirm whether
%% users of this type expect it to be exhaustive.
-type lookup_topic_response() ::
        {ok, #{ brokerServiceUrl => string()
              , proxy_through_service_url => boolean()
              }}
      | {error, #{error => term(), message => term()}}.

%% Local equivalent of `gen_server:server_ref()', which exists only on OTP 25+.
-type server_ref() ::
        pid()
      | (LocalName :: atom())
      | {Name :: atom(), Node :: atom()}
      | {global, GlobalName :: term()}
      | {via, RegMod :: module(), ViaName :: term()}.
74

75
%% Delay before the next `ping' message is sent to self (30s).
-define(PING_INTERVAL, 30000).
%% Process-dictionary key under which the last pong timestamp is stored.
-define(PONG_TS, {pulsar_rcvd, pong}).
%% Maximum tolerated age of the last pong (60s). Parenthesized so the
%% expansion keeps its meaning inside any surrounding expression.
-define(PONG_TIMEOUT, (?PING_INTERVAL * 2)).
%% Default for the `connect_timeout' option read in init/1 (30s).
-define(CONN_TIMEOUT, 30000).
79

80
%% calls/casts/infos
81
%% Call payload sent by get_topic_metadata/2.
-record(get_topic_metadata, {
    topic :: pulsar_client_manager:topic()
}).
%% Call payload sent by lookup_topic/3.
-record(lookup_topic, {
    partition_topic,
    opts = #{}
}).
%% Cast payload sent by lookup_topic_async/3; `from' is `{CallerPid, Alias}'.
-record(lookup_topic_async, {
    from,
    partition_topic,
    opts = #{}
}).
84

85
%%--------------------------------------------------------------------
86
%% API
87
%%--------------------------------------------------------------------
88

89
%% Start an unregistered client process linked to the caller. The four
%% arguments are forwarded, in order, as the argument list of init/1.
start_link(ClientId, Server, Opts, Owner) ->
    InitArgs = [ClientId, Server, Opts, Owner],
    gen_server:start_link(?MODULE, InitArgs, []).
36✔
91

92
%% Ask the client process for the partition metadata of `Topic'.
%% Blocks for at most 30 seconds.
get_topic_metadata(Pid, Topic) ->
    Request = #get_topic_metadata{topic = Topic},
    gen_server:call(Pid, Request, 30000).
34✔
94

95
%% Same as lookup_topic/3 with an empty request option map.
lookup_topic(Pid, PartitionTopic) ->
    NoOpts = #{},
    lookup_topic(Pid, PartitionTopic, NoOpts).
75✔
97

98
%% Synchronously look up `PartitionTopic' through the client process.
%% `Opts' is passed along as the per-request options. Blocks for at most
%% 30 seconds.
lookup_topic(Pid, PartitionTopic, Opts) ->
    Request = #lookup_topic{partition_topic = PartitionTopic, opts = Opts},
    gen_server:call(Pid, Request, 30000).
76✔
100

101
%% Same as lookup_topic_async/3 with an empty request option map.
-spec lookup_topic_async(server_ref(), binary()) -> {ok, reference()}.
lookup_topic_async(Pid, PartitionTopic) ->
    NoOpts = #{},
    lookup_topic_async(Pid, PartitionTopic, NoOpts).
×
104

105
%% Fire a topic lookup without waiting for the answer.
%% A monitor on `Pid' is created with the `{alias, reply_demonitor}' option
%% and the resulting reference is both embedded in the request (as part of
%% the reply address) and returned to the caller.
lookup_topic_async(Pid, PartitionTopic, Opts) ->
    Alias = monitor(process, Pid, [{alias, reply_demonitor}]),
    ReplyTo = {self(), Alias},
    Request = #lookup_topic_async{
        from = ReplyTo,
        partition_topic = PartitionTopic,
        opts = Opts
    },
    gen_server:cast(Pid, Request),
    {ok, Alias}.
×
110

111
%% Ask the client process whether it considers itself healthy.
%% Blocks for at most 5 seconds.
get_status(Pid) ->
    Timeout = 5000,
    gen_server:call(Pid, get_status, Timeout).
×
113

114
%% Ask the client process for the URI of the broker it is connected to.
%% Blocks for at most 5 seconds.
get_alive_pulsar_url(Pid) ->
    Timeout = 5000,
    gen_server:call(Pid, get_alive_pulsar_url, Timeout).
68✔
116

117
%%--------------------------------------------------------------------
118
%% gen_server callback
119
%%--------------------------------------------------------------------
120

121
%% gen_server init callback.
%%
%% Spawns a linked helper that keeps trying to connect (see
%% try_initial_connection/3) and waits for its outcome, bounded by the
%% `connect_timeout' option (default ?CONN_TIMEOUT). Returns whatever
%% wait_for_socket_and_opts/5 returns: `{ok, State}' or `{stop, Reason}'.
init([ClientId, Server, Opts, Parent]) ->
    process_flag(trap_exit, true),
    ConnTimeout = maps:get(connect_timeout, Opts, ?CONN_TIMEOUT),
    Me = self(),
    Pid = spawn_link(fun() -> try_initial_connection(Me, Server, Opts) end),
    TRef = erlang:send_after(ConnTimeout, self(), timeout),
    Result = wait_for_socket_and_opts(ClientId, Server, Pid, Parent, timeout),
    _ = erlang:cancel_timer(TRef),
    %% exit/2 is asynchronous: with the link still in place, the helper's
    %% 'EXIT' message could arrive after the flush below and later be
    %% reported by handle_info/2 as an unknown message. Removing the link
    %% first guarantees that no further 'EXIT' from the helper is delivered;
    %% one that is already queued is drained below.
    true = unlink(Pid),
    exit(Pid, kill),
    %% the timer may have fired just before it was cancelled
    receive
        timeout -> ok
    after 0 ->
        ok
    end,
    receive
        {'EXIT', Pid, _Error} -> ok
    after 0 ->
        ok
    end,
    Result.
36✔
141

142
%% Wait for the connection helper `Pid' to report its outcome.
%%
%% * `{Pid, {ok, {Sock, Opts}}}'  -> `{ok, State}' with a fresh state record
%% * `{Pid, {error, Error}}'      -> `{stop, Error}' for authentication
%%                                   errors; otherwise keep waiting and
%%                                   remember `Error' as the last error
%% * `timeout'                    -> `{stop, LastError}'
wait_for_socket_and_opts(ClientId, Server, Pid, Parent, LastError) ->
    receive
        {Pid, {ok, {Sock, Opts}}} ->
            InitialState = #state{
                client_id = ClientId,
                parent = Parent,
                sock = Sock,
                server = Server,
                opts = Opts
            },
            {ok, InitialState};
        {Pid, {error, Error}} ->
            case contains_authn_error(Error) of
                false ->
                    %% the helper retries by itself; just record the error
                    wait_for_socket_and_opts(ClientId, Server, Pid, Parent, Error);
                true ->
                    log_error("authentication error starting pulsar client: ~p", [Error]),
                    {stop, Error}
            end;
        timeout ->
            log_error("timed out when starting pulsar client; last error: ~p", [LastError]),
            {stop, LastError}
    end.
165

166
%% True only for a map whose `error' key holds 'AuthenticationError'.
contains_authn_error(Reason) ->
    case Reason of
        #{error := 'AuthenticationError'} -> true;
        _ -> false
    end.
1,263✔
168

169
%% gen_server call callback.
%%
%% Every clause that needs the broker first obtains a usable socket through
%% get_alive_sock_opts/3 (which reconnects when needed); if that fails the
%% server stops with `{shutdown, Reason}'.
%%
%% * #get_topic_metadata{} / #lookup_topic{}: send the request packet and
%%   park the caller in `requests'; the reply is sent later by
%%   handle_response/2.
%% * get_status: reply `true'/`false'; an unhealthy result also stops the
%%   server with `{shutdown, no_pong_received}'.
%% * get_alive_pulsar_url: reply with the result of
%%   pulsar_socket:get_pulsar_uri/2.
%% * anything else: `{error, unknown_call}'.
handle_call(#get_topic_metadata{topic = Topic}, From,
        State = #state{
            sock = Sock,
            opts = Opts,
            request_id = RequestId,
            requests = Reqs,
            server = Server
        }) ->
    case get_alive_sock_opts(Server, Sock, Opts) of
        {error, Reason} ->
            log_error("get_topic_metadata from pulsar servers failed: ~p", [Reason]),
            Reply = {error, unavailable},
            {stop, {shutdown, Reason}, Reply, State};
        {ok, {Sock1, Opts1}} ->
            pulsar_socket:send_topic_metadata_packet(Sock1, Topic, RequestId, Opts1),
            {noreply, next_request_id(State#state{
                requests = maps:put(RequestId, {From, Topic}, Reqs),
                sock = Sock1,
                opts = Opts1
            })}
    end;
handle_call(#lookup_topic{partition_topic = Topic, opts = ReqOpts}, From,
        State = #state{
            sock = Sock,
            opts = Opts,
            request_id = RequestId,
            requests = Reqs,
            server = Server
        }) ->
    case get_alive_sock_opts(Server, Sock, Opts) of
        {error, Reason} ->
            log_error("lookup_topic from pulsar failed: ~0p down", [Reason]),
            {stop, {shutdown, Reason}, {error, unavailable}, State};
        {ok, {Sock1, Opts1}} ->
            pulsar_socket:send_lookup_topic_packet(Sock1, Topic, RequestId, ReqOpts, Opts1),
            {noreply, next_request_id(State#state{
                requests = maps:put(RequestId, {From, Topic}, Reqs),
                sock = Sock1,
                opts = Opts1
            })}
    end;
handle_call(get_status, From, State = #state{sock = undefined, opts = Opts, server = Server}) ->
    %% no socket at the moment: try to reconnect before judging health
    case get_alive_sock_opts(Server, undefined, Opts) of
        {error, Reason} ->
            log_error("get_status from pulsar failed: ~0p", [Reason]),
            {stop, {shutdown, Reason}, false, State};
        {ok, {Sock, Opts1}} ->
            IsHealthy = not is_pong_longtime_no_received(),
            case IsHealthy of
                true ->
                    {reply, IsHealthy, State#state{from = From, sock = Sock, opts = Opts1}};
                false ->
                    {stop, {shutdown, no_pong_received}, IsHealthy, State}
            end
    end;
handle_call(get_status, _From, State) ->
    IsHealthy = not is_pong_longtime_no_received(),
    case IsHealthy of
        true ->
            {reply, IsHealthy, State};
        false ->
            {stop, {shutdown, no_pong_received}, IsHealthy, State}
    end;
handle_call(get_alive_pulsar_url, From, State = #state{sock = Sock, opts = Opts, server = Server}) ->
    case get_alive_sock_opts(Server, Sock, Opts) of
        {error, Reason} ->
            {stop, {shutdown, Reason}, {error, Reason}, State};
        {ok, {Sock1, Opts1}} ->
            %% Sock1 may be a freshly opened socket; query it with the
            %% options it was opened with (Opts1), not the previous ones.
            URI = pulsar_socket:get_pulsar_uri(Sock1, Opts1),
            {reply, URI, State#state{from = From, sock = Sock1, opts = Opts1}}
    end;
handle_call(_Req, _From, State) ->
    {reply, {error, unknown_call}, State, hibernate}.
×
242

243
%% gen_server cast callback.
%%
%% An asynchronous lookup is converted into the equivalent call request and
%% posted to this process' own mailbox as a '$gen_call' message, so the
%% lookup is carried out by the #lookup_topic{} clause of handle_call/3.
%% Any other cast is ignored.
handle_cast(#lookup_topic_async{from = From, partition_topic = PartitionTopic, opts = Opts}, State) ->
    CallRequest = #lookup_topic{partition_topic = PartitionTopic, opts = Opts},
    self() ! {'$gen_call', From, CallRequest},
    {noreply, State};
handle_cast(_Req, State) ->
    {noreply, State, hibernate}.
×
250

251
%% gen_server info callback.
%%
%% * 'EXIT' from the parent          -> stop with `{shutdown, Reason}'
%% * tcp/ssl data on our socket      -> parse it together with any leftover
%%                                      bytes from the previous chunk
%% * tcp_error/ssl_error, *_closed   -> forget the socket and stop
%% * ping                            -> (re)connect if needed, send a ping
%%                                      and schedule the pong check
%% * check_pong                      -> stop when the last pong is too old
%% * anything else                   -> log a warning and carry on
handle_info({'EXIT', Parent, Reason}, State = #state{parent = Parent}) ->
    {stop, {shutdown, Reason}, State};
handle_info({Transport, Sock, Bin}, State = #state{sock = Sock, last_bin = LastBin})
        when Transport == tcp; Transport == ssl ->
    Buffer = <<LastBin/binary, Bin/binary>>,
    Parsed = pulsar_protocol_frame:parse(Buffer),
    {noreply, parse_packet(Parsed, State)};
handle_info({Error, Sock, Reason}, State = #state{sock = Sock})
        when Error == ssl_error; Error == tcp_error ->
    log_error("transport layer error: ~p", [Reason]),
    {stop, {shutdown, Reason}, State#state{sock = undefined}};
handle_info({Closed, Sock}, State = #state{sock = Sock})
        when Closed == tcp_closed; Closed == ssl_closed ->
    log_error("connection closed by peer", []),
    {stop, {shutdown, connection_closed}, State#state{sock = undefined}};
handle_info(ping, State = #state{sock = undefined, opts = Opts, server = Server}) ->
    case get_alive_sock_opts(Server, undefined, Opts) of
        {ok, {Sock, Opts1}} ->
            pulsar_socket:ping(Sock, Opts1),
            start_check_pong_timeout(),
            {noreply, State#state{sock = Sock, opts = Opts1}, hibernate};
        {error, Reason} ->
            log_error("ping to pulsar servers failed: ~p", [Reason]),
            {stop, {shutdown, Reason}, State}
    end;
handle_info(ping, State = #state{sock = Sock, opts = Opts}) ->
    pulsar_socket:ping(Sock, Opts),
    start_check_pong_timeout(),
    {noreply, State, hibernate};
handle_info(check_pong, State) ->
    case is_pong_longtime_no_received() of
        false ->
            {noreply, State, hibernate};
        true ->
            {stop, {shutdown, no_pong_received}, State}
    end;
handle_info(Info, State) ->
    log_warning("received unknown message: ~p", [Info]),
    {noreply, State, hibernate}.
×
288

289
%% gen_server terminate callback: close the socket if one is still held.
terminate(_Reason, #state{sock = Sock, opts = Opts}) ->
    case Sock of
        undefined ->
            ok;
        _ ->
            _ = pulsar_socket:close(Sock, Opts),
            ok
    end.
22✔
294

295
%% Map-based format_status callback: passes the value under the `state' key
%% through censor_secrets/1 and leaves every other entry untouched.
format_status(Status) ->
    Censor =
        fun(state, State0) -> censor_secrets(State0);
           (_Key, Value) -> Value
        end,
    maps:map(Censor, Status).
303

304
%% `format_status/2' is deprecated as of OTP 25.0.
%% Returns the censored state wrapped in the `{data, [{"State", _}]}' form.
format_status(_Opt, [_PDict, State0]) ->
    Censored = censor_secrets(State0),
    [{data, [{"State", Censored}]}].
×
308

309
%% Replace `auth_data' inside `opts.conn_opts' with a placeholder string.
%% Anything that does not have that exact nesting is returned unchanged.
censor_secrets(State) ->
    case State of
        #state{opts = #{conn_opts := #{auth_data := _} = ConnOpts} = Opts} ->
            Redacted = ConnOpts#{auth_data := "******"},
            State#state{opts = Opts#{conn_opts := Redacted}};
        _ ->
            State
    end.
1✔
313

314
%% Consume the result of pulsar_protocol_frame:parse/1.
%%
%% An incomplete frame is stored in `last_bin'. A decoded command is given to
%% handle_response/2; if bytes remain after it they are parsed in turn,
%% otherwise `last_bin' is cleared.
parse_packet({incomplete, Bin}, State) ->
    State#state{last_bin = Bin};
parse_packet({Cmd, Rest}, State) ->
    case Rest of
        <<>> ->
            ?MODULE:handle_response(Cmd, State#state{last_bin = <<>>});
        _ ->
            NextState = ?MODULE:handle_response(Cmd, State),
            parse_packet(pulsar_protocol_frame:parse(Rest), NextState)
    end.
×
321

322
%% Handle one decoded broker command and return the updated state.
%%
%% Responses carrying a `request_id' are matched against the `requests' map;
%% when the id is known the parked caller is answered and the entry removed,
%% otherwise the response is dropped silently.
handle_response({connected, _ConnectedData}, State = #state{from = From}) ->
    start_keepalive(),
    case From of
        undefined ->
            State;
        _ ->
            gen_server:reply(From, true),
            State#state{from = undefined}
    end;
handle_response({partitionMetadataResponse, #{error := Reason, message := Msg,
                                              request_id := RequestId, response := 'Failed'}},
                State = #state{requests = Reqs}) ->
    case maps:take(RequestId, Reqs) of
        {{From, _}, Remaining} ->
            gen_server:reply(From, {error, #{error => Reason, message => Msg}}),
            State#state{requests = Remaining};
        error ->
            State
    end;
handle_response({partitionMetadataResponse, #{partitions := Partitions,
                                              request_id := RequestId}},
                State = #state{requests = Reqs}) ->
    case maps:take(RequestId, Reqs) of
        {{From, Topic}, Remaining} ->
            gen_server:reply(From, {ok, {Topic, Partitions}}),
            State#state{requests = Remaining};
        error ->
            State
    end;
handle_response({lookupTopicResponse, #{error := Reason, message := Msg,
                                        request_id := RequestId, response := 'Failed'}},
                State = #state{requests = Reqs}) ->
    case maps:take(RequestId, Reqs) of
        {{From, _}, Remaining} ->
            gen_server:reply(From, {error, #{error => Reason, message => Msg}}),
            State#state{requests = Remaining};
        error ->
            State
    end;
handle_response({lookupTopicResponse, #{response := 'Redirect'} = Response},
                State = #state{requests = Reqs}) ->
    #{request_id := RequestId} = Response,
    case maps:take(RequestId, Reqs) of
        {{From, _Topic}, Remaining} ->
            handle_redirect_lookup_response(State#state{requests = Remaining}, From, Response);
        error ->
            State
    end;
handle_response({lookupTopicResponse, #{request_id := RequestId, response := 'Connect'} = Response},
                State = #state{requests = Reqs, opts = Opts}) ->
    case maps:take(RequestId, Reqs) of
        {{From, _}, Remaining} ->
            ServiceURL = get_service_url_from_lookup_response(Response, Opts),
            ViaProxy = maps:get(proxy_through_service_url, Response, false),
            Reply = {ok, #{ brokerServiceUrl => ServiceURL
                          , proxy_through_service_url => ViaProxy
                          }},
            gen_server:reply(From, Reply),
            State#state{requests = Remaining};
        error ->
            State
    end;
handle_response({ping, #{}}, State = #state{sock = Sock, opts = Opts}) ->
    %% broker-initiated keepalive: answer with a pong
    pulsar_socket:pong(Sock, Opts),
    State;
handle_response({pong, #{}}, State) ->
    pong_received(),
    start_keepalive(),
    State;
handle_response(Unknown, State) ->
    log_error("handle unknown response: ~p", [Unknown]),
    State.
×
392

393
%% Return `{ok, {Sock, Opts}}' for a socket that can be used right now.
%% The current socket is kept when pulsar_socket:getstat/2 succeeds on it;
%% with no socket, or when getstat fails, a new connection is attempted via
%% try_connect/2 and its result is returned as is.
get_alive_sock_opts(Server, Sock, Opts) ->
    case Sock =/= undefined andalso pulsar_socket:getstat(Sock, Opts) of
        {ok, _} ->
            {ok, {Sock, Opts}};
        false ->
            %% there is no socket at all
            try_connect(Server, Opts);
        {error, _} ->
            try_connect(Server, Opts)
    end.
402

403
%% Open a connection to `URI' and perform the connect handshake.
%% Returns `{ok, {Sock, Opts}}' or `{error, Reason}'. When the handshake
%% fails the socket is closed (and one pending socket message drained)
%% before the error is returned.
try_connect(URI, Opts0) ->
    {Type, {Host, Port}} = pulsar_utils:parse_url(URI),
    Opts = pulsar_utils:maybe_enable_ssl_opts(Type, Opts0),
    case pulsar_socket:connect(Host, Port, Opts) of
        {error, Reason} ->
            {error, Reason};
        {ok, Sock} ->
            pulsar_socket:send_connect_packet(Sock, Opts),
            case wait_for_conn_response(Sock, Opts) of
                {error, HandshakeError} ->
                    ok = close_socket_and_flush_signals(Sock, Opts),
                    {error, HandshakeError};
                {ok, Result} ->
                    {ok, Result}
            end
    end.
419

420
%% Wait up to 15 seconds for the broker's answer to the connect packet.
%%
%% Returns `{ok, {Sock, Opts}}' when exactly one `connected' frame arrives,
%% and `{error, Reason}' for an `error' frame, a transport error, a closed
%% socket, a timeout, or any other parse result (an incomplete frame, an
%% unexpected command, or a frame followed by trailing bytes). Returning an
%% error for the latter -- instead of failing with `case_clause' -- lets the
%% caller close the socket and retry.
wait_for_conn_response(Sock, Opts) ->
    receive
        {Transport, Sock, Bin} when Transport == tcp; Transport == ssl ->
            case pulsar_protocol_frame:parse(Bin) of
                {{connected, _CommandConnected}, <<>>} ->
                    {ok, {Sock, Opts}};
                {{error, CommandError}, <<>>} ->
                    {error, CommandError};
                Unexpected ->
                    {error, {unexpected_connect_response, Unexpected}}
            end;
        {Error, Sock, Reason} when Error == ssl_error; Error == tcp_error ->
            {error, {Error, Reason}};
        {Closed, Sock} when Closed == tcp_closed; Closed == ssl_closed ->
            {error, Closed}
    after
        15000 ->
            {error, wait_connect_response_timeout}
    end.
437

438
%% Advance the request id counter; after 65535 it restarts at 1.
next_request_id(State = #state{request_id = RequestId}) ->
    NextId =
        case RequestId of
            65535 -> 1;
            _ -> RequestId + 1
        end,
    State#state{request_id = NextId}.
110✔
442

443
%% Log at `error' level through do_log/3.
log_error(Fmt, Args) ->
    Level = error,
    do_log(Level, Fmt, Args).
14✔
445

446
%% Log at `warning' level through do_log/3.
log_warning(Fmt, Args) ->
    Level = warning,
    do_log(Level, Fmt, Args).
×
448

449
%% Emit a log event with the "[pulsar-client] " prefix prepended to the
%% format string and the `[pulsar, client_worker]' logger domain attached.
do_log(Level, Fmt, Args) ->
    Metadata = #{domain => [pulsar, client_worker]},
    PrefixedFmt = "[pulsar-client] " ++ Fmt,
    logger:log(Level, PrefixedFmt, Args, Metadata).
14✔
451

452
%% Schedule a `ping' message to this process after ?PING_INTERVAL ms.
%% The same ping workflow also attempts the connection when there is none.
start_keepalive() ->
    Self = self(),
    erlang:send_after(?PING_INTERVAL, Self, ping).
×
455

456
%% Schedule a `check_pong' message to this process after ?PONG_TIMEOUT ms.
start_check_pong_timeout() ->
    Self = self(),
    erlang:send_after(?PONG_TIMEOUT, Self, check_pong).
×
458

459
%% Record the current timestamp as the time of the last pong.
%% TODO: use explicit state instead of dictionary
pong_received() ->
    Timestamp = now_ts(),
    _ = erlang:put(?PONG_TS, Timestamp),
    ok.
×
463

464
%% True when a pong has been recorded and it is older than ?PONG_TIMEOUT ms.
%% False when no pong has been recorded yet.
%% TODO: use explicit state instead of dictionary
is_pong_longtime_no_received() ->
    LastPongTs = erlang:get(?PONG_TS),
    LastPongTs =/= undefined andalso (now_ts() - LastPongTs) > ?PONG_TIMEOUT.
470

471
%% Current timestamp in milliseconds, used only to measure the time elapsed
%% since the last pong. Monotonic time is used instead of wall-clock time so
%% that system clock adjustments can neither trigger nor mask a
%% `no_pong_received' shutdown. The value is only meaningful as a difference
%% between two calls made on the same node.
now_ts() ->
    erlang:monotonic_time(millisecond).
×
473

474
%% Close the socket, then drop at most one message already queued for it:
%% data that raced with the close, a transport error, or a closed
%% notification. Never blocks.
close_socket_and_flush_signals(Sock, Opts) ->
    _ = pulsar_socket:close(Sock, Opts),
    receive
        {Tag, Sock, _} when Tag =:= tcp; Tag =:= ssl;
                            Tag =:= ssl_error; Tag =:= tcp_error ->
            ok;
        {Closed, Sock} when Closed =:= tcp_closed; Closed =:= ssl_closed ->
            ok
    after 0 ->
        ok
    end.
489

490
%% Body of the helper process spawned by init/1.
%%
%% Keeps trying to connect to `Server'. Each failure is reported to `Parent'
%% as `{self(), {error, Reason}}' and followed by a 100 ms pause, which
%% avoids a hot restart loop (and hitting the supervisor's max restart
%% intensity) while the broker or the network is unavailable. On success the
%% socket is handed over to `Parent', which is then sent
%% `{self(), {ok, {Sock, Opts}}}'.
try_initial_connection(Parent, Server, Opts) ->
    Me = self(),
    case get_alive_sock_opts(Server, undefined, Opts) of
        {ok, {Sock, Opts1}} ->
            pulsar_socket:controlling_process(Sock, Parent, Opts1),
            Parent ! {Me, {ok, {Sock, Opts1}}};
        {error, Reason} ->
            Parent ! {Me, {error, Reason}},
            timer:sleep(100),
            ?MODULE:try_initial_connection(Parent, Server, Opts)
    end.
503

504
%% Pick the broker URL out of a lookup response.
%% With `enable_ssl := true' in `Opts' the TLS URL is preferred; if the
%% response has none, the plain `brokerServiceUrl' is used and an error is
%% logged. Without SSL the plain `brokerServiceUrl' is returned.
get_service_url_from_lookup_response(Response, Opts) ->
    case {Opts, Response} of
        {#{enable_ssl := true}, #{brokerServiceUrlTls := TlsUrl}} ->
            TlsUrl;
        {_, #{brokerServiceUrl := PlainUrl}} ->
            %% the 'brokerServiceUrl' is a mandatory field in case the SSL is disabled
            case Opts of
                #{enable_ssl := true} ->
                    log_error("SSL enabled but brokerServiceUrlTls is not provided by pulsar,"
                              " falling back to brokerServiceUrl: ~p", [PlainUrl]);
                _ ->
                    ok
            end,
            PlainUrl
    end.
516

517
%% A lookup response of type `Redirect' means the lookup has to be re-issued
%% against the broker named in the response. This function does not re-issue
%% it: it answers the waiting caller with `{redirect, ServiceURL, ReqOpts}',
%% where `ReqOpts' carries the response's `authoritative' flag (default
%% `false'), and returns the state unchanged.
%% https://pulsar.apache.org/docs/2.11.x/developing-binary-protocol/#lookuptopicresponse
handle_redirect_lookup_response(State, From, Response) ->
    ServiceURL = get_service_url_from_lookup_response(Response, State#state.opts),
    Authoritative = maps:get(authoritative, Response, false),
    gen_server:reply(From, {redirect, ServiceURL, #{authoritative => Authoritative}}),
    State.
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc