• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 858

24 Nov 2025 04:28PM UTC coverage: 73.517%. First build
858

Pull #79

github

zmstone
ci: fix python venv
Pull Request #79: 251124 publish single message if batch size is 1

60 of 64 new or added lines in 8 files covered. (93.75%)

1041 of 1416 relevant lines covered (73.52%)

254.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.51
/src/pulsar_client.erl
1
%% Copyright (c) 2013-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_client).
16

17
-behaviour(gen_server).
18

19
-export([start_link/4]).
20

21
%% gen_server Callbacks
22
-export([ init/1
23
        , handle_call/3
24
        , handle_cast/2
25
        , handle_info/2
26
        , terminate/2
27
        ]).
28

29
-if(?OTP_RELEASE >= 25).
30
-export([format_status/1]).
31
-else.
32
-export([format_status/2]).
33
-endif.
34

35
-export([ get_topic_metadata/2
36
        , lookup_topic/2
37
        , lookup_topic/3
38
        , lookup_topic_async/2
39
        ]).
40

41
-export([ get_status/1
42
        , get_alive_pulsar_url/1
43
        ]).
44

45
-export([try_initial_connection/3]).
46

47
%% Internal export for tests
48
-export([handle_response/2]).
49

50
%%--------------------------------------------------------------------
51
%% Type definitions
52
%%--------------------------------------------------------------------
53

54
%% Process state of one pulsar client connection.
-record(state, {
    %% identifier passed to start_link/4
    client_id,
    %% owner pid passed to start_link/4; an 'EXIT' from it stops this process
    parent,
    %% connected socket, or `undefined' once the connection is gone
    sock,
    %% pulsar service URL used for the initial connection and for reconnects
    server,
    %% options map as returned by the last successful connect
    opts,
    %% id attached to the next outgoing request
    request_id = 0,
    %% RequestId => {From, Topic} for requests still awaiting a response
    requests = #{},
    %% caller remembered by the get_status / get_alive_pulsar_url calls
    from,
    extra = #{}
}).
64

65
-export_type([lookup_topic_response/0]).
66
-type lookup_topic_response() :: {ok, #{ brokerServiceUrl => string()
67
                                       , proxy_through_service_url => boolean()
68
                                       }}
69
                               | {error, #{error => term(), message => term()}}.
70
%% `gen_server:server_ref()' exists only on OTP 25+
71
-type server_ref() ::
72
        pid() |
73
        (LocalName :: atom()) |
74
        {Name :: atom(), Node :: atom()} |
75
        {global, GlobalName :: term()} |
76
        {via, RegMod :: module(), ViaName :: term()}.
77

78
-define(PING_INTERVAL, 30000). %% 30s
79
-define(PONG_TS, {pulsar_rcvd, pong}).
80
-define(PONG_TIMEOUT, ?PING_INTERVAL * 2). %% 60s
81
-define(CONN_TIMEOUT, 30000).
82

83
%% calls/casts/infos
84
-record(get_topic_metadata, {topic :: pulsar_client_manager:topic()}).
85
-record(lookup_topic, {partition_topic, opts = #{}}).
86
-record(lookup_topic_async, {from, partition_topic, opts = #{}}).
87

88
%%--------------------------------------------------------------------
89
%% API
90
%%--------------------------------------------------------------------
91

92
%% @doc Start a pulsar client process linked to the caller.
%% The four arguments are handed, in this order, to `init/1'.
start_link(ClientId, Server, Opts, Owner) ->
    InitArgs = [ClientId, Server, Opts, Owner],
    gen_server:start_link(?MODULE, InitArgs, []).
94

95
%% @doc Ask the client `Pid' for the partition metadata of `Topic'.
%% Blocks for up to 30 seconds waiting for the reply.
get_topic_metadata(Pid, Topic) ->
    Request = #get_topic_metadata{topic = Topic},
    gen_server:call(Pid, Request, 30_000).
97

98
%% @doc Same as `lookup_topic/3' with an empty options map.
lookup_topic(Pid, PartitionTopic) ->
    NoOpts = #{},
    lookup_topic(Pid, PartitionTopic, NoOpts).
100

101
%% @doc Synchronously look up the broker serving `PartitionTopic'.
%% `Opts' is forwarded with the lookup request. Blocks for up to 30 seconds.
lookup_topic(Pid, PartitionTopic, Opts) ->
    Request = #lookup_topic{partition_topic = PartitionTopic, opts = Opts},
    gen_server:call(Pid, Request, 30_000).
103

104
%% @doc Same as `lookup_topic_async/3' with an empty options map.
-spec lookup_topic_async(server_ref(), binary()) -> {ok, reference()}.
lookup_topic_async(Pid, PartitionTopic) ->
    NoOpts = #{},
    lookup_topic_async(Pid, PartitionTopic, NoOpts).
107

108
%% Fire a topic lookup without blocking the caller.
%% A monitor on `Pid' is created with the `{alias, reply_demonitor}' option
%% and its reference is used as the reply tag, so the returned reference
%% identifies both the eventual reply and the monitor.
lookup_topic_async(Pid, PartitionTopic, Opts) ->
    Alias = monitor(process, Pid, [{alias, reply_demonitor}]),
    Request = #lookup_topic_async{
        from = {self(), Alias},
        partition_topic = PartitionTopic,
        opts = Opts
    },
    gen_server:cast(Pid, Request),
    {ok, Alias}.
113

114
%% @doc Ask the client whether it considers itself healthy (5 second timeout).
get_status(Pid) ->
    Timeout = 5000,
    gen_server:call(Pid, get_status, Timeout).
116

117
%% @doc Ask the client for the URI of its live connection (5 second timeout).
get_alive_pulsar_url(Pid) ->
    Timeout = 5000,
    gen_server:call(Pid, get_alive_pulsar_url, Timeout).
119

120
%%--------------------------------------------------------------------
121
%% gen_server callback
122
%%--------------------------------------------------------------------
123

124
%% gen_server `init/1' callback.
%%
%% Spawns a linked helper running `try_initial_connection/3', which keeps
%% retrying the connection, and waits for its result for at most
%% `connect_timeout' milliseconds (default ?CONN_TIMEOUT).
%%
%% Returns `{ok, State}' once a socket is obtained, or `{stop, Reason}' on an
%% authentication error or when the timeout fires.
init([ClientId, Server, Opts, Parent]) ->
    process_flag(trap_exit, true),
    ConnTimeout = maps:get(connect_timeout, Opts, ?CONN_TIMEOUT),
    Me = self(),
    Pid = spawn_link(fun() -> try_initial_connection(Me, Server, Opts) end),
    TRef = erlang:send_after(ConnTimeout, self(), timeout),
    Result = wait_for_socket_and_opts(ClientId, Server, Pid, Parent, timeout),
    _ = erlang:cancel_timer(TRef),
    %% Unlink before killing the helper. Once unlink/1 has returned, no new
    %% 'EXIT' message from this link can be delivered, so the `after 0' flush
    %% below is guaranteed to catch any that is already queued. Without the
    %% unlink, the 'EXIT' caused by the kill could arrive after the flush and
    %% end up in handle_info/2 as an unknown message.
    true = unlink(Pid),
    exit(Pid, kill),
    %% The timer may have fired just before it was cancelled.
    receive
        timeout -> ok
    after 0 ->
        ok
    end,
    receive
        {'EXIT', Pid, _Error} -> ok
    after 0 ->
        ok
    end,
    Result.
144

145
%% Wait for the connector process `Pid' to report its outcome.
%%
%% - `{Pid, {ok, {Sock, Opts}}}'  -> `{ok, #state{}}'
%% - `{Pid, {error, Reason}}'     -> stop immediately on an authentication
%%                                   error, otherwise remember `Reason' and
%%                                   keep waiting for the next attempt
%% - `timeout'                    -> `{stop, LastError}'
wait_for_socket_and_opts(ClientId, Server, Pid, Parent, LastError) ->
    receive
        {Pid, {ok, {Sock, Opts}}} ->
            {ok, #state{client_id = ClientId,
                        parent = Parent,
                        sock = Sock,
                        server = Server,
                        opts = Opts}};
        {Pid, {error, Reason}} ->
            IsAuthnError = contains_authn_error(Reason),
            if
                IsAuthnError ->
                    log_error("authentication error starting pulsar client: ~p", [Reason]),
                    {stop, Reason};
                true ->
                    wait_for_socket_and_opts(ClientId, Server, Pid, Parent, Reason)
            end;
        timeout ->
            log_error("timed out when starting pulsar client; last error: ~p", [LastError]),
            {stop, LastError}
    end.
168

169
%% True only for a map whose `error' key is exactly 'AuthenticationError'.
contains_authn_error(Reason) ->
    case Reason of
        #{error := 'AuthenticationError'} -> true;
        _Other -> false
    end.
171

172
%% gen_server `handle_call/3' callback.
%%
%% Every clause except the last two first obtains a usable socket through
%% `get_alive_sock_opts/3', which reconnects when needed. If that fails, the
%% caller gets an error reply and the process stops with
%% `{shutdown, Reason}'.
%%
%% - #get_topic_metadata{} / #lookup_topic{}: send the request packet and
%%   park `{From, Topic}' under the current request id; the reply is sent
%%   later from `handle_response/2'.
%% - get_status: replies `true' while healthy; replies `false' and stops
%%   when no pong has been received for too long.
%% - get_alive_pulsar_url: replies with the URI of the live connection.
%% - anything else: `{error, unknown_call}'.
handle_call(#get_topic_metadata{topic = Topic}, From,
        State = #state{
            sock = Sock,
            opts = Opts,
            request_id = RequestId,
            requests = Reqs,
            server = Server
        }) ->
    case get_alive_sock_opts(Server, Sock, Opts) of
        {error, Reason} ->
            log_error("get_topic_metadata from pulsar servers failed: ~p", [Reason]),
            Reply = {error, unavailable},
            {stop, {shutdown, Reason}, Reply, State};
        {ok, {Sock1, Opts1}} ->
            pulsar_socket:send_topic_metadata_packet(Sock1, Topic, RequestId, Opts1),
            {noreply, next_request_id(State#state{
                requests = maps:put(RequestId, {From, Topic}, Reqs),
                sock = Sock1,
                opts = Opts1
            })}
    end;
handle_call(#lookup_topic{partition_topic = Topic, opts = ReqOpts}, From,
        State = #state{
            sock = Sock,
            opts = Opts,
            request_id = RequestId,
            requests = Reqs,
            server = Server
        }) ->
    case get_alive_sock_opts(Server, Sock, Opts) of
        {error, Reason} ->
            log_error("lookup_topic from pulsar failed: ~0p down", [Reason]),
            {stop, {shutdown, Reason}, {error, unavailable}, State};
        {ok, {Sock1, Opts1}} ->
            pulsar_socket:send_lookup_topic_packet(Sock1, Topic, RequestId, ReqOpts, Opts1),
            {noreply, next_request_id(State#state{
                requests = maps:put(RequestId, {From, Topic}, Reqs),
                sock = Sock1,
                opts = Opts1
            })}
    end;
handle_call(get_status, From, State = #state{sock = undefined, opts = Opts, server = Server}) ->
    %% No socket at the moment: try to reconnect before judging health.
    case get_alive_sock_opts(Server, undefined, Opts) of
        {error, Reason} ->
            log_error("get_status from pulsar failed: ~0p", [Reason]),
            {stop, {shutdown, Reason}, false, State};
        {ok, {Sock, Opts1}} ->
            IsHealthy = not is_pong_longtime_no_received(),
            case IsHealthy of
                true ->
                    {reply, IsHealthy, State#state{from = From, sock = Sock, opts = Opts1}};
                false ->
                    {stop, {shutdown, no_pong_received}, IsHealthy, State}
            end
    end;
handle_call(get_status, _From, State) ->
    IsHealthy = not is_pong_longtime_no_received(),
    case IsHealthy of
        true ->
            {reply, IsHealthy, State};
        false ->
            {stop, {shutdown, no_pong_received}, IsHealthy, State}
    end;
handle_call(get_alive_pulsar_url, From, State = #state{sock = Sock, opts = Opts, server = Server}) ->
    case get_alive_sock_opts(Server, Sock, Opts) of
        {error, Reason} ->
            {stop, {shutdown, Reason}, {error, Reason}, State};
        {ok, {Sock1, Opts1}} ->
            %% Use the options that belong to Sock1 (Opts1), as every other
            %% clause does, rather than the possibly stale Opts from the state.
            URI = pulsar_socket:get_pulsar_uri(Sock1, Opts1),
            {reply, URI, State#state{from = From, sock = Sock1, opts = Opts1}}
    end;
handle_call(_Req, _From, State) ->
    {reply, {error, unknown_call}, State, hibernate}.
245

246
%% gen_server `handle_cast/2' callback.
%%
%% An async lookup is turned into a '$gen_call' message sent to ourselves, so
%% it goes through the same `handle_call/3' path as the synchronous lookup
%% (that path already replies asynchronously). Other casts are ignored.
handle_cast(#lookup_topic_async{} = Async, State) ->
    #lookup_topic_async{from = From, partition_topic = PartitionTopic, opts = Opts} = Async,
    Call = #lookup_topic{partition_topic = PartitionTopic, opts = Opts},
    self() ! {'$gen_call', From, Call},
    {noreply, State};
handle_cast(_Req, State) ->
    {noreply, State, hibernate}.
253

254
%% gen_server `handle_info/2' callback.
%%
%% - 'EXIT' from the parent: stop with the same reason.
%% - data on our socket: parse the frame and dispatch to `handle_response/2'.
%% - socket error / socket closed: log, forget the socket and stop.
%% - ping: send a ping (reconnecting first when there is no socket) and
%%   schedule the pong check.
%% - check_pong: stop when no pong has arrived for too long.
%% - anything else: log a warning and carry on.
handle_info({'EXIT', Parent, Reason}, State = #state{parent = Parent}) ->
    {stop, Reason, State};
handle_info({Transport, Sock, Bin}, State = #state{sock = Sock})
        when Transport =:= tcp orelse Transport =:= ssl ->
    Frame = pulsar_protocol_frame:parse(Bin),
    {noreply, ?MODULE:handle_response(Frame, State)};
handle_info({Error, Sock, Reason}, State = #state{sock = Sock})
        when Error =:= ssl_error orelse Error =:= tcp_error ->
    log_error("transport layer error: ~p", [Reason]),
    {stop, {shutdown, Reason}, State#state{sock = undefined}};
handle_info({Closed, Sock}, State = #state{sock = Sock})
        when Closed =:= tcp_closed orelse Closed =:= ssl_closed ->
    log_error("connection closed by peer", []),
    {stop, {shutdown, connection_closed}, State#state{sock = undefined}};
handle_info(ping, State = #state{sock = undefined, opts = Opts, server = Server}) ->
    case get_alive_sock_opts(Server, undefined, Opts) of
        {ok, {Sock, Opts1}} ->
            pulsar_socket:ping(Sock, Opts1),
            start_check_pong_timeout(),
            {noreply, State#state{sock = Sock, opts = Opts1}, hibernate};
        {error, Reason} ->
            log_error("ping to pulsar servers failed: ~p", [Reason]),
            {stop, {shutdown, Reason}, State}
    end;
handle_info(ping, State = #state{sock = Sock, opts = Opts}) ->
    pulsar_socket:ping(Sock, Opts),
    start_check_pong_timeout(),
    {noreply, State, hibernate};
handle_info(check_pong, State) ->
    case is_pong_longtime_no_received() of
        false ->
            {noreply, State, hibernate};
        true ->
            {stop, {shutdown, no_pong_received}, State}
    end;
handle_info(Info, State) ->
    log_warning("received unknown message: ~p", [Info]),
    {noreply, State, hibernate}.
290

291
%% gen_server `terminate/2' callback: close the socket if there is one.
terminate(_Reason, #state{sock = Sock, opts = Opts}) ->
    case Sock of
        undefined ->
            ok;
        _ ->
            _ = pulsar_socket:close(Sock, Opts),
            ok
    end.
296

297
%% gen_server status formatting: the process state is passed through
%% `censor_secrets/1' before it is reported.
-if(?OTP_RELEASE >= 25).
format_status(Status) ->
    Censor = fun(Key, Value) ->
                     case Key of
                         state -> censor_secrets(Value);
                         _ -> Value
                     end
             end,
    maps:map(Censor, Status).
-else.
%% `format_status/2' is deprecated as of OTP 25.0
format_status(_Opt, [_PDict, State0]) ->
    [{data, [{"State", censor_secrets(State0)}]}].
-endif.
312

313
%% Replace `auth_data' under `conn_opts' in the state's options with a
%% placeholder. States without that nested key are returned untouched.
censor_secrets(#state{opts = #{conn_opts := #{auth_data := _} = ConnOpts} = Opts} = State) ->
    Censored = ConnOpts#{auth_data := "******"},
    State#state{opts = Opts#{conn_opts := Censored}};
censor_secrets(State) ->
    State.
317

318
%% Dispatch one parsed pulsar frame and return the updated state.
%%
%% Responses that carry a `request_id' are matched against the pending
%% `requests' map: the parked caller is answered and the entry removed.
%% A response whose request id is not pending leaves the state unchanged.
%% Exported only so that tests can call it directly.
handle_response({connected, _ConnectedData}, State = #state{from = From}) ->
    start_keepalive(),
    case From of
        undefined ->
            State;
        _ ->
            gen_server:reply(From, true),
            State#state{from = undefined}
    end;
handle_response({partitionMetadataResponse, #{error := Reason, message := Msg,
                                              request_id := RequestId, response := 'Failed'}},
                State = #state{requests = Reqs}) ->
    case maps:find(RequestId, Reqs) of
        {ok, {From, _Topic}} ->
            gen_server:reply(From, {error, #{error => Reason, message => Msg}}),
            State#state{requests = maps:remove(RequestId, Reqs)};
        error ->
            State
    end;
handle_response({partitionMetadataResponse, #{partitions := Partitions,
                                              request_id := RequestId}},
                State = #state{requests = Reqs}) ->
    case maps:find(RequestId, Reqs) of
        {ok, {From, Topic}} ->
            gen_server:reply(From, {ok, {Topic, Partitions}}),
            State#state{requests = maps:remove(RequestId, Reqs)};
        error ->
            State
    end;
handle_response({lookupTopicResponse, #{error := Reason, message := Msg,
                                        request_id := RequestId, response := 'Failed'}},
                State = #state{requests = Reqs}) ->
    case maps:find(RequestId, Reqs) of
        {ok, {From, _Topic}} ->
            gen_server:reply(From, {error, #{error => Reason, message => Msg}}),
            State#state{requests = maps:remove(RequestId, Reqs)};
        error ->
            State
    end;
handle_response({lookupTopicResponse, #{response := 'Redirect'} = Response}, State0) ->
    #state{requests = Pending0} = State0,
    #{request_id := RequestId} = Response,
    case maps:find(RequestId, Pending0) of
        {ok, {From, _Topic}} ->
            Pending = maps:remove(RequestId, Pending0),
            handle_redirect_lookup_response(State0#state{requests = Pending}, From, Response);
        error ->
            State0
    end;
handle_response({lookupTopicResponse, #{request_id := RequestId, response := 'Connect'} = Response},
                State = #state{requests = Reqs, opts = Opts}) ->
    case maps:find(RequestId, Reqs) of
        {ok, {From, _Topic}} ->
            ServiceURL = get_service_url_from_lookup_response(Response, Opts),
            ViaProxy = maps:get(proxy_through_service_url, Response, false),
            Reply = {ok, #{ brokerServiceUrl => ServiceURL
                          , proxy_through_service_url => ViaProxy
                          }},
            gen_server:reply(From, Reply),
            State#state{requests = maps:remove(RequestId, Reqs)};
        error ->
            State
    end;
handle_response({ping, #{}}, State = #state{sock = Sock, opts = Opts}) ->
    %% the broker pinged us: answer right away
    pulsar_socket:pong(Sock, Opts),
    State;
handle_response({pong, #{}}, State) ->
    %% our ping was answered: record it and schedule the next ping
    pong_received(),
    start_keepalive(),
    State;
handle_response(Other, State) ->
    log_error("handle unknown response: ~p", [Other]),
    State.
388

389
%% Return `{ok, {Sock, Opts}}' for a usable connection.
%% The given socket is reused when `pulsar_socket:getstat/2' succeeds on it;
%% with no socket, or when getstat fails, a fresh connection is attempted.
get_alive_sock_opts(Server, Sock, Opts) ->
    case Sock of
        undefined ->
            try_connect(Server, Opts);
        _ ->
            case pulsar_socket:getstat(Sock, Opts) of
                {ok, _Stats} ->
                    {ok, {Sock, Opts}};
                {error, _Reason} ->
                    try_connect(Server, Opts)
            end
    end.
398

399
%% Open a connection to `URI' and perform the pulsar connect handshake.
%% Returns `{ok, {Sock, Opts}}' or `{error, Reason}'. When the handshake
%% fails, the freshly opened socket is closed before returning.
try_connect(URI, Opts0) ->
    {Type, {Host, Port}} = pulsar_utils:parse_url(URI),
    Opts = pulsar_utils:maybe_enable_ssl_opts(Type, Opts0),
    case pulsar_socket:connect(Host, Port, Opts) of
        {error, _} = ConnectError ->
            ConnectError;
        {ok, Sock} ->
            pulsar_socket:send_connect_packet(Sock, Opts),
            case wait_for_conn_response(Sock, Opts) of
                {ok, _} = Connected ->
                    Connected;
                {error, _} = HandshakeError ->
                    ok = close_socket_and_flush_signals(Sock, Opts),
                    HandshakeError
            end
    end.
415

416
%% Wait up to 15 seconds for the broker's answer to the connect packet.
%% Returns `{ok, {Sock, Opts}}' on a `connected' frame, otherwise
%% `{error, Reason}' (error frame, transport error, close, or timeout).
wait_for_conn_response(Sock, Opts) ->
    receive
        {Transport, Sock, Bin} when Transport =:= tcp orelse Transport =:= ssl ->
            case pulsar_protocol_frame:parse(Bin) of
                {connected, _CommandConnected} ->
                    {ok, {Sock, Opts}};
                {error, _CommandError} = Failure ->
                    Failure
            end;
        {Error, Sock, Reason} when Error =:= ssl_error orelse Error =:= tcp_error ->
            {error, {Error, Reason}};
        {Closed, Sock} when Closed =:= tcp_closed orelse Closed =:= ssl_closed ->
            {error, Closed}
    after 15000 ->
        {error, wait_connect_response_timeout}
    end.
433

434
%% Advance the request id; after 65535 it wraps around to 1.
next_request_id(State = #state{request_id = Current}) ->
    Next =
        case Current of
            65535 -> 1;
            _ -> Current + 1
        end,
    State#state{request_id = Next}.
438

439
%% Log at `error' level through do_log/3.
log_error(Fmt, Args) ->
    Level = error,
    do_log(Level, Fmt, Args).
441

442
%% Log at `warning' level through do_log/3.
log_warning(Fmt, Args) ->
    Level = warning,
    do_log(Level, Fmt, Args).
444

445
%% Emit a log event whose format string is prefixed with "[pulsar-client] "
%% and which is tagged with the [pulsar, client_worker] logger domain.
do_log(Level, Fmt, Args) ->
    Format = "[pulsar-client] " ++ Fmt,
    Metadata = #{domain => [pulsar, client_worker]},
    logger:log(Level, Format, Args, Metadata).
447

448
%% Schedule the next `ping' message to ourselves after ?PING_INTERVAL ms.
%% The ping handler is also the one that re-attempts the connection.
start_keepalive() ->
    Self = self(),
    erlang:send_after(?PING_INTERVAL, Self, ping).
451

452
%% Schedule a `check_pong' message to ourselves after ?PONG_TIMEOUT ms.
start_check_pong_timeout() ->
    Self = self(),
    erlang:send_after(?PONG_TIMEOUT, Self, check_pong).
454

455
%% Remember, in the process dictionary, when the latest pong arrived.
%% TODO: use explicit state instead of dictionary
pong_received() ->
    Now = now_ts(),
    _ = erlang:put(?PONG_TS, Now),
    ok.
459

460
%% True when a pong has been recorded and it is older than ?PONG_TIMEOUT ms.
%% False when no pong has been recorded yet.
%% TODO: use explicit state instead of dictionary
is_pong_longtime_no_received() ->
    LastPong = erlang:get(?PONG_TS),
    LastPong =/= undefined andalso (now_ts() - LastPong) > ?PONG_TIMEOUT.
466

467
%% Timestamp in milliseconds, used only to measure the time elapsed since the
%% last pong. Monotonic time is used instead of wall-clock time so that a
%% system clock adjustment can neither fake nor hide a pong timeout. The
%% value is only meaningful as a difference between two calls on this node.
now_ts() ->
    erlang:monotonic_time(millisecond).
469

470
%% Close the socket, then drop at most one data, error or closed message
%% from that socket that is already waiting in the mailbox.
close_socket_and_flush_signals(Sock, Opts) ->
    _ = pulsar_socket:close(Sock, Opts),
    receive
        %% data that raced with the close
        {Transport, Sock, _Data} when Transport =:= tcp orelse Transport =:= ssl ->
            ok;
        {Error, Sock, _Reason} when Error =:= ssl_error orelse Error =:= tcp_error ->
            ok;
        {Closed, Sock} when Closed =:= tcp_closed orelse Closed =:= ssl_closed ->
            ok
    after 0 ->
        ok
    end.
485

486
%% Connector loop run by the helper process spawned from init/1.
%% Every failed attempt is reported to `Parent' as `{self(), {error, Reason}}'
%% and retried after 100 ms. On success the socket is handed over to `Parent'
%% and `{self(), {ok, {Sock, Opts}}}' is sent.
try_initial_connection(Parent, Server, Opts) ->
    Self = self(),
    case get_alive_sock_opts(Server, undefined, Opts) of
        {ok, {Sock, Opts1} = SockOpts} ->
            pulsar_socket:controlling_process(Sock, Parent, Opts1),
            Parent ! {Self, {ok, SockOpts}};
        {error, _Reason} = Error ->
            Parent ! {Self, Error},
            %% Pause between attempts: avoids a hot restart loop (and hitting
            %% the max restart intensity) while pulsar, or the connection to
            %% it, is unavailable.
            timer:sleep(100),
            ?MODULE:try_initial_connection(Parent, Server, Opts)
    end.
499

500
%% Pick the broker URL out of a lookup response.
%% With `enable_ssl := true' the TLS URL is preferred; if the broker did not
%% supply one, the plain URL is used and the fallback is logged. Without SSL
%% the plain URL is returned.
get_service_url_from_lookup_response(Response, Opts) ->
    case {Opts, Response} of
        {#{enable_ssl := true}, #{brokerServiceUrlTls := TlsUrl}} ->
            TlsUrl;
        {#{enable_ssl := true}, #{brokerServiceUrl := PlainUrl}} ->
            log_error("SSL enabled but brokerServiceUrlTls is not provided by pulsar,"
                      " falling back to brokerServiceUrl: ~p", [PlainUrl]),
            PlainUrl;
        {_NoSsl, #{brokerServiceUrl := PlainUrl}} ->
            %% 'brokerServiceUrl' is mandatory when SSL is disabled
            PlainUrl
    end.
512

513
%% A lookup answered with `Redirect' has to be re-issued against the broker
%% named in the response. The caller is told so with
%% `{redirect, ServiceURL, ReqOpts}'; the state is returned unchanged.
%% https://pulsar.apache.org/docs/2.11.x/developing-binary-protocol/#lookuptopicresponse
handle_redirect_lookup_response(State, From, Response) ->
    #state{opts = Opts} = State,
    ServiceURL = get_service_url_from_lookup_response(Response, Opts),
    Authoritative = maps:get(authoritative, Response, false),
    Reply = {redirect, ServiceURL, #{authoritative => Authoritative}},
    gen_server:reply(From, Reply),
    State.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc