• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 798

27 Feb 2025 11:27AM UTC coverage: 72.028%. First build
798

Pull #72

github

zmstone
perf: upgrade crc32cer to 0.1.12
Pull Request #72: 250226 change framing from packet-raw to packet-4

11 of 12 new or added lines in 4 files covered. (91.67%)

927 of 1287 relevant lines covered (72.03%)

326.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.12
/src/pulsar_producer.erl
1
%% Copyright (c) 2013-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_producer).
16

17
-behaviour(gen_statem).
18

19
-include_lib("kernel/include/inet.hrl").
20
-include("include/pulsar_producer_internal.hrl").
21
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
22

23
-export([ send/2
24
        , send/3
25
        , send_sync/2
26
        , send_sync/3
27
        , get_state/1
28
        ]).
29

30
-export([ start_link/4
31
        , idle/3
32
        , connecting/3
33
        , connected/3
34
        ]).
35

36
%% gen_statem API
37
-export([ callback_mode/0
38
        , init/1
39
        , terminate/3
40
        , format_status/1
41
        , format_status/2
42
        ]).
43

44
%% replayq API
45
-export([ queue_item_sizer/1
46
        , queue_item_marshaller/1
47
        ]).
48

49
-export([handle_response/2]).
50

51
%% for testing only
52
-ifdef(TEST).
53
-export([make_queue_item/2]).
54
-endif.
55

56
-type statem() :: idle | connecting | connected.
57
-type sequence_id() :: integer().
58
-type send_receipt() :: #{ sequence_id := sequence_id()
59
                         , producer_id := integer()
60
                         , highest_sequence_id := sequence_id()
61
                         , message_id := map()
62
                         , any() => term()
63
                         }.
64
-type timestamp() :: integer().
65
-type callback() :: undefined | mfa() | fun((map()) -> ok) | per_request_callback().
66
-type callback_input() :: {ok, send_receipt()} | {error, expired}.
67
-type config() :: #{ replayq_dir := string()
68
                   , replayq_max_total_bytes => pos_integer()
69
                   , replayq_seg_bytes => pos_integer()
70
                   , replayq_offload_mode => boolean()
71
                   , max_batch_bytes => pos_integer()
72
                   , producer_name => atom()
73
                   , clientid => atom()
74
                   , callback => callback()
75
                   , batch_size => non_neg_integer()
76
                   , retention_period => timeout()
77
                   }.
78
-export_type([ config/0
79
             ]).
80

81
-define(RECONNECT_TIMEOUT, 5_000).
82
-define(LOOKUP_TOPIC_TIMEOUT, 15_000).
83
-define(GET_ALIVE_PULSAR_URL, 5_000).
84

85
-define(MAX_REQ_ID, 4294836225).
86
-define(MAX_SEQ_ID, 18445618199572250625).
87

88
-define(DEFAULT_REPLAYQ_SEG_BYTES, 10 * 1024 * 1024).
89
-define(DEFAULT_REPLAYQ_LIMIT, 2_000_000_000).
90
-define(DEFAULT_MAX_BATCH_BYTES, 1_000_000).
91
-define(Q_ITEM(From, Ts, Messages), {From, Ts, Messages}).
92
-define(INFLIGHT_REQ(QAckRef, FromsToMessages, BatchSize), {inflight_req, QAckRef, FromsToMessages, BatchSize}).
93
-define(NEXT_STATE_IDLE_RECONNECT(State), {next_state, idle, State#{sock := undefined},
94
                                           [{state_timeout, ?RECONNECT_TIMEOUT, do_connect}]}).
95
-define(buffer_overflow_discarded, buffer_overflow_discarded).
96
-define(MIN_DISCARD_LOG_INTERVAL, timer:seconds(5)).
97
-define(PER_REQ_CALLBACK(Fn, Args), {callback, {Fn, Args}}).
98
-define(SOCK_ERR(SOCK, REASON), {socket_error, SOCK, REASON}).
99

100
%% poorman's error handling.
101
%% this is an extra safety to handle a previously missed tcp/ssl_error or tcp/ssl_closed event
102
-define(POORMAN(SOCK, EXPR),
103
        case (EXPR) of
104
            ok ->
105
                ok;
106
            {error, Reason} ->
107
                _ = self() ! ?SOCK_ERR(SOCK, Reason),
108
                ok
109
        end).
110

111
-type state_observer_callback() :: {function(), [term()]}.
112
-type state() :: #{
113
                   batch_size := non_neg_integer(),
114
                   broker_server := {binary(), pos_integer()},
115
                   callback := undefined | mfa() | fun((map()) -> ok),
116
                   clientid := atom(),
117
                   inflight_calls := non_neg_integer(),
118
                   lookup_topic_request_ref := reference() | undefined,
119
                   opts := map(),
120
                   parent_pid := undefined | pid(),
121
                   partitiontopic := string(),
122
                   producer_id := integer(),
123
                   producer_name := atom(),
124
                   proxy_to_broker_url := undefined | string(),
125
                   replayq := replayq:q(),
126
                   replayq_offload_mode := boolean(),
127
                   request_id := integer(),
128
                   requests := #{sequence_id() =>
129
                                     ?INFLIGHT_REQ(
130
                                        replayq:ack_ref(),
131
                                        [{gen_statem:from() | undefined,
132
                                          {timestamp(), [pulsar:message()]}}],
133
                                        _BatchSize :: non_neg_integer()
134
                                       )},
135
                   sequence_id := sequence_id(),
136
                   state_observer_callback := undefined | state_observer_callback(),
137
                   sock := undefined | port(),
138
                   telemetry_metadata := map()
139
                  }.
140
-type handler_result() :: gen_statem:event_handler_result(statem(), state()).
141
-type per_request_callback() :: {function(), [term()]}.
142
-type per_request_callback_int() :: ?PER_REQ_CALLBACK(function(), [term()]).
143
-type send_opts() :: #{callback_fn => per_request_callback()}.
144
-export_type([send_opts/0]).
145

146
callback_mode() -> [state_functions, state_enter].
22✔
147

148
start_link(PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts) ->
149
    gen_statem:start_link(?MODULE, {PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts}, []).
22✔
150

151
-spec send(gen_statem:server_ref(), [pulsar:message()]) -> {ok, pid()}.
152
send(Pid, Messages) ->
153
    send(Pid, Messages, _Opts = #{}).
×
154

155
-spec send(gen_statem:server_ref(), [pulsar:message()], send_opts()) -> {ok, pid()}.
156
send(Pid, Messages, Opts) ->
157
    From = case maps:get(callback_fn, Opts, undefined) of
1,816✔
158
               undefined -> undefined;
1,813✔
159
               {Fn, Args} when is_function(Fn) -> {callback, {Fn, Args}}
3✔
160
           end,
161
    erlang:send(Pid, ?SEND_REQ(From, Messages)),
1,816✔
162
    {ok, Pid}.
1,816✔
163

164
-spec send_sync(gen_statem:server_ref(), [pulsar:message()]) ->
165
          {ok, send_receipt()}
166
        | {error, producer_connecting
167
                | producer_disconnected
168
                | term()}.
169
send_sync(Pid, Messages) ->
170
    send_sync(Pid, Messages, 5_000).
×
171

172
-spec send_sync(gen_statem:server_ref(), [pulsar:message()], timeout()) ->
173
          {ok, send_receipt()}
174
        | {error, producer_connecting
175
                | producer_disconnected
176
                | term()}.
177
send_sync(Pid, Messages, Timeout) ->
178
    Caller = self(),
6✔
179
    MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
6✔
180
    %% Mimicking gen_statem's From, so the reply can be sent with
181
    %% `gen_statem:reply/2'
182
    From = {Caller, MRef},
6✔
183
    erlang:send(Pid, ?SEND_REQ(From, Messages)),
6✔
184
    receive
6✔
185
        {MRef, Response} ->
186
            erlang:demonitor(MRef, [flush]),
6✔
187
            Response;
6✔
188
        {'DOWN', MRef, process, Pid, Reason} ->
189
            error({producer_down, Reason})
×
190
    after
191
        Timeout ->
192
            erlang:demonitor(MRef, [flush]),
×
193
            receive
×
194
                {MRef, Response} ->
195
                    Response
×
196
            after
197
                0 ->
198
                    error(timeout)
×
199
            end
200
    end.
201

202
-spec get_state(pid()) -> statem().
203
get_state(Pid) ->
204
    gen_statem:call(Pid, get_state, 5_000).
2✔
205

206
%%--------------------------------------------------------------------
207
%% gen_statem callback
208
%%--------------------------------------------------------------------
209

210
-spec init({string(), string(), string() | undefined, config()}) ->
211
          gen_statem:init_result(statem(), state()).
212
init({PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts0}) ->
213
    process_flag(trap_exit, true),
22✔
214
    {Transport, BrokerServer} = pulsar_utils:parse_url(Server),
22✔
215
    ProducerID = maps:get(producer_id, ProducerOpts0),
22✔
216
    Offload = maps:get(replayq_offload_mode, ProducerOpts0, false),
22✔
217
    ReplayqCfg0 =
22✔
218
        case maps:get(replayq_dir, ProducerOpts0, false) of
219
            false ->
220
                #{mem_only => true};
15✔
221
            BaseDir ->
222
                PartitionTopicPath = escape(PartitionTopic),
7✔
223
                Dir = filename:join([BaseDir, PartitionTopicPath]),
7✔
224
                SegBytes = maps:get(replayq_seg_bytes, ProducerOpts0, ?DEFAULT_REPLAYQ_SEG_BYTES),
7✔
225
                #{dir => Dir, seg_bytes => SegBytes, offload => Offload}
7✔
226
        end,
227
    MaxTotalBytes = maps:get(replayq_max_total_bytes, ProducerOpts0, ?DEFAULT_REPLAYQ_LIMIT),
22✔
228
    MaxBatchBytes = maps:get(max_batch_bytes, ProducerOpts0, ?DEFAULT_MAX_BATCH_BYTES),
22✔
229
    ReplayqCfg =
22✔
230
        ReplayqCfg0#{ sizer => fun ?MODULE:queue_item_sizer/1
231
                    , marshaller => fun ?MODULE:queue_item_marshaller/1
232
                    , max_total_bytes => MaxTotalBytes
233
                    },
234
    Q = replayq:open(ReplayqCfg),
22✔
235
    ProducerOpts1 = ProducerOpts0#{max_batch_bytes => MaxBatchBytes},
22✔
236
    %% drop replayq options, now that it's open.
237
    ProducerOpts = maps:without([ replayq_dir
22✔
238
                                , replayq_seg_bytes
239
                                , replayq_offload_mode
240
                                , replayq_max_total_bytes
241
                                ],
242
                                ProducerOpts1),
243
    StateObserverCallback = maps:get(state_observer_callback, ProducerOpts0, undefined),
22✔
244
    ParentPid = maps:get(parent_pid, ProducerOpts, undefined),
22✔
245
    TelemetryMetadata0 = maps:get(telemetry_metadata, ProducerOpts0, #{}),
22✔
246
    TelemetryMetadata = maps:put(partition_topic, PartitionTopic, TelemetryMetadata0),
22✔
247
    State = #{
22✔
248
        batch_size => maps:get(batch_size, ProducerOpts, 0),
249
        broker_server => BrokerServer,
250
        callback => maps:get(callback, ProducerOpts, undefined),
251
        clientid => maps:get(clientid, ProducerOpts),
252
        inflight_calls => 0,
253
        lookup_topic_request_ref => undefined,
254
        opts => pulsar_utils:maybe_enable_ssl_opts(Transport, ProducerOpts),
255
        parent_pid => ParentPid,
256
        partitiontopic => PartitionTopic,
257
        producer_id => ProducerID,
258
        producer_name => maps:get(producer_name, ProducerOpts, pulsar_producer),
259
        proxy_to_broker_url => ProxyToBrokerUrl,
260
        replayq => Q,
261
        replayq_offload_mode => Offload,
262
        request_id => 1,
263
        requests => #{},
264
        sequence_id => 1,
265
        state_observer_callback => StateObserverCallback,
266
        sock => undefined,
267
        telemetry_metadata => TelemetryMetadata
268
    },
269
    pulsar_metrics:inflight_set(State, 0),
22✔
270
    pulsar_metrics:queuing_set(State, replayq:count(Q)),
22✔
271
    pulsar_metrics:queuing_bytes_set(State, replayq:bytes(Q)),
22✔
272
    {ok, idle, State, [{next_event, internal, do_connect}]}.
22✔
273

274
%% idle state
275
-spec idle(gen_statem:event_type(), _EventContent, state()) ->
276
          handler_result().
277
idle(enter, _OldState, _State = #{state_observer_callback := StateObserverCallback}) ->
278
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
30✔
279
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
30✔
280
    keep_state_and_data;
30✔
281
idle(internal, do_connect, State) ->
282
    refresh_urls_and_connect(State);
22✔
283
idle(state_timeout, do_connect, State) ->
284
    refresh_urls_and_connect(State);
11✔
285
idle(state_timeout, lookup_topic_timeout, State0) ->
286
    log_error("timed out waiting for lookup topic response", [], State0),
4✔
287
    State = State0#{lookup_topic_request_ref := undefined},
4✔
288
    ?NEXT_STATE_IDLE_RECONNECT(State);
4✔
289
idle({call, From}, get_state, _State) ->
290
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
×
291
idle({call, From}, _EventContent, _State) ->
292
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
×
293
idle(cast, _EventContent, _State) ->
294
    keep_state_and_data;
×
295
idle(info, ?SEND_REQ(_, _) = SendRequest, State0) ->
296
    State = enqueue_send_requests([SendRequest], State0),
653✔
297
    {keep_state, State};
653✔
298
idle(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
299
    State = State0#{lookup_topic_request_ref := undefined},
27✔
300
    erlang:demonitor(Ref, [flush]),
27✔
301
    handle_lookup_topic_reply(Reply, State);
27✔
302
idle(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
303
    {stop, Reason};
×
304
idle(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
305
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
×
306
    State = State0#{lookup_topic_request_ref := undefined},
×
307
    try_close_socket(State),
×
308
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
309
idle(_EventType, _Event, _State) ->
310
    keep_state_and_data.
8✔
311

312
%% connecting state
313
-spec connecting(gen_statem:event_type(), _EventContent, state()) ->
314
          handler_result().
315
connecting(enter, _OldState, _State = #{state_observer_callback := StateObserverCallback}) ->
316
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
27✔
317
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
27✔
318
    keep_state_and_data;
27✔
319
connecting(state_timeout, do_connect, State) ->
320
    refresh_urls_and_connect(State);
×
321
connecting(state_timeout, lookup_topic_timeout, State0) ->
322
    log_error("timed out waiting for lookup topic response", [], State0),
×
323
    State = State0#{lookup_topic_request_ref := undefined},
×
324
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
325
connecting(info, ?SEND_REQ(_, _) = SendRequest, State0) ->
326
    State = enqueue_send_requests([SendRequest], State0),
×
327
    {keep_state, State};
×
328
connecting(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
329
    State = State0#{lookup_topic_request_ref := undefined},
×
330
    erlang:demonitor(Ref, [flush]),
×
331
    handle_lookup_topic_reply(Reply, State);
×
332
connecting(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
333
    {stop, Reason};
×
334
connecting(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
335
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
×
336
    State = State0#{lookup_topic_request_ref := undefined},
×
337
    try_close_socket(State),
×
338
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
339
connecting(info, {'EXIT', Sock, Reason}, State) when is_port(Sock) ->
340
    handle_socket_close(connecting, Sock, Reason, State);
×
341
connecting(info, {C, Sock}, State) when C =:= tcp_closed; C =:= ssl_closed ->
342
    handle_socket_close(connecting, Sock, closed, State);
×
343
connecting(info, {E, Sock, Reason}, State) when E =:= tcp_error; E =:= ssl_error ->
344
    handle_socket_close(connecting, Sock, Reason, State);
×
345
connecting(info, ?SOCK_ERR(Sock, Reason), State) ->
346
    handle_socket_close(connecting, Sock, Reason, State);
×
347
connecting(_EventType, {Inet, _, Bin}, State) when Inet == tcp; Inet == ssl ->
348
    Cmd = pulsar_protocol_frame:parse(Bin),
54✔
349
    ?MODULE:handle_response(Cmd, State);
54✔
350
connecting(info, Msg, State) ->
351
    log_info("[connecting] unknown message received ~p~n  ~p", [Msg, State], State),
×
352
    keep_state_and_data;
×
353
connecting({call, From}, get_state, _State) ->
354
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
×
355
connecting({call, From}, _EventContent, _State) ->
356
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
×
357
connecting(cast, _EventContent, _State) ->
358
   keep_state_and_data.
×
359

360
%% connected state
361
-spec connected(gen_statem:event_type(), _EventContent, state()) ->
362
          handler_result().
363
connected(enter, _OldState, State0 = #{state_observer_callback := StateObserverCallback}) ->
364
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
27✔
365
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
27✔
366
    State1 = resend_sent_requests(State0),
27✔
367
    State = maybe_send_to_pulsar(State1),
27✔
368
    {keep_state, State};
27✔
369
connected(state_timeout, do_connect, _State) ->
370
    keep_state_and_data;
×
371
connected(state_timeout, lookup_topic_timeout, State0) ->
372
    log_error("timed out waiting for lookup topic response", [], State0),
×
373
    %% todo: should demonitor reference
374
    State = State0#{lookup_topic_request_ref := undefined},
×
375
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
376
connected(info, ?SEND_REQ(_, _) = SendRequest, State0 = #{batch_size := BatchSize}) ->
377
    ?tp(pulsar_producer_send_req_enter, #{}),
1,169✔
378
    SendRequests = collect_send_requests([SendRequest], BatchSize),
1,169✔
379
    State1 = enqueue_send_requests(SendRequests, State0),
1,169✔
380
    State = maybe_send_to_pulsar(State1),
1,169✔
381
    ?tp(pulsar_producer_send_req_exit, #{}),
1,169✔
382
    {keep_state, State};
1,169✔
383
connected(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
384
    State = State0#{lookup_topic_request_ref := undefined},
×
385
    erlang:demonitor(Ref, [flush]),
×
386
    handle_lookup_topic_reply(Reply, State);
×
387
connected(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
388
    {stop, Reason};
×
389
connected(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
390
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
×
391
    State = State0#{lookup_topic_request_ref := undefined},
×
392
    try_close_socket(State),
×
393
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
394
connected(info, {'EXIT', Sock, Reason}, State) when is_port(Sock) ->
395
    handle_socket_close(connected, Sock, Reason, State);
1✔
396
connected(_EventType, {C, Sock}, State) when C =:= tcp_closed; C =:= ssl_closed ->
397
    handle_socket_close(connected, Sock, closed, State);
8✔
398
connected(_EventType, {E, Sock, Reason}, State) when E =:= tcp_error; E =:= ssl_error ->
399
    handle_socket_close(connected, Sock, Reason, State);
×
400
connected(_EventType, ?SOCK_ERR(Sock, Reason), State) ->
401
    handle_socket_close(connected, Sock, Reason, State);
×
402
connected(_EventType, {Inet, _, Bin}, State) when Inet == tcp; Inet == ssl ->
403
    Cmd = pulsar_protocol_frame:parse(Bin),
1,035✔
404
    ?MODULE:handle_response(Cmd, State);
1,035✔
405
connected(_EventType, ping, State = #{sock := Sock, opts := Opts}) ->
406
    ?POORMAN(Sock, pulsar_socket:ping(Sock, Opts)),
6✔
407
    {keep_state, State};
6✔
408
connected({call, From}, get_state, _State) ->
409
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
2✔
410
connected({call, From}, _EventContent, _State) ->
411
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
×
412
connected(cast, _EventContent, _State) ->
413
    keep_state_and_data;
×
414
connected(_EventType, EventContent, State) ->
NEW
415
    ?MODULE:handle_response(EventContent, State).
×
416

417
handle_socket_close(StateName, Sock, Reason, #{sock := Sock} = State) ->
418
    ?tp("pulsar_socket_close", #{sock => Sock, reason => Reason}),
8✔
419
    log_error("connection_closed at_state: ~p, reason: ~p", [StateName, Reason], State),
8✔
420
    try_close_socket(State),
8✔
421
    ?NEXT_STATE_IDLE_RECONNECT(State);
8✔
422
handle_socket_close(_StateName, _Sock, _Reason, _State) ->
423
    %% stale close event
424
    keep_state_and_data.
1✔
425

426
-spec refresh_urls_and_connect(state()) -> handler_result().
427
refresh_urls_and_connect(State0) ->
428
    %% if Pulsar went down and then restarted later, we must issue a
429
    %% LookupTopic command again after reconnecting.
430
    %% https://pulsar.apache.org/docs/2.10.x/developing-binary-protocol/#topic-lookup
431
    %% > Topic lookup needs to be performed each time a client needs
432
    %% > to create or reconnect a producer or a consumer. Lookup is used
433
    %% > to discover which particular broker is serving the topic we are
434
    %% > about to use.
435
    %% Simply looking up the topic (even from a distinct connection)
436
    %% will "unblock" the topic so we may send messages to it.  The
437
    %% producer may be started only after that.
438
    #{ clientid := ClientId
33✔
439
     , partitiontopic := PartitionTopic
440
     } = State0,
441
    ?tp(debug, pulsar_producer_refresh_start, #{}),
33✔
442
    try pulsar_client_manager:lookup_topic_async(ClientId, PartitionTopic) of
33✔
443
        {ok, LookupTopicRequestRef} ->
444
            State = State0#{lookup_topic_request_ref := LookupTopicRequestRef},
33✔
445
            {keep_state, State, [{state_timeout, ?LOOKUP_TOPIC_TIMEOUT, lookup_topic_timeout}]}
33✔
446
    catch
447
        exit:{noproc, _} ->
448
            log_error("client restarting; will retry to lookup topic again later", [], State0),
×
449
            ?NEXT_STATE_IDLE_RECONNECT(State0)
×
450
    end.
451

452
-spec do_connect(state()) -> handler_result().
453
do_connect(State) ->
454
    #{ broker_server := {Host, Port}
27✔
455
     , opts := Opts
456
     , proxy_to_broker_url := ProxyToBrokerUrl
457
     } = State,
458
    try pulsar_socket:connect(Host, Port, Opts) of
27✔
459
        {ok, Sock} ->
460
            Opts1 = pulsar_utils:maybe_add_proxy_to_broker_url_opts(Opts, ProxyToBrokerUrl),
27✔
461
            ?POORMAN(Sock, pulsar_socket:send_connect_packet(Sock, Opts1)),
27✔
462
            {next_state, connecting, State#{sock := Sock}};
27✔
463
        {error, Reason} ->
464
            log_error("error connecting: ~p", [Reason], State),
×
465
            try_close_socket(State),
×
466
            ?NEXT_STATE_IDLE_RECONNECT(State)
×
467
    catch
468
        Kind:Error:Stacktrace ->
469
            log_error("exception connecting: ~p -> ~p~n  ~p", [Kind, Error, Stacktrace], State),
×
470
            try_close_socket(State),
×
471
            ?NEXT_STATE_IDLE_RECONNECT(State)
×
472
    end.
473

474
format_status(Status) ->
475
    maps:map(
2✔
476
      fun(data, Data0) ->
477
              censor_secrets(Data0);
2✔
478
         (_Key, Value)->
479
              Value
12✔
480
      end,
481
      Status).
482

483
%% `format_status/2' is deprecated as of OTP 25.0
484
format_status(_Opt, [_PDict, _State0, Data0]) ->
485
    Data = censor_secrets(Data0),
×
486
    [{data, [{"State", Data}]}].
×
487

488
censor_secrets(Data0 = #{opts := Opts0 = #{conn_opts := ConnOpts0 = #{auth_data := _}}}) ->
489
    Data0#{opts := Opts0#{conn_opts := ConnOpts0#{auth_data := "******"}}};
×
490
censor_secrets(Data) ->
491
    Data.
2✔
492

493
terminate(_Reason, _StateName, State = #{replayq := Q}) ->
494
    ok = replayq:close(Q),
17✔
495
    ok = clear_gauges(State, Q),
17✔
496
    ok.
17✔
497

498
clear_gauges(State, Q) ->
499
    pulsar_metrics:inflight_set(State, 0),
17✔
500
    maybe_reset_queuing(State, Q),
17✔
501
    ok.
17✔
502

503
maybe_reset_queuing(State, Q) ->
504
    case {replayq:count(Q), is_replayq_durable(State, Q)} of
17✔
505
        {0, _} ->
506
            pulsar_metrics:queuing_set(State, 0),
17✔
507
            pulsar_metrics:queuing_bytes_set(State, 0);
17✔
508
        {_, false} ->
509
            pulsar_metrics:queuing_set(State, 0),
×
510
            pulsar_metrics:queuing_bytes_set(State, 0);
×
511
        {_, _} ->
512
            ok
×
513
    end.
514

515
is_replayq_durable(#{replayq_offload_mode := true}, _Q) ->
516
    false;
×
517
is_replayq_durable(_, Q) ->
518
    not replayq:is_mem_only(Q).
17✔
519

520
-spec handle_response(_EventContent, state()) ->
521
          handler_result().
522
handle_response({connected, _ConnectedData}, State0 = #{
523
        sock := Sock,
524
        opts := Opts,
525
        producer_id := ProducerId,
526
        request_id := RequestId,
527
        partitiontopic := PartitionTopic
528
    }) ->
529
    start_keepalive(),
27✔
530
    ?POORMAN(Sock, pulsar_socket:send_create_producer_packet(Sock, PartitionTopic, RequestId, ProducerId, Opts)),
27✔
531
    {keep_state, next_request_id(State0)};
27✔
532
handle_response({producer_success, #{producer_name := ProName}}, State) ->
533
    {next_state, connected, State#{producer_name := ProName}};
27✔
534
handle_response({pong, #{}}, _State) ->
535
    start_keepalive(),
3✔
536
    keep_state_and_data;
3✔
537
handle_response({ping, #{}}, #{sock := Sock, opts := Opts}) ->
538
    ?POORMAN(Sock, pulsar_socket:pong(Sock, Opts)),
5✔
539
    keep_state_and_data;
5✔
540
handle_response({close_producer, #{}}, State = #{ partitiontopic := Topic
541
                                                }) ->
542
    log_error("Close producer: ~p~n", [Topic], State),
×
543
    try_close_socket(State),
×
544
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
545
handle_response({send_receipt, Resp = #{sequence_id := SequenceId}}, State) ->
546
    #{ callback := Callback
1,026✔
547
     , inflight_calls := InflightCalls0
548
     , requests := Reqs
549
     , replayq := Q
550
     } = State,
551
    ?tp(pulsar_producer_recv_send_receipt, #{receipt => Resp}),
1,026✔
552
    case maps:get(SequenceId, Reqs, undefined) of
1,026✔
553
        undefined ->
554
            _ = invoke_callback(Callback, {ok, Resp}),
×
555
            {keep_state, State};
×
556
        ?INFLIGHT_REQ(QAckRef, FromsToMessages, BatchSize) ->
557
            ok = replayq:ack(Q, QAckRef),
1,026✔
558
            lists:foreach(
1,026✔
559
              fun({undefined, {_TS, Messages}}) ->
560
                   BatchLen = length(Messages),
1,805✔
561
                   _ = invoke_callback(Callback, {ok, Resp}, BatchLen),
1,805✔
562
                   ok;
1,805✔
563
                 ({?PER_REQ_CALLBACK(Fn, Args), {_TS, _Messages}}) ->
564
                   %% No need to count the messages, as we invoke
565
                   %% per-request callbacks once for the whole batch.
566
                   _ = invoke_callback({Fn, Args}, {ok, Resp}),
1✔
567
                   ok;
1✔
568
                 ({From, {_TS, _Messages}}) ->
569
                   gen_statem:reply(From, {ok, Resp})
6✔
570
              end,
571
              FromsToMessages),
572
            InflightCalls = InflightCalls0 - BatchSize,
1,026✔
573
            pulsar_metrics:inflight_set(State, InflightCalls),
1,026✔
574
            {keep_state, State#{ requests := maps:remove(SequenceId, Reqs)
1,026✔
575
                               , inflight_calls := InflightCalls
576
                               }}
577
    end;
578
handle_response({error, #{error := Error, message := Msg}}, State) ->
579
    log_error("Response error:~p, msg:~p~n", [Error, Msg], State),
×
580
    try_close_socket(State),
×
581
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
582
handle_response(Msg, State) ->
583
    log_error("Receive unknown message:~p~n", [Msg], State),
×
584
    keep_state_and_data.
×
585

586
-spec send_batch_payload([{timestamp(), [pulsar:message()]}], sequence_id(), state()) -> ok.
587
send_batch_payload(Messages, SequenceId, #{
588
            partitiontopic := Topic,
589
            producer_id := ProducerId,
590
            producer_name := ProducerName,
591
            sock := Sock,
592
            opts := Opts
593
        }) ->
594
    ?POORMAN(Sock, pulsar_socket:send_batch_message_packet(Sock, Topic, Messages, SequenceId, ProducerId, ProducerName, Opts)).
1,581✔
595

596
start_keepalive() ->
597
    erlang:send_after(30_000, self(), ping).
30✔
598

599
next_request_id(State = #{request_id := ?MAX_REQ_ID}) ->
600
    State#{request_id := 1};
×
601
next_request_id(State = #{request_id := RequestId}) ->
602
    State#{request_id := RequestId + 1}.
27✔
603

604
next_sequence_id(State = #{sequence_id := ?MAX_SEQ_ID}) ->
605
    State#{sequence_id := 1};
×
606
next_sequence_id(State = #{sequence_id := SequenceId}) ->
607
    State#{sequence_id := SequenceId + 1}.
1,180✔
608

609
-spec log_debug(string(), [term()], state()) -> ok.
610
log_debug(Fmt, Args, State) ->
611
    do_log(debug, Fmt, Args, State).
54✔
612

613
-spec log_info(string(), [term()], state()) -> ok.
614
log_info(Fmt, Args, State) ->
615
    do_log(info, Fmt, Args, State).
×
616

617
-spec log_warn(string(), [term()], state()) -> ok.
618
log_warn(Fmt, Args, State) ->
619
    do_log(warning, Fmt, Args, State).
2✔
620

621
-spec log_error(string(), [term()], state()) -> ok.
622
log_error(Fmt, Args, State) ->
623
    do_log(error, Fmt, Args, State).
12✔
624

625
-spec do_log(atom(), string(), [term()], state()) -> ok.
626
do_log(Level, Format, Args, State) ->
627
    #{partitiontopic := PartitionTopic} = State,
68✔
628
    logger:log(Level, "[pulsar-producer][~s] " ++ Format,
68✔
629
               [PartitionTopic | Args], #{domain => [pulsar, producer]}).
630

631
-spec invoke_callback(callback(), callback_input()) -> ok.
632
invoke_callback(Callback, Resp) ->
633
    invoke_callback(Callback, Resp, _BatchLen = 1).
3✔
634

635
-spec invoke_callback(callback(), callback_input(), non_neg_integer()) -> ok.
636
invoke_callback(_Callback = undefined, _Resp, _BatchLen) ->
637
    ok;
×
638
invoke_callback({M, F, A}, Resp, BatchLen) ->
639
    lists:foreach(
1,812✔
640
      fun(_) ->
641
        erlang:apply(M, F, [Resp] ++ A)
2,259✔
642
      end,  lists:seq(1, BatchLen));
643
invoke_callback(Callback, Resp, BatchLen) when is_function(Callback, 1) ->
644
    lists:foreach(
×
645
      fun(_) ->
646
        Callback(Resp)
×
647
      end,  lists:seq(1, BatchLen));
648
invoke_callback({Fn, Args}, Resp, _BatchLen) when is_function(Fn), is_list(Args) ->
649
    %% for per-request callbacks, we invoke it only once, regardless
650
    %% of how many messages were sent.
651
    apply(Fn, Args ++ [Resp]).
3✔
652

653
queue_item_sizer(?Q_ITEM(_CallId, _Ts, _Batch) = Item) ->
654
    erlang:external_size(Item).
2,125✔
655

656
queue_item_marshaller(?Q_ITEM(_, _, _) = I) ->
657
  term_to_binary(I);
508✔
658
queue_item_marshaller(Bin) when is_binary(Bin) ->
659
  case binary_to_term(Bin) of
304✔
660
      Item = ?Q_ITEM({Pid, _Tag}, Ts, Msgs) when is_pid(Pid) ->
661
          case node(Pid) =:= node() andalso erlang:is_process_alive(Pid) of
2✔
662
              true ->
663
                  Item;
1✔
664
              false ->
665
                  ?Q_ITEM(undefined, Ts, Msgs)
1✔
666
          end;
667
      Item ->
668
          Item
302✔
669
  end.
670

671
now_ts() ->
672
    erlang:system_time(millisecond).
3,041✔
673

674
make_queue_item(From, Messages) ->
675
    ?Q_ITEM(From, now_ts(), Messages).
1,824✔
676

677
enqueue_send_requests(Requests, State = #{replayq := Q}) ->
678
    QItems = lists:map(
1,822✔
679
               fun(?SEND_REQ(From, Messages)) ->
680
                 make_queue_item(From, Messages)
1,822✔
681
               end,
682
               Requests),
683
    NewQ = replayq:append(Q, QItems),
1,822✔
684
    pulsar_metrics:queuing_set(State, replayq:count(NewQ)),
1,822✔
685
    pulsar_metrics:queuing_bytes_set(State, replayq:bytes(NewQ)),
1,822✔
686
    ?tp(pulsar_producer_send_requests_enqueued, #{requests => Requests}),
1,822✔
687
    Overflow = replayq:overflow(NewQ),
1,822✔
688
    handle_overflow(State#{replayq := NewQ}, Overflow).
1,822✔
689

690
-spec handle_overflow(state(), integer()) -> state().
691
handle_overflow(State, Overflow) when Overflow =< 0 ->
692
    %% no overflow
693
    ok = maybe_log_discard(State, _NumRequestsIncrement = 0),
1,818✔
694
    State;
1,818✔
695
handle_overflow(State0 = #{replayq := Q, callback := Callback}, Overflow) ->
696
    {NewQ, QAckRef, Items0} =
4✔
697
        replayq:pop(Q, #{bytes_limit => Overflow, count_limit => 999999999}),
698
    ok = replayq:ack(NewQ, QAckRef),
4✔
699
    maybe_log_discard(State0, length(Items0)),
4✔
700
    Items = [{From, Msgs} || ?Q_ITEM(From, _Now, Msgs) <- Items0],
4✔
701
    reply_with_error(Items, Callback, {error, overflow}),
4✔
702
    NumMsgs = length([1 || {_, Msgs} <- Items, _ <- Msgs]),
4✔
703
    pulsar_metrics:dropped_queue_full_inc(State0, NumMsgs),
4✔
704
    pulsar_metrics:queuing_set(State0, replayq:count(NewQ)),
4✔
705
    pulsar_metrics:queuing_bytes_set(State0, replayq:bytes(NewQ)),
4✔
706
    State0#{replayq := NewQ}.
4✔
707

708
maybe_log_discard(State, Increment) ->
709
    Last = get_overflow_log_state(),
1,825✔
710
    #{ count_since_last_log := CountLast
1,825✔
711
     , total_count := TotalCount
712
     } = Last,
713
    case CountLast =:= TotalCount andalso Increment =:= 0 of
1,825✔
714
        true -> %% no change
715
            ok;
1,819✔
716
        false ->
717
            maybe_log_discard(State, Increment, Last)
6✔
718
    end.
719

720
-spec maybe_log_discard(
721
        state(),
722
        non_neg_integer(),
723
        #{ last_log_inst => non_neg_integer()
724
         , count_since_last_log => non_neg_integer()
725
         , total_count => non_neg_integer()
726
         }) -> ok.
727
maybe_log_discard(State,
728
                  Increment,
729
                  #{ last_log_inst := LastInst
730
                   , count_since_last_log := CountLast
731
                   , total_count := TotalCount
732
                   }) ->
733
    NowInst = now_ts(),
6✔
734
    NewTotalCount = TotalCount + Increment,
6✔
735
    Delta = NewTotalCount - CountLast,
6✔
736
    case NowInst - LastInst > ?MIN_DISCARD_LOG_INTERVAL of
6✔
737
        true ->
738
            log_warn("replayq dropped ~b overflowed messages", [Delta], State),
2✔
739
            put_overflow_log_state(#{ last_log_inst => NowInst
2✔
740
                                    , count_since_last_log => NewTotalCount
741
                                    , total_count => NewTotalCount
742
                                    });
743
        false ->
744
            put_overflow_log_state(#{ last_log_inst => LastInst
4✔
745
                                    , count_since_last_log => CountLast
746
                                    , total_count => NewTotalCount
747
                                    })
748
    end.
749

750
-spec get_overflow_log_state() -> #{ last_log_inst => non_neg_integer()
751
                                   , count_since_last_log => non_neg_integer()
752
                                   , total_count => non_neg_integer()
753
                                   }.
754
get_overflow_log_state() ->
755
    case get(?buffer_overflow_discarded) of
1,827✔
756
        undefined ->
757
            #{ last_log_inst => 0
1,820✔
758
             , count_since_last_log => 0
759
             , total_count => 0
760
             };
761
        Stats = #{} ->
762
            Stats
7✔
763
    end.
764

765
-spec put_overflow_log_state(#{ last_log_inst => non_neg_integer()
766
                              , count_since_last_log => non_neg_integer()
767
                              , total_count => non_neg_integer()
768
                              }) -> ok.
769
put_overflow_log_state(#{ last_log_inst := _LastInst
770
                        , count_since_last_log := _CountLast
771
                        , total_count := _TotalCount
772
                        } = Stats) ->
773
    put(?buffer_overflow_discarded, Stats),
8✔
774
    ok.
8✔
775

776
maybe_send_to_pulsar(State) ->
777
    #{replayq := Q} = State,
2,378✔
778
    case replayq:count(Q) =:= 0 of
2,378✔
779
        true ->
780
            State;
1,196✔
781
        false ->
782
            do_send_to_pulsar(State)
1,182✔
783
    end.
784

785
do_send_to_pulsar(State0) ->
786
    #{ batch_size := BatchSize
1,182✔
787
     , inflight_calls := InflightCalls0
788
     , sequence_id := SequenceId
789
     , requests := Requests0
790
     , replayq := Q
791
     , opts := ProducerOpts
792
     } = State0,
793
    MaxBatchBytes = maps:get(max_batch_bytes, ProducerOpts, ?DEFAULT_MAX_BATCH_BYTES),
1,182✔
794
    {NewQ, QAckRef, Items} = replayq:pop(Q, #{ count_limit => BatchSize
1,182✔
795
                                             , bytes_limit => MaxBatchBytes
796
                                             }),
797
    State1 = State0#{replayq := NewQ},
1,182✔
798
    pulsar_metrics:queuing_set(State0, replayq:count(NewQ)),
1,182✔
799
    pulsar_metrics:queuing_bytes_set(State0, replayq:bytes(NewQ)),
1,182✔
800
    RetentionPeriod = maps:get(retention_period, ProducerOpts, infinity),
1,182✔
801
    Now = now_ts(),
1,182✔
802
    {Expired, FromsToMessages} =
1,182✔
803
       lists:foldr(
804
         fun(?Q_ITEM(From, Timestamp, Msgs), {Expired, Acc}) ->
805
           case is_batch_expired(Timestamp, RetentionPeriod, Now) of
1,970✔
806
             true ->
807
               {[{From, Msgs} | Expired], Acc};
4✔
808
             false ->
809
               {Expired, [{From, {Timestamp, Msgs}} | Acc]}
1,966✔
810
           end
811
         end,
812
         {[], []},
813
         Items),
814
    reply_expired_messages(Expired, State1),
1,182✔
815
    pulsar_metrics:dropped_inc(State1, length(Expired)),
1,182✔
816
    case FromsToMessages of
1,182✔
817
        [] ->
818
            %% all expired, immediately ack replayq batch and continue
819
            ok = replayq:ack(Q, QAckRef),
2✔
820
            maybe_send_to_pulsar(State1);
2✔
821
        [_ | _] ->
822
            FinalBatch = [Msg || {_From, {_Timestamp, Msgs}} <-
1,180✔
823
                                     FromsToMessages,
1,180✔
824
                                 Msg <- Msgs],
1,966✔
825
            FinalBatchSize = length(FinalBatch),
1,180✔
826
            send_batch_payload(FinalBatch, SequenceId, State0),
1,180✔
827
            Requests = Requests0#{SequenceId => ?INFLIGHT_REQ(QAckRef, FromsToMessages, FinalBatchSize)},
1,180✔
828
            InflightCalls = InflightCalls0 + FinalBatchSize,
1,180✔
829
            pulsar_metrics:inflight_set(State1, InflightCalls),
1,180✔
830
            State2 = State1#{requests := Requests, inflight_calls := InflightCalls},
1,180✔
831
            State = next_sequence_id(State2),
1,180✔
832
            maybe_send_to_pulsar(State)
1,180✔
833
    end.
834

835
-spec reply_expired_messages([{gen_statem:from() | per_request_callback_int() | undefined,
836
                               [pulsar:message()]}],
837
                             state()) -> ok.
838
reply_expired_messages(Expired, #{callback := Callback}) ->
839
    reply_with_error(Expired, Callback, {error, expired}).
1,183✔
840

841
-spec reply_with_error([{gen_statem:from() | per_request_callback_int() | undefined,
842
                         [pulsar:message()]}],
843
                       callback(), {error, expired | overflow}) -> ok.
844
reply_with_error(Items, Callback, Error) ->
845
    lists:foreach(
1,187✔
846
      fun({undefined, Msgs}) ->
847
              invoke_callback(Callback, Error, length(Msgs));
7✔
848
         ({?PER_REQ_CALLBACK(Fn, Args), _Msgs}) ->
849
              %% No need to count the messages, as we invoke
850
              %% per-request callbacks once for the whole batch.
851
              invoke_callback({Fn, Args}, Error);
2✔
852
         ({From, _Msgs}) ->
853
              gen_statem:reply(From, Error)
×
854
      end,
855
      Items).
856

857
collect_send_requests(Acc, Limit) ->
858
    Count = length(Acc),
1,169✔
859
    do_collect_send_requests(Acc, Count, Limit).
1,169✔
860

861
do_collect_send_requests(Acc, Count, Limit) when Count >= Limit ->
862
    lists:reverse(Acc);
×
863
do_collect_send_requests(Acc, Count, Limit) ->
864
    receive
1,169✔
865
        ?SEND_REQ(_, _) = Req ->
866
            do_collect_send_requests([Req | Acc], Count + 1, Limit)
×
867
    after
868
        0 ->
869
            lists:reverse(Acc)
1,169✔
870
    end.
871

872
try_close_socket(#{sock := undefined}) ->
873
    ok;
×
874
try_close_socket(#{sock := Sock, opts := Opts}) ->
875
    _ = pulsar_socket:close(Sock, Opts),
8✔
876
    ok.
8✔
877

878
resend_sent_requests(State) ->
879
    ?tp(pulsar_producer_resend_sent_requests_enter, #{}),
27✔
880
    #{ inflight_calls := InflightCalls0
27✔
881
     , requests := Requests0
882
     , replayq := Q
883
     , opts := ProducerOpts
884
     } = State,
885
    Now = now_ts(),
27✔
886
    RetentionPeriod = maps:get(retention_period, ProducerOpts, infinity),
27✔
887
    {Requests, Dropped} =
27✔
888
        maps:fold(
889
          fun(SequenceId, ?INFLIGHT_REQ(QAckRef, FromsToMessages, _BatchSize), {AccIn, DroppedAcc}) ->
890
               {Messages, Expired} =
402✔
891
                  lists:partition(
892
                    fun({_From, {Ts, _Msgs}}) ->
893
                      not is_batch_expired(Ts, RetentionPeriod, Now)
402✔
894
                    end,
895
                    FromsToMessages),
896
               lists:foreach(
402✔
897
                 fun({From, {_Ts, Msgs}}) ->
898
                   reply_expired_messages([{From, Msgs}], State)
1✔
899
                 end,
900
                 Expired),
901
               Dropped = length(Expired),
402✔
902
               Acc = case Messages of
402✔
903
                   [] ->
904
                       ?tp(pulsar_producer_resend_all_expired, #{}),
1✔
905
                       ok = replayq:ack(Q, QAckRef),
1✔
906
                       AccIn;
1✔
907
                   [_ | _] ->
908
                       send_batch_payload([Msg || {_From, {_Ts, Msgs}} <- Messages,
401✔
909
                                                  Msg <- Msgs],
401✔
910
                                          SequenceId, State),
911
                       AccIn#{SequenceId => ?INFLIGHT_REQ(QAckRef, Messages, length(Messages))}
401✔
912
               end,
913
               {Acc, DroppedAcc + Dropped}
402✔
914
          end,
915
          {#{}, 0},
916
          Requests0),
917
    InflightCalls = InflightCalls0 - Dropped,
27✔
918
    pulsar_metrics:dropped_inc(State, Dropped),
27✔
919
    pulsar_metrics:inflight_set(State, InflightCalls),
27✔
920
    State#{requests := Requests, inflight_calls := InflightCalls}.
27✔
921

922
is_batch_expired(_Timestamp, infinity = _RetentionPeriod, _Now) ->
923
    false;
2,363✔
924
is_batch_expired(Timestamp, RetentionPeriod, Now) ->
925
    Timestamp =< Now - RetentionPeriod.
9✔
926

927
-spec escape(string()) -> binary().
928
escape(Str) ->
929
    NormalizedStr = unicode:characters_to_nfd_list(Str),
7✔
930
    iolist_to_binary(pulsar_utils:escape_uri(NormalizedStr)).
7✔
931

932
-spec handle_lookup_topic_reply(pulsar_client:lookup_topic_response(), state()) -> handler_result().
933
handle_lookup_topic_reply({error, Error}, State) ->
934
    log_error("error looking up topic: ~0p", [Error], State),
×
935
    try_close_socket(State),
×
936
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
937
handle_lookup_topic_reply({ok, #{ proxy_through_service_url := true
938
                                , brokerServiceUrl := NewBrokerServiceURL
939
                                }}, State0) ->
940
    #{clientid := ClientId} = State0,
27✔
941
    ?tp(debug, pulsar_producer_lookup_alive_pulsar_url, #{}),
27✔
942
    log_debug("received topic lookup reply: ~0p", [#{proxy_through_service_url => true, broker_service_url => NewBrokerServiceURL}], State0),
27✔
943
    try pulsar_client_manager:get_alive_pulsar_url(ClientId, ?GET_ALIVE_PULSAR_URL) of
27✔
944
        {ok, AlivePulsarURL} ->
945
            maybe_connect(#{ broker_service_url => NewBrokerServiceURL
27✔
946
                           , alive_pulsar_url => AlivePulsarURL
947
                           }, State0);
948
        {error, Reason} ->
949
            log_error("error getting pulsar alive URL: ~0p", [Reason], State0),
×
950
            try_close_socket(State0),
×
951
            ?NEXT_STATE_IDLE_RECONNECT(State0)
×
952
    catch
953
        exit:{noproc, _} ->
954
            log_error("client restarting; will retry to lookup topic later", [], State0),
×
955
            try_close_socket(State0),
×
956
            ?NEXT_STATE_IDLE_RECONNECT(State0);
×
957
        exit:{timeout, _} ->
958
            log_error("timeout calling client; will retry to lookup topic later", [], State0),
×
959
            try_close_socket(State0),
×
960
            ?NEXT_STATE_IDLE_RECONNECT(State0)
×
961
    end;
962
handle_lookup_topic_reply({ok, #{ proxy_through_service_url := false
963
                                , brokerServiceUrl := NewBrokerServiceURL
964
                                }},
965
                         State) ->
966
    log_debug("received topic lookup reply: ~0p",
×
967
              [#{proxy_through_service_url => false,
968
                 broker_service_url => NewBrokerServiceURL}], State),
969
    maybe_connect(#{ alive_pulsar_url => NewBrokerServiceURL
×
970
                   , broker_service_url => undefined
971
                   }, State).
972

973
-spec maybe_connect(#{ alive_pulsar_url := string()
974
                     , broker_service_url := string() | undefined
975
                     }, state()) -> handler_result().
976
maybe_connect(#{ broker_service_url := NewBrokerServiceURL
977
               , alive_pulsar_url := AlivePulsarURL
978
               }, State0) ->
979
    #{ broker_server := OldBrokerServer
27✔
980
     , proxy_to_broker_url := OldBrokerServiceURL
981
     } = State0,
982
    {_Transport, NewBrokerServer} = pulsar_utils:parse_url(AlivePulsarURL),
27✔
983
    case {OldBrokerServer, OldBrokerServiceURL} =:= {NewBrokerServer, NewBrokerServiceURL} of
27✔
984
        true ->
985
            log_debug("connecting to ~0p",
27✔
986
                      [#{broker_server => NewBrokerServer,
987
                         service_url => NewBrokerServiceURL}], State0),
988
            do_connect(State0);
27✔
989
        false ->
990
            %% broker changed; reconnect.
991
            log_info("pulsar endpoint changed from ~0p to ~0p; reconnecting...",
×
992
                      [ #{ broker_server => OldBrokerServer
993
                         , proxy_url => OldBrokerServiceURL
994
                         }
995
                      , #{ broker_server => NewBrokerServer
996
                         , proxy_url => NewBrokerServiceURL
997
                         }
998
                      ],
999
                      State0),
1000
            try_close_socket(State0),
×
1001
            State = State0#{
×
1002
                broker_server := NewBrokerServer,
1003
                proxy_to_broker_url := NewBrokerServiceURL
1004
            },
1005
            ?NEXT_STATE_IDLE_RECONNECT(State)
×
1006
    end.
1007

1008
-spec notify_state_change(undefined | state_observer_callback(), statem()) -> ok.
1009
notify_state_change(undefined, _ProducerState) ->
1010
    ok;
×
1011
notify_state_change({Fun, Args}, ProducerState) ->
1012
    _ = apply(Fun, [ProducerState | Args]),
84✔
1013
    ok.
84✔
1014

1015
-ifdef(TEST).
1016
-include_lib("eunit/include/eunit.hrl").
1017

1018
maybe_log_discard_test_() ->
1019
    [ {"no increment, empty dictionary", fun() -> maybe_log_discard(undefined, 0) end}
2✔
1020
    , {"fake-last-old",
1021
       fun() ->
1022
         Inst0 = now_ts() - ?MIN_DISCARD_LOG_INTERVAL - 1,
1✔
1023
         ok = put_overflow_log_state(#{ last_log_inst => Inst0
1✔
1024
                                      , count_since_last_log => 2
1025
                                      , total_count => 2
1026
                                      }),
1027
         ok = maybe_log_discard(#{partitiontopic => <<"partitiontopic">>}, 1),
1✔
1028
         Stats = get_overflow_log_state(),
1✔
1029
         ?assertMatch(#{count_since_last_log := 3, total_count := 3}, Stats),
1✔
1030
         %% greater than the minimum interval because we just logged
1031
         ?assert(maps:get(last_log_inst, Stats) - Inst0 > ?MIN_DISCARD_LOG_INTERVAL),
1✔
1032
         ok
1✔
1033
       end}
1034
    , {"fake-last-fresh",
1035
       fun() ->
1036
         Inst0 = now_ts(),
1✔
1037
         ok = put_overflow_log_state(#{ last_log_inst => Inst0
1✔
1038
                                      , count_since_last_log => 2
1039
                                      , total_count => 2
1040
                                      }),
1041
         ok = maybe_log_discard(#{partitiontopic => <<"partitiontopic">>}, 2),
1✔
1042
         Stats = get_overflow_log_state(),
1✔
1043
         ?assertMatch(#{count_since_last_log := 2, total_count := 4}, Stats),
1✔
1044
         %% less than the minimum interval because we didn't log and just accumulated
1045
         ?assert(maps:get(last_log_inst, Stats) - Inst0 < ?MIN_DISCARD_LOG_INTERVAL),
1✔
1046
         ok
1✔
1047
       end}
1048
    ].
1049
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc