• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 785

26 Feb 2025 09:01PM UTC coverage: 72.184%. First build
785

Pull #71

github

zmstone
fix: handle_response should always return new state
Pull Request #71: fix: handle_response should always return new state

13 of 19 new or added lines in 1 file covered. (68.42%)

942 of 1305 relevant lines covered (72.18%)

315.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.73
/src/pulsar_producer.erl
1
%% Copyright (c) 2013-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_producer).
16

17
-behaviour(gen_statem).
18

19
-include_lib("kernel/include/inet.hrl").
20
-include("include/pulsar_producer_internal.hrl").
21
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
22

23
-export([ send/2
24
        , send/3
25
        , send_sync/2
26
        , send_sync/3
27
        , get_state/1
28
        ]).
29

30
-export([ start_link/4
31
        , idle/3
32
        , connecting/3
33
        , connected/3
34
        ]).
35

36
%% gen_statem API
37
-export([ callback_mode/0
38
        , init/1
39
        , terminate/3
40
        , format_status/1
41
        , format_status/2
42
        ]).
43

44
%% replayq API
45
-export([ queue_item_sizer/1
46
        , queue_item_marshaller/1
47
        ]).
48

49
%% for testing only
50
-ifdef(TEST).
51
-export([make_queue_item/2]).
52
-endif.
53

54
-type statem() :: idle | connecting | connected.
55
-type sequence_id() :: integer().
56
-type send_receipt() :: #{ sequence_id := sequence_id()
57
                         , producer_id := integer()
58
                         , highest_sequence_id := sequence_id()
59
                         , message_id := map()
60
                         , any() => term()
61
                         }.
62
-type timestamp() :: integer().
63
-type callback() :: undefined | mfa() | fun((map()) -> ok) | per_request_callback().
64
-type callback_input() :: {ok, send_receipt()} | {error, expired}.
65
-type config() :: #{ replayq_dir := string()
66
                   , replayq_max_total_bytes => pos_integer()
67
                   , replayq_seg_bytes => pos_integer()
68
                   , replayq_offload_mode => boolean()
69
                   , max_batch_bytes => pos_integer()
70
                   , producer_name => atom()
71
                   , clientid => atom()
72
                   , callback => callback()
73
                   , batch_size => non_neg_integer()
74
                   , retention_period => timeout()
75
                   }.
76
-export_type([ config/0
77
             ]).
78

79
-define(RECONNECT_TIMEOUT, 5_000).
80
-define(LOOKUP_TOPIC_TIMEOUT, 15_000).
81
-define(GET_ALIVE_PULSAR_URL, 5_000).
82

83
-define(MAX_REQ_ID, 4294836225).
84
-define(MAX_SEQ_ID, 18445618199572250625).
85

86
-define(DEFAULT_REPLAYQ_SEG_BYTES, 10 * 1024 * 1024).
87
-define(DEFAULT_REPLAYQ_LIMIT, 2_000_000_000).
88
-define(DEFAULT_MAX_BATCH_BYTES, 1_000_000).
89
-define(Q_ITEM(From, Ts, Messages), {From, Ts, Messages}).
90
-define(INFLIGHT_REQ(QAckRef, FromsToMessages, BatchSize), {inflight_req, QAckRef, FromsToMessages, BatchSize}).
91
-define(NEXT_STATE_IDLE_RECONNECT(State), {next_state, idle, State#{sock := undefined},
92
                                           [{state_timeout, ?RECONNECT_TIMEOUT, do_connect}]}).
93
-define(buffer_overflow_discarded, buffer_overflow_discarded).
94
-define(MIN_DISCARD_LOG_INTERVAL, timer:seconds(5)).
95
-define(PER_REQ_CALLBACK(Fn, Args), {callback, {Fn, Args}}).
96
-define(SOCK_ERR(SOCK, REASON), {socket_error, SOCK, REASON}).
97

98
%% poorman's error handling.
99
%% this is an extra safety to handle a previously missed tcp/ssl_error or tcp/ssl_closed event
100
-define(POORMAN(SOCK, EXPR),
101
        case (EXPR) of
102
            ok ->
103
                ok;
104
            {error, Reason} ->
105
                _ = self() ! ?SOCK_ERR(SOCK, Reason),
106
                ok
107
        end).
108

109
-type state_observer_callback() :: {function(), [term()]}.
110
-type state() :: #{
111
                   batch_size := non_neg_integer(),
112
                   broker_server := {binary(), pos_integer()},
113
                   callback := undefined | mfa() | fun((map()) -> ok),
114
                   clientid := atom(),
115
                   inflight_calls := non_neg_integer(),
116
                   last_bin := binary(),
117
                   lookup_topic_request_ref := reference() | undefined,
118
                   opts := map(),
119
                   parent_pid := undefined | pid(),
120
                   partitiontopic := string(),
121
                   producer_id := integer(),
122
                   producer_name := atom(),
123
                   proxy_to_broker_url := undefined | string(),
124
                   replayq := replayq:q(),
125
                   replayq_offload_mode := boolean(),
126
                   request_id := integer(),
127
                   requests := #{sequence_id() =>
128
                                     ?INFLIGHT_REQ(
129
                                        replayq:ack_ref(),
130
                                        [{gen_statem:from() | undefined,
131
                                          {timestamp(), [pulsar:message()]}}],
132
                                        _BatchSize :: non_neg_integer()
133
                                       )},
134
                   sequence_id := sequence_id(),
135
                   state_observer_callback := undefined | state_observer_callback(),
136
                   sock := undefined | port(),
137
                   telemetry_metadata := map()
138
                  }.
139
-type handler_result() :: gen_statem:event_handler_result(statem(), state()).
140
-type per_request_callback() :: {function(), [term()]}.
141
-type per_request_callback_int() :: ?PER_REQ_CALLBACK(function(), [term()]).
142
-type send_opts() :: #{callback_fn => per_request_callback()}.
143
-export_type([send_opts/0]).
144

145
%% gen_statem callback: selects one callback function per state
%% (`state_functions') and asks for state enter calls (`state_enter').
callback_mode() -> [state_functions, state_enter].
22✔
146

147
%% Starts a linked producer state machine. The four arguments are handed
%% to `init/1' as a single tuple.
start_link(PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts) ->
    InitArgs = {PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts},
    gen_statem:start_link(?MODULE, InitArgs, []).
22✔
149

150
%% Asynchronous send with default (empty) send options.
-spec send(gen_statem:server_ref(), [pulsar:message()]) -> {ok, pid()}.
send(Pid, Messages) ->
    NoOpts = #{},
    send(Pid, Messages, NoOpts).
×
153

154
%% Asynchronous send: posts a send request to the producer process and
%% returns immediately.
%%
%% When `Opts' carries a `callback_fn' of the form `{Fn, Args}', the
%% request is tagged with a per-request callback; otherwise it is sent
%% with an `undefined' origin. Any other `callback_fn' value crashes the
%% caller with `case_clause'.
-spec send(gen_statem:server_ref(), [pulsar:message()], send_opts()) -> {ok, pid()}.
send(Pid, Messages, Opts) ->
    CallbackFn = maps:get(callback_fn, Opts, undefined),
    From =
        case CallbackFn of
            {Fn, Args} when is_function(Fn) ->
                ?PER_REQ_CALLBACK(Fn, Args);
            undefined ->
                undefined
        end,
    Pid ! ?SEND_REQ(From, Messages),
    {ok, Pid}.
1,816✔
162

163
%% Synchronous send using the default timeout of 5 seconds.
-spec send_sync(gen_statem:server_ref(), [pulsar:message()]) ->
          {ok, send_receipt()}
        | {error, producer_connecting
                | producer_disconnected
                | term()}.
send_sync(Pid, Messages) ->
    DefaultTimeout = 5_000,
    send_sync(Pid, Messages, DefaultTimeout).
×
170

171
%% Synchronous send: posts a send request and waits up to `Timeout' for
%% the producer's reply.
%%
%% Returns whatever the producer replies with. Raises
%% `error({producer_down, Reason})' if the producer terminates before
%% replying, and `error(timeout)' if no reply arrives in time.
-spec send_sync(gen_statem:server_ref(), [pulsar:message()], timeout()) ->
          {ok, send_receipt()}
        | {error, producer_connecting
                | producer_disconnected
                | term()}.
send_sync(Pid, Messages, Timeout) ->
    Caller = self(),
    MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
    %% Mimicking gen_statem's From, so the reply can be sent with
    %% `gen_statem:reply/2'
    From = {Caller, MRef},
    erlang:send(Pid, ?SEND_REQ(From, Messages)),
    receive
        {MRef, Response} ->
            erlang:demonitor(MRef, [flush]),
            Response;
        %% Match on the (unique) monitor reference only: when `Pid' is a
        %% registered name, the 4th element of the 'DOWN' message is
        %% `{Name, Node}' rather than the value we monitored with, so
        %% pinning it to `Pid' would make this clause unreachable.
        {'DOWN', MRef, process, _Object, Reason} ->
            error({producer_down, Reason})
    after
        Timeout ->
            erlang:demonitor(MRef, [flush]),
            %% a reply may have raced with the timeout; pick it up if it
            %% is already in the mailbox
            receive
                {MRef, Response} ->
                    Response
            after
                0 ->
                    error(timeout)
            end
    end.
200

201
%% Asks the producer which state it is currently in; the call times out
%% after 5 seconds.
-spec get_state(pid()) -> statem().
get_state(Pid) ->
    CallTimeout = 5_000,
    gen_statem:call(Pid, get_state, CallTimeout).
2✔
204

205
%%--------------------------------------------------------------------
206
%% gen_statem callback
207
%%--------------------------------------------------------------------
208

209
%% gen_statem callback: opens the replay queue, builds the initial state
%% map, publishes the initial gauge values and starts in `idle' with an
%% immediate internal `do_connect' event.
%%
%% `producer_id' and `clientid' are mandatory options; a missing key
%% crashes initialisation.
-spec init({string(), string(), string() | undefined, config()}) ->
          gen_statem:init_result(statem(), state()).
init({PartitionTopic, Server, ProxyToBrokerUrl, RawOpts}) ->
    process_flag(trap_exit, true),
    {Transport, BrokerServer} = pulsar_utils:parse_url(Server),
    ProducerID = maps:get(producer_id, RawOpts),
    Offload = maps:get(replayq_offload_mode, RawOpts, false),
    %% Without a `replayq_dir' the queue is memory-only; otherwise it is
    %% stored under a per-partition-topic directory.
    StorageCfg =
        case maps:get(replayq_dir, RawOpts, false) of
            false ->
                #{mem_only => true};
            BaseDir ->
                Dir = filename:join([BaseDir, escape(PartitionTopic)]),
                SegBytes = maps:get(replayq_seg_bytes, RawOpts, ?DEFAULT_REPLAYQ_SEG_BYTES),
                #{dir => Dir, seg_bytes => SegBytes, offload => Offload}
        end,
    MaxTotalBytes = maps:get(replayq_max_total_bytes, RawOpts, ?DEFAULT_REPLAYQ_LIMIT),
    MaxBatchBytes = maps:get(max_batch_bytes, RawOpts, ?DEFAULT_MAX_BATCH_BYTES),
    Q = replayq:open(StorageCfg#{ sizer => fun ?MODULE:queue_item_sizer/1
                                , marshaller => fun ?MODULE:queue_item_marshaller/1
                                , max_total_bytes => MaxTotalBytes
                                }),
    %% The replayq options are no longer needed once the queue is open.
    ReplayqKeys = [ replayq_dir
                  , replayq_seg_bytes
                  , replayq_offload_mode
                  , replayq_max_total_bytes
                  ],
    ProducerOpts = maps:without(ReplayqKeys, RawOpts#{max_batch_bytes => MaxBatchBytes}),
    StateObserverCallback = maps:get(state_observer_callback, RawOpts, undefined),
    ParentPid = maps:get(parent_pid, ProducerOpts, undefined),
    BaseTelemetry = maps:get(telemetry_metadata, RawOpts, #{}),
    TelemetryMetadata = maps:put(partition_topic, PartitionTopic, BaseTelemetry),
    State = #{
        batch_size => maps:get(batch_size, ProducerOpts, 0),
        broker_server => BrokerServer,
        callback => maps:get(callback, ProducerOpts, undefined),
        clientid => maps:get(clientid, ProducerOpts),
        inflight_calls => 0,
        last_bin => <<>>,
        lookup_topic_request_ref => undefined,
        opts => pulsar_utils:maybe_enable_ssl_opts(Transport, ProducerOpts),
        parent_pid => ParentPid,
        partitiontopic => PartitionTopic,
        producer_id => ProducerID,
        producer_name => maps:get(producer_name, ProducerOpts, pulsar_producer),
        proxy_to_broker_url => ProxyToBrokerUrl,
        replayq => Q,
        replayq_offload_mode => Offload,
        request_id => 1,
        requests => #{},
        sequence_id => 1,
        state_observer_callback => StateObserverCallback,
        sock => undefined,
        telemetry_metadata => TelemetryMetadata
    },
    %% Nothing is in flight yet; the queue may already hold items.
    pulsar_metrics:inflight_set(State, 0),
    pulsar_metrics:queuing_set(State, replayq:count(Q)),
    pulsar_metrics:queuing_bytes_set(State, replayq:bytes(Q)),
    {ok, idle, State, [{next_event, internal, do_connect}]}.
22✔
273

274
%% idle state
275
%% State function for `idle': no socket is open. Send requests are only
%% enqueued; a topic lookup is started on `do_connect' and its reply (or
%% failure) decides what happens next.
-spec idle(gen_statem:event_type(), _EventContent, state()) ->
          handler_result().
idle(enter, _OldState, _State = #{state_observer_callback := StateObserverCallback}) ->
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
    keep_state_and_data;
idle(internal, do_connect, State) ->
    refresh_urls_and_connect(State);
idle(state_timeout, do_connect, State) ->
    refresh_urls_and_connect(State);
idle(state_timeout, lookup_topic_timeout, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("timed out waiting for lookup topic response", [], State0),
    %% The lookup reference is a monitor (it is demonitored on reply and
    %% matched in 'DOWN' below). Release it here too, otherwise every
    %% timed-out lookup leaks a monitor and may later deliver a stale
    %% 'DOWN' or reply. The result of demonitor/2 carries no information.
    _ = is_reference(Ref) andalso erlang:demonitor(Ref, [flush]),
    State = State0#{lookup_topic_request_ref := undefined},
    ?NEXT_STATE_IDLE_RECONNECT(State);
idle({call, From}, get_state, _State) ->
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
idle({call, From}, _EventContent, _State) ->
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
idle(cast, _EventContent, _State) ->
    keep_state_and_data;
idle(info, ?SEND_REQ(_, _) = SendRequest, State0) ->
    State = enqueue_send_requests([SendRequest], State0),
    {keep_state, State};
idle(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
    State = State0#{lookup_topic_request_ref := undefined},
    erlang:demonitor(Ref, [flush]),
    handle_lookup_topic_reply(Reply, State);
idle(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
    {stop, Reason};
idle(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
    State = State0#{lookup_topic_request_ref := undefined},
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
idle(_EventType, _Event, _State) ->
    %% anything else is ignored while idle
    keep_state_and_data.
7✔
311

312
%% connecting state
313
%% State function for `connecting': a socket has been opened and the
%% connect packet sent; socket data is parsed until the handshake
%% completes or the socket fails. Send requests are only enqueued.
-spec connecting(gen_statem:event_type(), _EventContent, state()) ->
          handler_result().
connecting(enter, _OldState, State = #{state_observer_callback := StateObserverCallback}) ->
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
    %% This state is entered with a new socket: bytes buffered from an
    %% earlier connection must not be glued onto its first frame.
    {keep_state, State#{last_bin := <<>>}};
connecting(state_timeout, do_connect, State) ->
    refresh_urls_and_connect(State);
connecting(state_timeout, lookup_topic_timeout, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("timed out waiting for lookup topic response", [], State0),
    %% release the lookup monitor so it cannot leak or deliver stale
    %% messages; the result of demonitor/2 carries no information
    _ = is_reference(Ref) andalso erlang:demonitor(Ref, [flush]),
    State = State0#{lookup_topic_request_ref := undefined},
    ?NEXT_STATE_IDLE_RECONNECT(State);
connecting(info, ?SEND_REQ(_, _) = SendRequest, State0) ->
    State = enqueue_send_requests([SendRequest], State0),
    {keep_state, State};
connecting(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
    State = State0#{lookup_topic_request_ref := undefined},
    erlang:demonitor(Ref, [flush]),
    handle_lookup_topic_reply(Reply, State);
connecting(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
    {stop, Reason};
connecting(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
    State = State0#{lookup_topic_request_ref := undefined},
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
connecting(info, {'EXIT', Sock, Reason}, State) when is_port(Sock) ->
    handle_socket_close(connecting, Sock, Reason, State);
connecting(info, {C, Sock}, State) when C =:= tcp_closed; C =:= ssl_closed ->
    handle_socket_close(connecting, Sock, closed, State);
connecting(info, {E, Sock, Reason}, State) when E =:= tcp_error; E =:= ssl_error ->
    handle_socket_close(connecting, Sock, Reason, State);
connecting(info, ?SOCK_ERR(Sock, Reason), State) ->
    handle_socket_close(connecting, Sock, Reason, State);
connecting(_EventType, {Inet, _, Bin}, State = #{last_bin := LastBin})
        when Inet == tcp; Inet == ssl ->
    %% Prepend the bytes of a frame that was split across packets, the
    %% same way the `connected' state does; otherwise the stored prefix
    %% is lost and the remainder is parsed as garbage.
    handle_and_parse(pulsar_protocol_frame:parse(<<LastBin/binary, Bin/binary>>), State);
connecting(info, Msg, State) ->
    %% the state may hold authentication data: censor it before logging
    log_info("[connecting] unknown message received ~p~n  ~p",
             [Msg, censor_secrets(State)], State),
    keep_state_and_data;
connecting({call, From}, get_state, _State) ->
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
connecting({call, From}, _EventContent, _State) ->
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
connecting(cast, _EventContent, _State) ->
   keep_state_and_data.
×
358

359
%% connected state
360
%% State function for `connected': the producer is registered with the
%% broker. Send requests are batched, enqueued and pushed to the socket;
%% incoming socket data is parsed and dispatched to `handle_response/2'.
-spec connected(gen_statem:event_type(), _EventContent, state()) ->
          handler_result().
connected(enter, _OldState, State0 = #{state_observer_callback := StateObserverCallback}) ->
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
    State1 = resend_sent_requests(State0),
    State = maybe_send_to_pulsar(State1),
    {keep_state, State};
connected(state_timeout, do_connect, _State) ->
    %% already connected: nothing to do
    keep_state_and_data;
connected(state_timeout, lookup_topic_timeout, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("timed out waiting for lookup topic response", [], State0),
    %% release the lookup monitor so it cannot leak or deliver stale
    %% messages; the result of demonitor/2 carries no information
    _ = is_reference(Ref) andalso erlang:demonitor(Ref, [flush]),
    State = State0#{lookup_topic_request_ref := undefined},
    ?NEXT_STATE_IDLE_RECONNECT(State);
connected(info, ?SEND_REQ(_, _) = SendRequest, State0 = #{batch_size := BatchSize}) ->
    ?tp(pulsar_producer_send_req_enter, #{}),
    SendRequests = collect_send_requests([SendRequest], BatchSize),
    State1 = enqueue_send_requests(SendRequests, State0),
    State = maybe_send_to_pulsar(State1),
    ?tp(pulsar_producer_send_req_exit, #{}),
    {keep_state, State};
connected(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
    State = State0#{lookup_topic_request_ref := undefined},
    erlang:demonitor(Ref, [flush]),
    handle_lookup_topic_reply(Reply, State);
connected(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
    {stop, Reason};
connected(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
    State = State0#{lookup_topic_request_ref := undefined},
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
connected(info, {'EXIT', Sock, Reason}, State) when is_port(Sock) ->
    handle_socket_close(connected, Sock, Reason, State);
connected(_EventType, {C, Sock}, State) when C =:= tcp_closed; C =:= ssl_closed ->
    handle_socket_close(connected, Sock, closed, State);
connected(_EventType, {E, Sock, Reason}, State) when E =:= tcp_error; E =:= ssl_error ->
    handle_socket_close(connected, Sock, Reason, State);
connected(_EventType, ?SOCK_ERR(Sock, Reason), State) ->
    handle_socket_close(connected, Sock, Reason, State);
connected(_EventType, {Inet, _, Bin}, State = #{last_bin := LastBin})
        when Inet == tcp; Inet == ssl ->
    %% prepend the bytes left over from the previous packet
    handle_and_parse(pulsar_protocol_frame:parse(<<LastBin/binary, Bin/binary>>), State);
connected(_EventType, ping, State = #{sock := Sock, opts := Opts}) ->
    ?POORMAN(Sock, pulsar_socket:ping(Sock, Opts)),
    {keep_state, State};
connected({call, From}, get_state, _State) ->
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
connected({call, From}, _EventContent, _State) ->
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
connected(cast, _EventContent, _State) ->
    keep_state_and_data;
connected(_EventType, EventContent, State) ->
    %% An unrecognised event is not a parser result and must not be fed
    %% to handle_and_parse/2: a 2-tuple would be taken for
    %% `{Cmd, LastBin}' and overwrite `last_bin', any other shape would
    %% crash the producer with `function_clause'. Log it and carry on.
    log_error("Receive unknown message:~p~n", [EventContent], State),
    keep_state_and_data.
×
415

416
%% Reacts to a close/error notification for `Sock'.
%%
%% If `Sock' is the socket currently held in the state, the event is
%% logged, the socket is closed and the producer goes to `idle' with a
%% reconnect timer. A notification for any other socket is stale and is
%% ignored.
handle_socket_close(StateName, Sock, Reason, State) ->
    case State of
        #{sock := Sock} ->
            ?tp("pulsar_socket_close", #{sock => Sock, reason => Reason}),
            log_error("connection_closed at_state: ~p, reason: ~p", [StateName, Reason], State),
            try_close_socket(State),
            ?NEXT_STATE_IDLE_RECONNECT(State);
        _ ->
            %% stale close event
            keep_state_and_data
    end.
1✔
424

425
%% Starts an asynchronous topic lookup and arms the lookup timeout.
%%
%% A lookup is issued on every (re)connection attempt because, if Pulsar
%% went down and was restarted, the topic must be looked up again before
%% the producer can be created:
%% https://pulsar.apache.org/docs/2.10.x/developing-binary-protocol/#topic-lookup
%% > Topic lookup needs to be performed each time a client needs
%% > to create or reconnect a producer or a consumer. Lookup is used
%% > to discover which particular broker is serving the topic we are
%% > about to use.
%% Looking the topic up (even from a distinct connection) "unblocks" it
%% so messages may be sent to it; the producer may be started only after
%% that.
%%
%% If the client manager process is not running, the attempt is retried
%% after the reconnect timeout.
-spec refresh_urls_and_connect(state()) -> handler_result().
refresh_urls_and_connect(State0) ->
    #{clientid := ClientId, partitiontopic := PartitionTopic} = State0,
    ?tp(debug, pulsar_producer_refresh_start, #{}),
    try pulsar_client_manager:lookup_topic_async(ClientId, PartitionTopic) of
        {ok, LookupRef} ->
            LookupTimeout = {state_timeout, ?LOOKUP_TOPIC_TIMEOUT, lookup_topic_timeout},
            {keep_state, State0#{lookup_topic_request_ref := LookupRef}, [LookupTimeout]}
    catch
        exit:{noproc, _} ->
            log_error("client restarting; will retry to lookup topic again later", [], State0),
            ?NEXT_STATE_IDLE_RECONNECT(State0)
    end.
450

451
%% Opens a socket to the broker and sends the connect packet.
%%
%% On success the socket is stored in the state and the producer moves
%% to `connecting'. On an error return or an exception from the connect
%% call, the failure is logged, the socket in the state is closed and
%% the producer goes back to `idle' with a reconnect timer.
-spec do_connect(state()) -> handler_result().
do_connect(State) ->
    #{ broker_server := {Host, Port}
     , opts := Opts
     , proxy_to_broker_url := ProxyUrl
     } = State,
    try pulsar_socket:connect(Host, Port, Opts) of
        {ok, Sock} ->
            ConnectOpts = pulsar_utils:maybe_add_proxy_to_broker_url_opts(Opts, ProxyUrl),
            ?POORMAN(Sock, pulsar_socket:send_connect_packet(Sock, ConnectOpts)),
            {next_state, connecting, State#{sock := Sock}};
        {error, Reason} ->
            log_error("error connecting: ~p", [Reason], State),
            try_close_socket(State),
            ?NEXT_STATE_IDLE_RECONNECT(State)
    catch
        Class:Exception:Stack ->
            log_error("exception connecting: ~p -> ~p~n  ~p", [Class, Exception, Stack], State),
            try_close_socket(State),
            ?NEXT_STATE_IDLE_RECONNECT(State)
    end.
472

473
%% gen_statem callback: returns the status map with secrets in the
%% `data' entry censored; every other entry is passed through as is.
format_status(Status) ->
    Censor =
        fun(Key, Value) ->
                case Key of
                    data -> censor_secrets(Value);
                    _ -> Value
                end
        end,
    maps:map(Censor, Status).
481

482
%% `format_status/2' is deprecated as of OTP 25.0
483
%% Legacy status formatter: reports the censored state data under the
%% "State" label.
format_status(_Opt, [_PDict, _State0, Data0]) ->
    Censored = censor_secrets(Data0),
    [{data, [{"State", Censored}]}].
×
486

487
%% Replaces `opts.conn_opts.auth_data' with "******" when that nested
%% key is present; any other term is returned unchanged.
censor_secrets(Data) ->
    case Data of
        #{opts := #{conn_opts := #{auth_data := _} = ConnOpts} = Opts} ->
            Data#{opts := Opts#{conn_opts := ConnOpts#{auth_data := "******"}}};
        _ ->
            Data
    end.
1✔
491

492
%% gen_statem callback: closes the replay queue, then resets the gauges.
terminate(_Reason, _StateName, #{replayq := Q} = State) ->
    ok = replayq:close(Q),
    ok = clear_gauges(State, Q).
16✔
496

497
%% Sets the inflight gauge to zero and resets the queuing gauges when
%% `maybe_reset_queuing/2' decides so. Always returns `ok'.
clear_gauges(State, Q) ->
    _ = pulsar_metrics:inflight_set(State, 0),
    _ = maybe_reset_queuing(State, Q),
    ok.
16✔
501

502
%% Zeroes the queuing gauges when the queue is empty or not durable.
%% A durable queue that still holds items keeps its gauge values.
maybe_reset_queuing(State, Q) ->
    Count = replayq:count(Q),
    Durable = is_replayq_durable(State, Q),
    case Count =:= 0 orelse Durable =:= false of
        true ->
            pulsar_metrics:queuing_set(State, 0),
            pulsar_metrics:queuing_bytes_set(State, 0);
        false ->
            ok
    end.
513

514
%% A queue in offload mode is never considered durable; otherwise it is
%% durable exactly when it is not memory-only.
is_replayq_durable(State, Q) ->
    case State of
        #{replayq_offload_mode := true} ->
            false;
        _ ->
            not replayq:is_mem_only(Q)
    end.
17✔
518

519
%% Processes a parser result and turns it into a gen_statem result.
%%
%% `handle_and_parse2/2' signals state changes with throws:
%% `{connected, State}' moves to `connected', `{reconnect, State}' goes
%% to `idle' with a reconnect timer. Without a throw the state name is
%% kept.
handle_and_parse(DecodedHead, State) ->
    try handle_and_parse2(DecodedHead, State) of
        NewState ->
            {keep_state, NewState}
    catch
        throw:{connected, ConnectedState} ->
            {next_state, connected, ConnectedState};
        throw:{reconnect, ReconnectState} ->
            %% discard the remaining bytes (if any)
            ?NEXT_STATE_IDLE_RECONNECT(ReconnectState#{last_bin := <<>>})
    end.
529

530
%% Handles decoded commands one at a time until the parser reports an
%% incomplete frame, whose bytes are kept in `last_bin' for the next
%% packet.
handle_and_parse2({incomplete, Partial}, State) ->
    State#{last_bin := Partial};
handle_and_parse2({Cmd, Rest}, State0) ->
    %% store the unparsed remainder first: handle_response/2 may throw,
    %% and the thrown state must already carry it
    State1 = handle_response(Cmd, State0#{last_bin => Rest}),
    handle_and_parse2(pulsar_protocol_frame:parse(Rest), State1).
1,062✔
536

537
%% Handles one decoded broker command and returns the new state.
%%
%% Two outcomes are signalled with a throw, caught in
%% `handle_and_parse/2': `{connected, State}' once the producer has been
%% created, and `{reconnect, State}' when the broker closes the producer
%% or reports an error.
-spec handle_response(_EventContent, state()) -> state().
handle_response({connected, _ConnectedData},
                State = #{ sock := Sock
                         , opts := Opts
                         , producer_id := ProducerId
                         , request_id := RequestId
                         , partitiontopic := PartitionTopic
                         }) ->
    %% connection accepted: start pinging and ask for the producer
    start_keepalive(),
    ?POORMAN(Sock, pulsar_socket:send_create_producer_packet(Sock, PartitionTopic, RequestId, ProducerId, Opts)),
    next_request_id(State);
handle_response({producer_success, #{producer_name := ProName}}, State) ->
    throw({connected, State#{producer_name := ProName}});
handle_response({pong, #{}}, State) ->
    %% schedule the next ping
    start_keepalive(),
    State;
handle_response({ping, #{}}, State = #{sock := Sock, opts := Opts}) ->
    ?POORMAN(Sock, pulsar_socket:pong(Sock, Opts)),
    State;
handle_response({close_producer, #{}}, State = #{partitiontopic := Topic}) ->
    log_error("Close producer: ~p~n", [Topic], State),
    try_close_socket(State),
    throw({reconnect, State});
handle_response({send_receipt, Receipt = #{sequence_id := SequenceId}}, State) ->
    #{ callback := Callback
     , inflight_calls := InflightBefore
     , requests := Requests
     , replayq := Q
     } = State,
    ?tp(pulsar_producer_recv_send_receipt, #{receipt => Receipt}),
    case maps:get(SequenceId, Requests, undefined) of
        undefined ->
            %% no inflight request recorded for this sequence id
            _ = invoke_callback(Callback, {ok, Receipt}),
            State;
        ?INFLIGHT_REQ(QAckRef, FromsToMessages, BatchSize) ->
            ok = replayq:ack(Q, QAckRef),
            Notify =
                fun({undefined, {_TS, Messages}}) ->
                        %% producer-level callback: once per message
                        _ = invoke_callback(Callback, {ok, Receipt}, length(Messages)),
                        ok;
                   ({?PER_REQ_CALLBACK(Fn, Args), {_TS, _Messages}}) ->
                        %% per-request callback: once for the whole
                        %% request, so the messages are not counted
                        _ = invoke_callback({Fn, Args}, {ok, Receipt}),
                        ok;
                   ({From, {_TS, _Messages}}) ->
                        %% synchronous caller waiting for a reply
                        gen_statem:reply(From, {ok, Receipt})
                end,
            lists:foreach(Notify, FromsToMessages),
            InflightAfter = InflightBefore - BatchSize,
            pulsar_metrics:inflight_set(State, InflightAfter),
            State#{ requests := maps:remove(SequenceId, Requests)
                  , inflight_calls := InflightAfter
                  }
    end;
handle_response({error, #{error := Error, message := Msg}}, State) ->
    log_error("Response error:~p, msg:~p~n", [Error, Msg], State),
    try_close_socket(State),
    throw({reconnect, State});
handle_response(Unknown, State) ->
    log_error("Receive unknown message:~p~n", [Unknown], State),
    State.
×
601

602
%% Writes one batch of messages to the socket under `SequenceId'.
%% A send error is not returned: it is turned into a socket-error
%% message to self by the ?POORMAN macro, so the result is always `ok'.
-spec send_batch_payload([{timestamp(), [pulsar:message()]}], sequence_id(), state()) -> ok.
send_batch_payload(Messages, SequenceId,
                   #{ sock := Sock
                    , opts := Opts
                    , partitiontopic := Topic
                    , producer_id := ProducerId
                    , producer_name := ProducerName
                    }) ->
    ?POORMAN(Sock,
             pulsar_socket:send_batch_message_packet(
               Sock, Topic, Messages, SequenceId, ProducerId, ProducerName, Opts)).
1,581✔
611

612
%% Schedules a `ping' message to self in 30 seconds and returns the
%% timer reference.
start_keepalive() ->
    KeepaliveMs = 30_000,
    erlang:send_after(KeepaliveMs, self(), ping).
30✔
614

615
%% Advances the request id by one, wrapping back to 1 after
%% ?MAX_REQ_ID.
next_request_id(#{request_id := Current} = State) ->
    Next =
        case Current of
            ?MAX_REQ_ID -> 1;
            _ -> Current + 1
        end,
    State#{request_id := Next}.
27✔
619

620
%% Advances the sequence id by one, wrapping back to 1 after
%% ?MAX_SEQ_ID.
next_sequence_id(#{sequence_id := Current} = State) ->
    Next =
        case Current of
            ?MAX_SEQ_ID -> 1;
            _ -> Current + 1
        end,
    State#{sequence_id := Next}.
1,180✔
624

625
%% Logs at `debug' level with the producer prefix added by do_log/4.
-spec log_debug(string(), [term()], state()) -> ok.
log_debug(Format, FmtArgs, State) ->
    do_log(debug, Format, FmtArgs, State).
54✔
628

629
%% Logs at `info' level with the producer prefix added by do_log/4.
-spec log_info(string(), [term()], state()) -> ok.
log_info(Format, FmtArgs, State) ->
    do_log(info, Format, FmtArgs, State).
×
632

633
%% Logs at `warning' level with the producer prefix added by do_log/4.
-spec log_warn(string(), [term()], state()) -> ok.
log_warn(Format, FmtArgs, State) ->
    do_log(warning, Format, FmtArgs, State).
2✔
636

637
%% Logs at `error' level with the producer prefix added by do_log/4.
-spec log_error(string(), [term()], state()) -> ok.
log_error(Format, FmtArgs, State) ->
    do_log(error, Format, FmtArgs, State).
12✔
640

641
%% Emits a log event in the `[pulsar, producer]' domain. The message is
%% prefixed with "[pulsar-producer]" and the partition topic taken from
%% the state.
-spec do_log(atom(), string(), [term()], state()) -> ok.
do_log(Level, Format, Args, State) ->
    #{partitiontopic := Topic} = State,
    PrefixedFormat = "[pulsar-producer][~s] " ++ Format,
    Metadata = #{domain => [pulsar, producer]},
    logger:log(Level, PrefixedFormat, [Topic | Args], Metadata).
646

647
%% Invokes the callback as if for a batch of exactly one message.
-spec invoke_callback(callback(), callback_input()) -> ok.
invoke_callback(Callback, Resp) ->
    SingleMessage = 1,
    invoke_callback(Callback, Resp, SingleMessage).
3✔
650

651
%% Invokes a send callback with `Resp'.
%%
%% - `undefined': nothing is called.
%% - `{M, F, A}': `M:F(Resp, A...)' is applied `BatchLen' times.
%% - a fun of arity 1: called with `Resp' `BatchLen' times.
%% - `{Fn, Args}' (per-request callback): applied once as
%%   `Fn(Args..., Resp)' regardless of `BatchLen'; the result of that
%%   call is returned.
-spec invoke_callback(callback(), callback_input(), non_neg_integer()) -> ok.
invoke_callback(undefined, _Resp, _BatchLen) ->
    ok;
invoke_callback({M, F, A}, Resp, BatchLen) ->
    _ = [erlang:apply(M, F, [Resp | A]) || _ <- lists:seq(1, BatchLen)],
    ok;
invoke_callback(Callback, Resp, BatchLen) when is_function(Callback, 1) ->
    _ = [Callback(Resp) || _ <- lists:seq(1, BatchLen)],
    ok;
invoke_callback({Fn, Args}, Resp, _BatchLen) when is_function(Fn), is_list(Args) ->
    %% per-request callbacks are invoked exactly once, however many
    %% messages were sent
    apply(Fn, Args ++ [Resp]).
3✔
668

669
%% replayq sizer callback: reports the size of a queue item as the
%% byte size of its external term format encoding.
queue_item_sizer(?Q_ITEM(_CallId, _Ts, _Batch) = QItem) ->
    erlang:external_size(QItem).
2,125✔
671

672
%% replayq marshaller callback.
%%
%% Given a queue item, serializes it with term_to_binary/1. Given a
%% binary, deserializes it; if the decoded item carries a `{Pid, Tag}'
%% caller reference whose pid is not a live process on this node, the
%% reference is replaced by `undefined'.
queue_item_marshaller(?Q_ITEM(_, _, _) = QItem) ->
    term_to_binary(QItem);
queue_item_marshaller(Bin) when is_binary(Bin) ->
    case binary_to_term(Bin) of
        ?Q_ITEM({Pid, _Tag}, Ts, Msgs) = Decoded when is_pid(Pid) ->
            IsLocalAndAlive = node(Pid) =:= node() andalso erlang:is_process_alive(Pid),
            case IsLocalAndAlive of
                true -> Decoded;
                false -> ?Q_ITEM(undefined, Ts, Msgs)
            end;
        Decoded ->
            Decoded
    end.
686

687
%% Current Erlang system time in milliseconds.
now_ts() ->
    Unit = millisecond,
    erlang:system_time(Unit).
3,041✔
689

690
%% Wraps a caller reference and its messages into a queue item stamped
%% with the current time (see now_ts/0).
make_queue_item(From, Messages) ->
    EnqueuedAt = now_ts(),
    ?Q_ITEM(From, EnqueuedAt, Messages).
1,824✔
692

693
%% Converts the given send requests into queue items, appends them to
%% the replayq, refreshes the queuing metrics, and then lets
%% handle_overflow/2 discard whatever exceeds the queue's limit.
%% Returns the updated state.
enqueue_send_requests(Requests, State = #{replayq := Q}) ->
    ToQueueItem =
        fun(?SEND_REQ(From, Messages)) ->
            make_queue_item(From, Messages)
        end,
    QItems = lists:map(ToQueueItem, Requests),
    AppendedQ = replayq:append(Q, QItems),
    pulsar_metrics:queuing_set(State, replayq:count(AppendedQ)),
    pulsar_metrics:queuing_bytes_set(State, replayq:bytes(AppendedQ)),
    ?tp(pulsar_producer_send_requests_enqueued, #{requests => Requests}),
    handle_overflow(State#{replayq := AppendedQ}, replayq:overflow(AppendedQ)).
1,822✔
705

706
%% Reacts to the overflow figure reported by replayq.
%%
%% With no overflow (=< 0) the state is returned untouched, after
%% giving the rate-limited discard logger a chance to flush any pending
%% count. Otherwise items worth `Overflow' bytes are popped and acked
%% immediately, their originators are answered with `{error, overflow}',
%% and the dropped/queuing metrics are refreshed.
-spec handle_overflow(state(), integer()) -> state().
handle_overflow(State, Overflow) when Overflow =< 0 ->
    ok = maybe_log_discard(State, 0),
    State;
handle_overflow(State0 = #{replayq := Q, callback := Callback}, Overflow) ->
    PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
    {NewQ, QAckRef, Popped} = replayq:pop(Q, PopOpts),
    ok = replayq:ack(NewQ, QAckRef),
    %% the discard log is fed the number of popped queue items
    maybe_log_discard(State0, length(Popped)),
    Discarded = [{From, Msgs} || ?Q_ITEM(From, _EnqueuedAt, Msgs) <- Popped],
    reply_with_error(Discarded, Callback, {error, overflow}),
    %% the metric, in contrast, counts individual messages
    NumMsgs = lists:sum([length(Msgs) || {_From, Msgs} <- Discarded]),
    pulsar_metrics:dropped_queue_full_inc(State0, NumMsgs),
    pulsar_metrics:queuing_set(State0, replayq:count(NewQ)),
    pulsar_metrics:queuing_bytes_set(State0, replayq:bytes(NewQ)),
    State0#{replayq := NewQ}.
4✔
723

724
%% Entry point of the rate-limited discard logger. Does nothing when
%% there is no new discard (`Increment' is 0) and nothing is pending
%% since the last log line; otherwise defers to maybe_log_discard/3
%% with the bookkeeping read from the process dictionary.
maybe_log_discard(State, Increment) ->
    Last = get_overflow_log_state(),
    #{count_since_last_log := CountLast, total_count := TotalCount} = Last,
    case {CountLast =:= TotalCount, Increment} of
        {true, 0} ->
            %% nothing new and nothing pending
            ok;
        _ ->
            maybe_log_discard(State, Increment, Last)
    end.
735

736
%% Adds `Increment' to the running discard total and, if more than
%% ?MIN_DISCARD_LOG_INTERVAL has elapsed since the last log line, emits
%% a warning with the number discarded since then and resets the
%% baseline. Otherwise only the total is updated, so the count is
%% carried over to a later call.
-spec maybe_log_discard(
        state(),
        non_neg_integer(),
        #{ last_log_inst => non_neg_integer()
         , count_since_last_log => non_neg_integer()
         , total_count => non_neg_integer()
         }) -> ok.
maybe_log_discard(State,
                  Increment,
                  #{ last_log_inst := LastInst
                   , count_since_last_log := CountLast
                   , total_count := TotalCount
                   }) ->
    NowInst = now_ts(),
    NewTotalCount = TotalCount + Increment,
    NewStats =
        case NowInst - LastInst > ?MIN_DISCARD_LOG_INTERVAL of
            true ->
                Delta = NewTotalCount - CountLast,
                log_warn("replayq dropped ~b overflowed messages", [Delta], State),
                #{ last_log_inst => NowInst
                 , count_since_last_log => NewTotalCount
                 , total_count => NewTotalCount
                 };
            false ->
                #{ last_log_inst => LastInst
                 , count_since_last_log => CountLast
                 , total_count => NewTotalCount
                 }
        end,
    put_overflow_log_state(NewStats).
765

766
%% Reads the discard-logging bookkeeping from the process dictionary
%% (key ?buffer_overflow_discarded). When nothing has been stored yet,
%% an all-zero record is returned.
-spec get_overflow_log_state() -> #{ last_log_inst => non_neg_integer()
                                   , count_since_last_log => non_neg_integer()
                                   , total_count => non_neg_integer()
                                   }.
get_overflow_log_state() ->
    case erlang:get(?buffer_overflow_discarded) of
        #{} = Stored ->
            Stored;
        undefined ->
            #{ last_log_inst => 0
             , count_since_last_log => 0
             , total_count => 0
             }
    end.
780

781
%% Stores the discard-logging bookkeeping in the process dictionary
%% (key ?buffer_overflow_discarded). The head match insists that all
%% three counters are present.
-spec put_overflow_log_state(#{ last_log_inst => non_neg_integer()
                              , count_since_last_log => non_neg_integer()
                              , total_count => non_neg_integer()
                              }) -> ok.
put_overflow_log_state(Stats = #{ last_log_inst := _
                                , count_since_last_log := _
                                , total_count := _
                                }) ->
    _Previous = erlang:put(?buffer_overflow_discarded, Stats),
    ok.
8✔
791

792
%% Sends queued messages if there are any: returns the state unchanged
%% when the replayq is empty, otherwise hands over to
%% do_send_to_pulsar/1.
maybe_send_to_pulsar(State) ->
    #{replayq := Q} = State,
    case replayq:count(Q) of
        0 ->
            State;
        _NonEmpty ->
            do_send_to_pulsar(State)
    end.
800

801
%% Pops one batch from the replayq (bounded by `batch_size' items and
%% `max_batch_bytes' bytes), answers and drops the items older than the
%% `retention_period' option, and sends the remaining messages as one
%% payload under the current sequence id. The batch is remembered in
%% `requests' keyed by that sequence id, `inflight_calls' grows by the
%% number of messages sent, and the function then loops through
%% maybe_send_to_pulsar/1 until the queue is drained.
do_send_to_pulsar(State0) ->
    #{ batch_size := BatchSize
     , inflight_calls := InflightCalls0
     , sequence_id := SequenceId
     , requests := Requests0
     , replayq := Q
     , opts := ProducerOpts
     } = State0,
    MaxBatchBytes = maps:get(max_batch_bytes, ProducerOpts, ?DEFAULT_MAX_BATCH_BYTES),
    PopOpts = #{count_limit => BatchSize, bytes_limit => MaxBatchBytes},
    {NewQ, QAckRef, Items} = replayq:pop(Q, PopOpts),
    State1 = State0#{replayq := NewQ},
    pulsar_metrics:queuing_set(State0, replayq:count(NewQ)),
    pulsar_metrics:queuing_bytes_set(State0, replayq:bytes(NewQ)),
    RetentionPeriod = maps:get(retention_period, ProducerOpts, infinity),
    Now = now_ts(),
    %% Split popped items into expired ones (answered right away) and
    %% live ones (to be sent); foldr keeps the original queue order.
    Classify =
        fun(?Q_ITEM(From, Timestamp, Msgs), {ExpiredAcc, LiveAcc}) ->
            case is_batch_expired(Timestamp, RetentionPeriod, Now) of
                true ->
                    {[{From, Msgs} | ExpiredAcc], LiveAcc};
                false ->
                    {ExpiredAcc, [{From, {Timestamp, Msgs}} | LiveAcc]}
            end
        end,
    {Expired, FromsToMessages} = lists:foldr(Classify, {[], []}, Items),
    reply_expired_messages(Expired, State1),
    pulsar_metrics:dropped_inc(State1, length(Expired)),
    case FromsToMessages of
        [] ->
            %% everything popped had expired: ack the replayq batch
            %% right away and move on to the next one
            ok = replayq:ack(Q, QAckRef),
            maybe_send_to_pulsar(State1);
        [_ | _] ->
            FinalBatch = lists:append([Msgs || {_From, {_Ts, Msgs}} <- FromsToMessages]),
            FinalBatchSize = length(FinalBatch),
            send_batch_payload(FinalBatch, SequenceId, State0),
            InflightReq = ?INFLIGHT_REQ(QAckRef, FromsToMessages, FinalBatchSize),
            InflightCalls = InflightCalls0 + FinalBatchSize,
            pulsar_metrics:inflight_set(State1, InflightCalls),
            State2 = State1#{ requests := Requests0#{SequenceId => InflightReq}
                            , inflight_calls := InflightCalls
                            },
            maybe_send_to_pulsar(next_sequence_id(State2))
    end.
850

851
%% Answers every given request with `{error, expired}', using the
%% producer-level callback from the state where one is needed.
-spec reply_expired_messages([{gen_statem:from() | per_request_callback_int() | undefined,
                               [pulsar:message()]}],
                             state()) -> ok.
reply_expired_messages(ExpiredItems, #{callback := Callback}) ->
    Reason = {error, expired},
    reply_with_error(ExpiredItems, Callback, Reason).
1,183✔
856

857
%% Delivers `Error' to the originator of each item:
%%
%%  * no originator (`undefined') - the producer-level callback is
%%    invoked once per message;
%%  * a per-request callback      - invoked once for the whole request;
%%  * anything else               - treated as a gen_statem `From' and
%%    answered with gen_statem:reply/2.
-spec reply_with_error([{gen_statem:from() | per_request_callback_int() | undefined,
                         [pulsar:message()]}],
                       callback(), {error, expired | overflow}) -> ok.
reply_with_error(Items, Callback, Error) ->
    ReplyOne =
        fun({undefined, Msgs}) ->
                invoke_callback(Callback, Error, length(Msgs));
           ({?PER_REQ_CALLBACK(Fn, Args), _Msgs}) ->
                %% message count is irrelevant here: per-request
                %% callbacks fire once per request
                invoke_callback({Fn, Args}, Error);
           ({From, _Msgs}) ->
                gen_statem:reply(From, Error)
        end,
    lists:foreach(ReplyOne, Items).
872

873
%% Gathers send requests already waiting in the mailbox on top of those
%% in `Acc', stopping at `Limit' requests in total. See
%% do_collect_send_requests/3 for the result order.
collect_send_requests(Acc, Limit) ->
    do_collect_send_requests(Acc, length(Acc), Limit).
1,167✔
876

877
%% Non-blocking mailbox drain: while fewer than `Limit' requests have
%% been collected, takes the next ?SEND_REQ message if one is already
%% queued (zero timeout). Requests are pushed onto the front of `Acc'
%% and the accumulator is reversed on the way out.
do_collect_send_requests(Acc, Count, Limit) when Count < Limit ->
    receive
        ?SEND_REQ(_, _) = Req ->
            do_collect_send_requests([Req | Acc], Count + 1, Limit)
    after 0 ->
        lists:reverse(Acc)
    end;
do_collect_send_requests(Acc, _Count, _Limit) ->
    %% limit reached
    lists:reverse(Acc).
887

888
%% Closes the producer's socket if there is one. The result of
%% pulsar_socket:close/2 is deliberately ignored; always returns `ok'.
try_close_socket(#{sock := undefined}) ->
    ok;
try_close_socket(#{sock := Socket, opts := ProducerOpts}) ->
    _Ignored = pulsar_socket:close(Socket, ProducerOpts),
    ok.
8✔
893

894
%% Re-sends every batch recorded in `requests' (sent earlier but not
%% yet removed from the in-flight map), keeping the original sequence
%% ids.
%%
%% Entries older than the `retention_period' option are not re-sent:
%% their originators get `{error, expired}'. If a whole batch expired,
%% its replayq ack ref is acked and the batch is forgotten.
%%
%% Accounting: do_send_to_pulsar/1 maintains `inflight_calls' and the
%% batch size stored in ?INFLIGHT_REQ as numbers of *messages*. This
%% function therefore stores the message count of the re-sent batch and
%% subtracts the number of expired *messages* from `inflight_calls'.
%% (Counting queue items instead, as was done before, only agrees when
%% every item carries exactly one message.) The `dropped_inc' metric
%% keeps counting expired queue items, as do_send_to_pulsar/1 does.
%%
%% NOTE(review): maps:fold/3 visits sequence ids in map iteration
%% order, which is not guaranteed to be ascending - confirm that the
%% re-send order does not matter.
%%
%% Returns the updated state.
resend_sent_requests(State) ->
    ?tp(pulsar_producer_resend_sent_requests_enter, #{}),
    #{ inflight_calls := InflightCalls0
     , requests := Requests0
     , replayq := Q
     , opts := ProducerOpts
     } = State,
    Now = now_ts(),
    RetentionPeriod = maps:get(retention_period, ProducerOpts, infinity),
    {Requests, DroppedItems, DroppedMsgs} =
        maps:fold(
          fun(SequenceId, ?INFLIGHT_REQ(QAckRef, FromsToMessages, _BatchSize),
              {AccIn, DroppedItemsAcc, DroppedMsgsAcc}) ->
               {Messages, Expired} =
                  lists:partition(
                    fun({_From, {Ts, _Msgs}}) ->
                      not is_batch_expired(Ts, RetentionPeriod, Now)
                    end,
                    FromsToMessages),
               lists:foreach(
                 fun({From, {_Ts, Msgs}}) ->
                   reply_expired_messages([{From, Msgs}], State)
                 end,
                 Expired),
               NumExpiredItems = length(Expired),
               NumExpiredMsgs = lists:sum([length(Msgs) || {_From, {_Ts, Msgs}} <- Expired]),
               Acc = case Messages of
                   [] ->
                       ?tp(pulsar_producer_resend_all_expired, #{}),
                       ok = replayq:ack(Q, QAckRef),
                       AccIn;
                   [_ | _] ->
                       Batch = [Msg || {_From, {_Ts, Msgs}} <- Messages,
                                       Msg <- Msgs],
                       send_batch_payload(Batch, SequenceId, State),
                       %% batch size is a message count, matching what
                       %% do_send_to_pulsar/1 stores
                       AccIn#{SequenceId => ?INFLIGHT_REQ(QAckRef, Messages, length(Batch))}
               end,
               {Acc, DroppedItemsAcc + NumExpiredItems, DroppedMsgsAcc + NumExpiredMsgs}
          end,
          {#{}, 0, 0},
          Requests0),
    InflightCalls = InflightCalls0 - DroppedMsgs,
    pulsar_metrics:dropped_inc(State, DroppedItems),
    pulsar_metrics:inflight_set(State, InflightCalls),
    State#{requests := Requests, inflight_calls := InflightCalls}.
27✔
937

938
%% Tells whether an item enqueued at `Timestamp' has outlived the
%% retention period as of `Now' (all in the same time unit). With an
%% `infinity' retention period nothing ever expires. An item whose age
%% equals the retention period exactly counts as expired.
is_batch_expired(_Timestamp, infinity, _Now) ->
    false;
is_batch_expired(Timestamp, RetentionPeriod, Now) ->
    OldestAllowed = Now - RetentionPeriod,
    OldestAllowed >= Timestamp.
9✔
942

943
%% Normalizes the string to Unicode NFD, passes it through
%% pulsar_utils:escape_uri/1 and returns the result as a binary.
-spec escape(string()) -> binary().
escape(Str) ->
    Escaped = pulsar_utils:escape_uri(unicode:characters_to_nfd_list(Str)),
    iolist_to_binary(Escaped).
7✔
947

948
%% Handles the outcome of a topic lookup.
%%
%%  * `{error, _}': logs, closes the socket (if any) and returns
%%    ?NEXT_STATE_IDLE_RECONNECT(State).
%%  * proxied reply (`proxy_through_service_url := true'): asks the
%%    client manager for an alive Pulsar URL and continues in
%%    maybe_connect/2 with that URL plus the broker service URL from the
%%    reply. A failure of that call (error return, `noproc' or
%%    `timeout' exit) is logged and handled like the error case.
%%  * direct reply (`proxy_through_service_url := false'): continues in
%%    maybe_connect/2 using the broker service URL itself as the URL to
%%    connect to, with no proxy target.
-spec handle_lookup_topic_reply(pulsar_client:lookup_topic_response(), state()) -> handler_result().
handle_lookup_topic_reply({error, Reason}, State) ->
    log_error("error looking up topic: ~0p", [Reason], State),
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
handle_lookup_topic_reply({ok, #{ proxy_through_service_url := true
                                , brokerServiceUrl := BrokerServiceURL
                                }}, State0) ->
    #{clientid := ClientId} = State0,
    ?tp(debug, pulsar_producer_lookup_alive_pulsar_url, #{}),
    LookupInfo = #{proxy_through_service_url => true, broker_service_url => BrokerServiceURL},
    log_debug("received topic lookup reply: ~0p", [LookupInfo], State0),
    %% only the call itself is protected; the `of' branches are not
    try pulsar_client_manager:get_alive_pulsar_url(ClientId, ?GET_ALIVE_PULSAR_URL) of
        {ok, AlivePulsarURL} ->
            Target = #{ broker_service_url => BrokerServiceURL
                      , alive_pulsar_url => AlivePulsarURL
                      },
            maybe_connect(Target, State0);
        {error, Reason} ->
            log_error("error getting pulsar alive URL: ~0p", [Reason], State0),
            try_close_socket(State0),
            ?NEXT_STATE_IDLE_RECONNECT(State0)
    catch
        exit:{noproc, _} ->
            log_error("client restarting; will retry to lookup topic later", [], State0),
            try_close_socket(State0),
            ?NEXT_STATE_IDLE_RECONNECT(State0);
        exit:{timeout, _} ->
            log_error("timeout calling client; will retry to lookup topic later", [], State0),
            try_close_socket(State0),
            ?NEXT_STATE_IDLE_RECONNECT(State0)
    end;
handle_lookup_topic_reply({ok, #{ proxy_through_service_url := false
                                , brokerServiceUrl := BrokerServiceURL
                                }},
                         State) ->
    LookupInfo = #{proxy_through_service_url => false, broker_service_url => BrokerServiceURL},
    log_debug("received topic lookup reply: ~0p", [LookupInfo], State),
    Target = #{ alive_pulsar_url => BrokerServiceURL
              , broker_service_url => undefined
              },
    maybe_connect(Target, State).
988

989
%% Compares the endpoint resolved by the topic lookup with the one kept
%% in the state (`broker_server' and `proxy_to_broker_url').
%%
%% If both are unchanged, proceeds with do_connect/1. If either differs,
%% the socket is closed, the new endpoint is stored in the state and
%% ?NEXT_STATE_IDLE_RECONNECT is returned with the updated state.
-spec maybe_connect(#{ alive_pulsar_url := string()
                     , broker_service_url := string() | undefined
                     }, state()) -> handler_result().
maybe_connect(#{ broker_service_url := NewBrokerServiceURL
               , alive_pulsar_url := AlivePulsarURL
               }, State0) ->
    #{ broker_server := OldBrokerServer
     , proxy_to_broker_url := OldBrokerServiceURL
     } = State0,
    {_Transport, NewBrokerServer} = pulsar_utils:parse_url(AlivePulsarURL),
    %% matching against the already-bound Old* variables is an exact
    %% (=:=) comparison of both components
    case {NewBrokerServer, NewBrokerServiceURL} of
        {OldBrokerServer, OldBrokerServiceURL} ->
            Endpoint = #{broker_server => NewBrokerServer,
                         service_url => NewBrokerServiceURL},
            log_debug("connecting to ~0p", [Endpoint], State0),
            do_connect(State0);
        _Changed ->
            %% broker changed; reconnect.
            Previous = #{ broker_server => OldBrokerServer
                        , proxy_url => OldBrokerServiceURL
                        },
            Current = #{ broker_server => NewBrokerServer
                       , proxy_url => NewBrokerServiceURL
                       },
            log_info("pulsar endpoint changed from ~0p to ~0p; reconnecting...",
                     [Previous, Current],
                     State0),
            try_close_socket(State0),
            State = State0#{
                broker_server := NewBrokerServer,
                proxy_to_broker_url := NewBrokerServiceURL
            },
            ?NEXT_STATE_IDLE_RECONNECT(State)
    end.
1023

1024
%% Tells the optional state observer about the producer's state by
%% calling Fun(ProducerState, Args...). The observer's return value is
%% ignored; without an observer (`undefined') this is a no-op.
-spec notify_state_change(undefined | state_observer_callback(), statem()) -> ok.
notify_state_change(undefined, _ProducerState) ->
    ok;
notify_state_change({ObserverFn, ExtraArgs}, ProducerState) ->
    _Ignored = erlang:apply(ObserverFn, [ProducerState | ExtraArgs]),
    ok.
84✔
1030

1031
-ifdef(TEST).
1032
-include_lib("eunit/include/eunit.hrl").
1033

1034
%% EUnit generator for the rate-limited discard logger:
%%  1. no increment and no stored bookkeeping -> nothing happens;
%%  2. last log older than the minimum interval -> a log is emitted and
%%     the baseline catches up with the total;
%%  3. last log just now -> no log, only the total grows.
maybe_log_discard_test_() ->
    [ {"no increment, empty dictionary", fun() -> maybe_log_discard(undefined, 0) end}
    , {"fake-last-old",
       fun() ->
         StaleInst = now_ts() - ?MIN_DISCARD_LOG_INTERVAL - 1,
         Seed = #{ last_log_inst => StaleInst
                 , count_since_last_log => 2
                 , total_count => 2
                 },
         ok = put_overflow_log_state(Seed),
         ok = maybe_log_discard(#{partitiontopic => <<"partitiontopic">>}, 1),
         After = get_overflow_log_state(),
         ?assertMatch(#{count_since_last_log := 3, total_count := 3}, After),
         %% the timestamp moved past the minimum interval because a log
         %% line was just emitted
         ?assert(maps:get(last_log_inst, After) - StaleInst > ?MIN_DISCARD_LOG_INTERVAL),
         ok
       end}
    , {"fake-last-fresh",
       fun() ->
         FreshInst = now_ts(),
         Seed = #{ last_log_inst => FreshInst
                 , count_since_last_log => 2
                 , total_count => 2
                 },
         ok = put_overflow_log_state(Seed),
         ok = maybe_log_discard(#{partitiontopic => <<"partitiontopic">>}, 2),
         After = get_overflow_log_state(),
         ?assertMatch(#{count_since_last_log := 2, total_count := 4}, After),
         %% the timestamp did not move: nothing was logged, the count
         %% was only accumulated
         ?assert(maps:get(last_log_inst, After) - FreshInst < ?MIN_DISCARD_LOG_INTERVAL),
         ok
       end}
    ].
1065
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc