• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 858

24 Nov 2025 04:28PM UTC coverage: 73.517%. First build
858

Pull #79

github

zmstone
ci: fix python venv
Pull Request #79: 251124 publish single message if batch size is 1

60 of 64 new or added lines in 8 files covered. (93.75%)

1041 of 1416 relevant lines covered (73.52%)

254.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.38
/src/pulsar_producer.erl
1
%% Copyright (c) 2013-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_producer).
16

17
-behaviour(gen_statem).
18

19
-include_lib("kernel/include/inet.hrl").
20
-include("include/pulsar_producer_internal.hrl").
21
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
22

23
-export([ send/2
24
        , send/3
25
        , send_sync/2
26
        , send_sync/3
27
        , get_state/1
28
        ]).
29

30
-export([ start_link/4
31
        , idle/3
32
        , connecting/3
33
        , connected/3
34
        ]).
35

36
%% gen_statem API
37
-export([ callback_mode/0
38
        , init/1
39
        , terminate/3
40
        ]).
41

42
-if(?OTP_RELEASE >= 25).
43
-export([format_status/1]).
44
-else.
45
-export([format_status/2]).
46
-endif.
47

48
%% replayq API
49
-export([ queue_item_sizer/1
50
        , queue_item_marshaller/1
51
        ]).
52

53
-export([handle_response/2]).
54

55
%% for testing only
56
-ifdef(TEST).
57
-export([make_queue_item/2]).
58
-endif.
59

60
-type statem() :: idle | connecting | connected.
61
-type sequence_id() :: integer().
62
-type send_receipt() :: #{ sequence_id := sequence_id()
63
                         , producer_id := integer()
64
                         , highest_sequence_id := sequence_id()
65
                         , message_id := map()
66
                         , any() => term()
67
                         }.
68
-type timestamp() :: integer().
69
-type callback() :: undefined | mfa() | fun((map()) -> ok) | per_request_callback().
70
-type callback_input() :: {ok, send_receipt()} | {error, expired}.
71
-type config() :: #{ replayq_dir := string()
72
                   , replayq_max_total_bytes => pos_integer()
73
                   , replayq_seg_bytes => pos_integer()
74
                   , replayq_offload_mode => boolean()
75
                   , max_batch_bytes => pos_integer()
76
                   , producer_name => atom()
77
                   , clientid => atom()
78
                   , callback => callback()
79
                   , batch_size => non_neg_integer()
80
                   , drop_if_high_mem => boolean()
81
                   , max_inflight => pos_integer()
82
                   , retention_period => timeout()
83
                   }.
84
-export_type([ config/0
85
             ]).
86

87
-define(RECONNECT_TIMEOUT, 5_000).
88
-define(LOOKUP_TOPIC_TIMEOUT, 15_000).
89
-define(GET_ALIVE_PULSAR_URL, 5_000).
90

91
-define(MAX_REQ_ID, 4294836225).
92
-define(MAX_SEQ_ID, 18445618199572250625).
93

94
-define(DEFAULT_MAX_INFLIGHT, 10).
95

96
-define(DEFAULT_REPLAYQ_SEG_BYTES, 10 * 1024 * 1024).
97
-define(DEFAULT_REPLAYQ_LIMIT, 2_000_000_000).
98
-define(DEFAULT_MAX_BATCH_BYTES, 1_000_000).
99
-define(Q_ITEM(From, Ts, Messages), {From, Ts, Messages}).
100
-define(INFLIGHT_REQ(QAckRef, FromsToMessages, BatchSize), {inflight_req, QAckRef, FromsToMessages, BatchSize}).
101
-define(NEXT_STATE_IDLE_RECONNECT(State), {next_state, idle, State#{sock := undefined,
102
                                                                    sock_pid := undefined},
103
                                           [{state_timeout, ?RECONNECT_TIMEOUT, do_connect}]}).
104
-define(buffer_overflow_discarded, buffer_overflow_discarded).
105
-define(MIN_DISCARD_LOG_INTERVAL, timer:seconds(5)).
106
-define(PER_REQ_CALLBACK(Fn, Args), {callback, {Fn, Args}}).
107
-define(SOCK_ERR(SOCK, REASON), {socket_error, SOCK, REASON}).
108

109
%% poorman's error handling.
110
%% this is an extra safety to handle a previously missed tcp/ssl_error or tcp/ssl_closed event
111
-define(POORMAN(SOCK, EXPR),
112
        case (EXPR) of
113
            ok ->
114
                ok;
115
            {error, Reason} ->
116
                _ = self() ! ?SOCK_ERR(SOCK, Reason),
117
                ok
118
        end).
119

120
%% Calls/Casts/Infos
121
-record(maybe_send_to_pulsar, {}).
122

123
-type state_observer_callback() :: {function(), [term()]}.
124
-type state() :: #{
125
    batch_size := non_neg_integer(),
126
    broker_server := {binary(), pos_integer()},
127
    callback := undefined | mfa() | fun((map()) -> ok),
128
    clientid := atom(),
129
    drop_if_high_mem := boolean(),
130
    inflight_calls := non_neg_integer(),
131
    lookup_topic_request_ref := reference() | undefined,
132
    max_inflight := pos_integer(),
133
    opts := map(),
134
    parent_pid := undefined | pid(),
135
    partitiontopic := string(),
136
    producer_id := integer(),
137
    producer_name := atom(),
138
    proxy_to_broker_url := undefined | string(),
139
    replayq := replayq:q(),
140
    replayq_offload_mode := boolean(),
141
    request_id := integer(),
142
    requests := #{sequence_id() =>
143
                      ?INFLIGHT_REQ(
144
                         replayq:ack_ref() | undefined,
145
                         [{gen_statem:from() | undefined,
146
                           {timestamp(), [pulsar:message()]}}],
147
                         _BatchSize :: non_neg_integer()
148
                        )},
149
    sequence_id := sequence_id(),
150
    state_observer_callback := undefined | state_observer_callback(),
151
    sock := undefined | port(),
152
    sock_pid := undefined | pid(),
153
    telemetry_metadata := map()
154
}.
155
-type handler_result() :: gen_statem:event_handler_result(statem(), state()).
156
-type per_request_callback() :: {function(), [term()]}.
157
-type per_request_callback_int() :: ?PER_REQ_CALLBACK(function(), [term()]).
158
-type send_opts() :: #{callback_fn => per_request_callback()}.
159
-export_type([send_opts/0]).
160

161
%% gen_statem callback: one callback function per state name, with
%% state-enter calls enabled.
callback_mode() ->
    [state_functions, state_enter].
162

163
%% Starts a producer state machine linked to the caller.  The four
%% arguments are handed to `init/1' as a single tuple.
start_link(PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts) ->
    InitArgs = {PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts},
    %% the process is spawned with its message queue data kept off-heap
    StartOpts = [{spawn_opt, [{message_queue_data, off_heap}]}],
    gen_statem:start_link(?MODULE, InitArgs, StartOpts).
166

167
%% Asynchronous send with no per-request options; see `send/3'.
-spec send(gen_statem:server_ref(), [pulsar:message()]) -> {ok, pid()}.
send(Pid, Messages) ->
    NoOpts = #{},
    send(Pid, Messages, NoOpts).
170

171
%% Asynchronously hands `Messages' to the producer process `Pid' and
%% returns immediately with `{ok, Pid}'.
%%
%% `Opts' may carry `callback_fn => {Fn, Args}', a per-request callback.
%% The callback shape is validated here, in the caller: the producer
%% later applies it as `apply(Fn, Args ++ [Resp])' and only accepts a
%% list for `Args', so a malformed value must be rejected before it is
%% sent, otherwise it would crash the producer process when the receipt
%% arrives.  An invalid `callback_fn' raises `case_clause' in the caller.
-spec send(gen_statem:server_ref(), [pulsar:message()], send_opts()) -> {ok, pid()}.
send(Pid, Messages, Opts) ->
    From = case maps:get(callback_fn, Opts, undefined) of
               undefined ->
                   undefined;
               {Fn, Args} when is_function(Fn), is_list(Args) ->
                   %% same tagged shape that `handle_response/2' matches on
                   ?PER_REQ_CALLBACK(Fn, Args)
           end,
    erlang:send(Pid, ?SEND_REQ(From, Messages)),
    {ok, Pid}.
179

180
%% Synchronous send using a 5 second timeout; see `send_sync/3'.
-spec send_sync(gen_statem:server_ref(), [pulsar:message()]) ->
          {ok, send_receipt()}
        | {error, producer_connecting
                | producer_disconnected
                | term()}.
send_sync(Pid, Messages) ->
    DefaultTimeout = 5_000,
    send_sync(Pid, Messages, DefaultTimeout).
187

188
%% Sends `Messages' to the producer and blocks until a reply tagged with
%% the monitor reference arrives, the producer goes down, or `Timeout'
%% elapses.
%%
%% Raises `error({producer_down, Reason})' when the monitored process
%% dies first, and `error(timeout)' when no reply is available once the
%% timeout has expired.
-spec send_sync(gen_statem:server_ref(), [pulsar:message()], timeout()) ->
          {ok, send_receipt()}
        | {error, producer_connecting
                | producer_disconnected
                | term()}.
send_sync(Pid, Messages, Timeout) ->
    ReplyTo = self(),
    Tag = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
    %% `{ReplyTo, Tag}' has the shape of a gen_statem `From', so the
    %% producer can answer with `gen_statem:reply/2'.
    erlang:send(Pid, ?SEND_REQ({ReplyTo, Tag}, Messages)),
    receive
        {Tag, Reply} ->
            erlang:demonitor(Tag, [flush]),
            Reply;
        {'DOWN', Tag, process, Pid, Reason} ->
            error({producer_down, Reason})
    after Timeout ->
            erlang:demonitor(Tag, [flush]),
            %% one last, non-blocking look for a reply that was already
            %% delivered to the mailbox before giving up
            receive
                {Tag, LateReply} ->
                    LateReply
            after 0 ->
                    error(timeout)
            end
    end.
217

218
%% Asks the producer which state it is currently in (5 second call
%% timeout).
-spec get_state(pid()) -> statem().
get_state(Pid) ->
    CallTimeout = 5_000,
    gen_statem:call(Pid, get_state, CallTimeout).
221

222
%%--------------------------------------------------------------------
223
%% gen_statem callback
224
%%--------------------------------------------------------------------
225

226
%% gen_statem `init/1'.  Opens the replayq buffer, builds the initial
%% state map, publishes the initial gauge values and starts in `idle'
%% with an internal `do_connect' event queued.
-spec init({string(), string(), string() | undefined, config()}) ->
          gen_statem:init_result(statem(), state()).
init({PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts0}) ->
    process_flag(trap_exit, true),
    pulsar_utils:set_label({?MODULE, PartitionTopic}),
    {Transport, BrokerServer} = pulsar_utils:parse_url(Server),
    ProducerID = maps:get(producer_id, ProducerOpts0),
    Offload = maps:get(replayq_offload_mode, ProducerOpts0, false),
    %% memory-only queue unless a base directory was configured; on disk,
    %% each partition topic gets its own (escaped) sub-directory
    StorageCfg =
        case maps:get(replayq_dir, ProducerOpts0, false) of
            false ->
                #{mem_only => true};
            BaseDir ->
                QDir = filename:join([BaseDir, escape(PartitionTopic)]),
                #{ dir => QDir
                 , seg_bytes => maps:get(replayq_seg_bytes, ProducerOpts0,
                                         ?DEFAULT_REPLAYQ_SEG_BYTES)
                 , offload => Offload
                 }
        end,
    QLimit = maps:get(replayq_max_total_bytes, ProducerOpts0, ?DEFAULT_REPLAYQ_LIMIT),
    BatchBytes = maps:get(max_batch_bytes, ProducerOpts0, ?DEFAULT_MAX_BATCH_BYTES),
    Q = replayq:open(StorageCfg#{ sizer => fun ?MODULE:queue_item_sizer/1
                                , marshaller => fun ?MODULE:queue_item_marshaller/1
                                , max_total_bytes => QLimit
                                }),
    ProducerOpts1 = ProducerOpts0#{max_batch_bytes => BatchBytes},
    %% these options are kept as dedicated state fields (or were only
    %% needed to open the queue), so they are dropped from the options map
    ConsumedKeys = [ replayq_dir
                   , replayq_seg_bytes
                   , replayq_offload_mode
                   , replayq_max_total_bytes
                   , drop_if_high_mem
                   , max_inflight
                   ],
    ProducerOpts = maps:without(ConsumedKeys, ProducerOpts1),
    Telemetry = maps:put(partition_topic, PartitionTopic,
                         maps:get(telemetry_metadata, ProducerOpts0, #{})),
    State = #{
        batch_size => maps:get(batch_size, ProducerOpts, 0),
        broker_server => BrokerServer,
        callback => maps:get(callback, ProducerOpts, undefined),
        clientid => maps:get(clientid, ProducerOpts),
        drop_if_high_mem => maps:get(drop_if_high_mem, ProducerOpts1, false),
        inflight_calls => 0,
        lookup_topic_request_ref => undefined,
        max_inflight => maps:get(max_inflight, ProducerOpts1, ?DEFAULT_MAX_INFLIGHT),
        opts => pulsar_utils:maybe_enable_ssl_opts(Transport, ProducerOpts),
        parent_pid => maps:get(parent_pid, ProducerOpts, undefined),
        partitiontopic => PartitionTopic,
        producer_id => ProducerID,
        producer_name => maps:get(producer_name, ProducerOpts, pulsar_producer),
        proxy_to_broker_url => ProxyToBrokerUrl,
        replayq => Q,
        replayq_offload_mode => Offload,
        request_id => 1,
        requests => #{},
        sequence_id => 1,
        state_observer_callback => maps:get(state_observer_callback, ProducerOpts0, undefined),
        sock => undefined,
        sock_pid => undefined,
        telemetry_metadata => Telemetry
    },
    %% gauges start from what is already sitting in the (re)opened queue
    pulsar_metrics:inflight_set(State, 0),
    pulsar_metrics:queuing_set(State, replayq:count(Q)),
    pulsar_metrics:queuing_bytes_set(State, replayq:bytes(Q)),
    {ok, idle, State, [{next_event, internal, do_connect}]}.
297

298
%% idle state
299
%% State callback for `idle': no socket is open.  Send requests are only
%% enqueued; a topic lookup is started on `do_connect' and its reply is
%% passed to `handle_lookup_topic_reply/2'.
-spec idle(gen_statem:event_type(), _EventContent, state()) ->
          handler_result().
idle(enter, _OldState, _State = #{state_observer_callback := StateObserverCallback}) ->
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
    keep_state_and_data;
idle(internal, do_connect, State) ->
    refresh_urls_and_connect(State);
idle(state_timeout, do_connect, State) ->
    refresh_urls_and_connect(State);
idle(state_timeout, lookup_topic_timeout, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("timed out waiting for lookup topic response", [], State0),
    %% The reference is about to be forgotten, so release the monitor it
    %% stands for (the reply and 'DOWN' clauses below treat it as one) and
    %% flush any 'DOWN' already queued for it.
    _ = is_reference(Ref) andalso erlang:demonitor(Ref, [flush]),
    State = State0#{lookup_topic_request_ref := undefined},
    ?NEXT_STATE_IDLE_RECONNECT(State);
idle({call, From}, get_state, _State) ->
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
idle({call, From}, _EventContent, _State) ->
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
idle(cast, _EventContent, _State) ->
    keep_state_and_data;
idle(info, ?SEND_REQ(_, _) = SendRequest, State0) ->
    %% not connected: buffer the request, it is not sent from this state
    State = enqueue_send_requests([SendRequest], State0),
    {keep_state, State};
idle(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
    State = State0#{lookup_topic_request_ref := undefined},
    erlang:demonitor(Ref, [flush]),
    handle_lookup_topic_reply(Reply, State);
idle(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
    {stop, Reason};
idle(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
    State = State0#{lookup_topic_request_ref := undefined},
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
idle(internal, #maybe_send_to_pulsar{}, _State) ->
    %% Stale nudge
    keep_state_and_data;
idle(_EventType, _Event, _State) ->
    keep_state_and_data.
338

339
%% connecting state
340
%% State callback for `connecting': a socket exists and the connect /
%% create-producer exchange is in progress.  Send requests are only
%% enqueued; incoming frames are parsed and passed to `handle_response/2'.
-spec connecting(gen_statem:event_type(), _EventContent, state()) ->
          handler_result().
connecting(enter, _OldState, _State = #{state_observer_callback := StateObserverCallback}) ->
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
    keep_state_and_data;
connecting(state_timeout, do_connect, State) ->
    refresh_urls_and_connect(State);
connecting(state_timeout, lookup_topic_timeout, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("timed out waiting for lookup topic response", [], State0),
    %% The reference is about to be forgotten, so release the monitor it
    %% stands for (the reply and 'DOWN' clauses below treat it as one) and
    %% flush any 'DOWN' already queued for it.
    _ = is_reference(Ref) andalso erlang:demonitor(Ref, [flush]),
    State = State0#{lookup_topic_request_ref := undefined},
    ?NEXT_STATE_IDLE_RECONNECT(State);
connecting(info, ?SEND_REQ(_, _) = SendRequest, State0) ->
    %% not connected yet: buffer the request, it is not sent from this state
    State = enqueue_send_requests([SendRequest], State0),
    {keep_state, State};
connecting(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
    State = State0#{lookup_topic_request_ref := undefined},
    erlang:demonitor(Ref, [flush]),
    handle_lookup_topic_reply(Reply, State);
connecting(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
    {stop, Reason};
connecting(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
    State = State0#{lookup_topic_request_ref := undefined},
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
connecting(info, {'EXIT', Sock, Reason}, State) when is_port(Sock) ->
    handle_socket_close(connecting, Sock, Reason, State);
connecting(info, {'EXIT', SockPid, Reason}, State) when is_pid(SockPid) ->
    handle_socket_close(connecting, SockPid, Reason, State);
connecting(info, {C, Sock}, State) when C =:= tcp_closed; C =:= ssl_closed ->
    handle_socket_close(connecting, Sock, closed, State);
connecting(info, {E, Sock, Reason}, State) when E =:= tcp_error; E =:= ssl_error ->
    handle_socket_close(connecting, Sock, Reason, State);
connecting(info, ?SOCK_ERR(Sock, Reason), State) ->
    handle_socket_close(connecting, Sock, Reason, State);
connecting(internal, #maybe_send_to_pulsar{}, _State) ->
    %% Stale nudge
    keep_state_and_data;
connecting(_EventType, {Inet, _, Bin}, State) when Inet == tcp; Inet == ssl ->
    Cmd = pulsar_protocol_frame:parse(Bin),
    ?MODULE:handle_response(Cmd, State);
connecting(info, Msg, State) ->
    log_info("[connecting] unknown message received ~p~n  ~p", [Msg, State], State),
    keep_state_and_data;
connecting({call, From}, get_state, _State) ->
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
connecting({call, From}, _EventContent, _State) ->
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
connecting(cast, _EventContent, _State) ->
   keep_state_and_data.
391

392
%% connected state
393
%% State callback for `connected': the producer is registered with the
%% broker.  On entry, previously sent requests are resent and queued ones
%% are flushed; send requests are enqueued and sent right away; incoming
%% frames are parsed and passed to `handle_response/2'.
-spec connected(gen_statem:event_type(), _EventContent, state()) ->
          handler_result().
connected(enter, _OldState, State0 = #{state_observer_callback := StateObserverCallback}) ->
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
    State1 = resend_sent_requests(State0),
    State = maybe_send_to_pulsar(State1),
    {keep_state, State};
connected(state_timeout, do_connect, _State) ->
    keep_state_and_data;
connected(state_timeout, lookup_topic_timeout, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("timed out waiting for lookup topic response", [], State0),
    %% The reference is about to be forgotten, so release the monitor it
    %% stands for (the reply and 'DOWN' clauses below treat it as one) and
    %% flush any 'DOWN' already queued for it.
    _ = is_reference(Ref) andalso erlang:demonitor(Ref, [flush]),
    State = State0#{lookup_topic_request_ref := undefined},
    ?NEXT_STATE_IDLE_RECONNECT(State);
connected(info, ?SEND_REQ(_, _) = SendRequest, State0 = #{batch_size := BatchSize}) ->
    ?tp(pulsar_producer_send_req_enter, #{}),
    SendRequests = collect_send_requests([SendRequest], BatchSize),
    State1 = enqueue_send_requests(SendRequests, State0),
    State = maybe_send_to_pulsar(State1),
    ?tp(pulsar_producer_send_req_exit, #{}),
    {keep_state, State};
connected(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
    State = State0#{lookup_topic_request_ref := undefined},
    erlang:demonitor(Ref, [flush]),
    handle_lookup_topic_reply(Reply, State);
connected(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
    {stop, Reason};
connected(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
    State = State0#{lookup_topic_request_ref := undefined},
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
connected(info, {'EXIT', Sock, Reason}, State) when is_port(Sock) ->
    handle_socket_close(connected, Sock, Reason, State);
connected(info, {'EXIT', SockPid, Reason}, State) when is_pid(SockPid) ->
    handle_socket_close(connected, SockPid, Reason, State);
connected(_EventType, {C, Sock}, State) when C =:= tcp_closed; C =:= ssl_closed ->
    handle_socket_close(connected, Sock, closed, State);
connected(_EventType, {E, Sock, Reason}, State) when E =:= tcp_error; E =:= ssl_error ->
    handle_socket_close(connected, Sock, Reason, State);
connected(_EventType, ?SOCK_ERR(Sock, Reason), State) ->
    handle_socket_close(connected, Sock, Reason, State);
connected(_EventType, {Inet, _, Bin}, State) when Inet == tcp; Inet == ssl ->
    Cmd = pulsar_protocol_frame:parse(Bin),
    ?MODULE:handle_response(Cmd, State);
connected(_EventType, ping, State = #{sock_pid := SockPid}) ->
    %% keepalive tick scheduled by `start_keepalive/0'
    ok = pulsar_socket_writer:ping_async(SockPid),
    {keep_state, State};
connected(internal, #maybe_send_to_pulsar{}, State0) ->
    State = maybe_send_to_pulsar(State0),
    {keep_state, State};
connected({call, From}, get_state, _State) ->
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
connected({call, From}, _EventContent, _State) ->
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
connected(cast, _EventContent, _State) ->
    keep_state_and_data;
connected(_EventType, EventContent, State) ->
    ?MODULE:handle_response(EventContent, State).
453

454
%% Reacts to a close/error notification for a socket or socket-writer
%% pid.  Only notifications about the socket currently held in the state
%% trigger a reconnect; anything else is treated as stale and ignored.
handle_socket_close(StateName, WriterPid, Reason, #{sock_pid := WriterPid} = State) ->
    %% the writer process is reported: continue with the socket it owns
    #{sock := CurrentSock} = State,
    handle_socket_close(StateName, CurrentSock, Reason, State);
handle_socket_close(StateName, CurrentSock, Reason, #{sock := CurrentSock} = State) ->
    ?tp("pulsar_socket_close", #{sock => CurrentSock, reason => Reason}),
    log_error("connection_closed at_state: ~p, reason: ~p", [StateName, Reason], State),
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
handle_socket_close(_StateName, _OtherSock, _Reason, _State) ->
    %% stale close event
    keep_state_and_data.
465

466
%% Starts an asynchronous topic lookup through the client manager and
%% arms the lookup timeout.  If the client manager process is not
%% running (`noproc'), falls back to idle with a reconnect timer.
%%
%% A lookup is issued on every (re)connect, as the Pulsar binary
%% protocol documentation requires:
%% https://pulsar.apache.org/docs/2.10.x/developing-binary-protocol/#topic-lookup
%% > Topic lookup needs to be performed each time a client needs
%% > to create or reconnect a producer or a consumer. Lookup is used
%% > to discover which particular broker is serving the topic we are
%% > about to use.
%% Simply looking up the topic (even from a distinct connection)
%% will "unblock" the topic so we may send messages to it.  The
%% producer may be started only after that.
-spec refresh_urls_and_connect(state()) -> handler_result().
refresh_urls_and_connect(State0) ->
    #{clientid := ClientId, partitiontopic := PartitionTopic} = State0,
    ?tp(debug, pulsar_producer_refresh_start, #{}),
    try pulsar_client_manager:lookup_topic_async(ClientId, PartitionTopic) of
        {ok, RequestRef} ->
            TimeoutAction = {state_timeout, ?LOOKUP_TOPIC_TIMEOUT, lookup_topic_timeout},
            {keep_state, State0#{lookup_topic_request_ref := RequestRef}, [TimeoutAction]}
    catch
        exit:{noproc, _} ->
            log_error("client restarting; will retry to lookup topic again later", [], State0),
            ?NEXT_STATE_IDLE_RECONNECT(State0)
    end.
491

492
%% Starts the socket writer for the broker, sends the connect packet and
%% moves to `connecting'.  Any error result or exception from starting
%% the writer is logged and leads back to idle with a reconnect timer.
-spec do_connect(state()) -> handler_result().
do_connect(State) ->
    #{ broker_server := {Host, Port}
     , opts := Opts
     , partitiontopic := PartitionTopic
     , proxy_to_broker_url := ProxyToBrokerUrl
     } = State,
    try pulsar_socket_writer:start_link(PartitionTopic, Host, Port, Opts) of
        {ok, {WriterPid, Sock}} ->
            ConnectOpts = pulsar_utils:maybe_add_proxy_to_broker_url_opts(Opts, ProxyToBrokerUrl),
            %% a failed send is turned into a socket-error message to self
            ?POORMAN(Sock, pulsar_socket:send_connect_packet(Sock, ConnectOpts)),
            {next_state, connecting, State#{sock := Sock, sock_pid := WriterPid}};
        {error, ConnectError} ->
            log_error("error connecting: ~p", [ConnectError], State),
            try_close_socket(State),
            ?NEXT_STATE_IDLE_RECONNECT(State)
    catch
        Class:Exception:Stack ->
            log_error("exception connecting: ~p -> ~p~n  ~p", [Class, Exception, Stack], State),
            try_close_socket(State),
            ?NEXT_STATE_IDLE_RECONNECT(State)
    end.
514

515
%% gen_statem status formatting: the state data is passed through
%% `censor_secrets/1' before it is shown.
-if(?OTP_RELEASE >= 25).
format_status(#{data := Data0} = Status) ->
    %% only the `data' entry is rewritten; every other entry is kept
    Status#{data := censor_secrets(Data0)};
format_status(Status) when is_map(Status) ->
    Status.
-else.
%% `format_status/2' is deprecated as of OTP 25.0
format_status(_Opt, [_PDict, _State0, Data0]) ->
    [{data, [{"State", censor_secrets(Data0)}]}].
-endif.
530

531
%% Replaces `auth_data' nested under `opts' -> `conn_opts' with a
%% placeholder string.  Data without that nested key is returned as is.
censor_secrets(Data) ->
    case Data of
        #{opts := #{conn_opts := #{auth_data := _} = ConnOpts} = Opts} ->
            Censored = ConnOpts#{auth_data := "******"},
            Data#{opts := Opts#{conn_opts := Censored}};
        _ ->
            Data
    end.
535

536
%% gen_statem `terminate/3': closes the replayq buffer, then resets the
%% gauges this producer published.
terminate(_Reason, _StateName, #{replayq := Q} = State) ->
    ok = replayq:close(Q),
    ok = clear_gauges(State, Q).
540

541
%% Sets the inflight gauge to zero and resets the queuing gauges when
%% `maybe_reset_queuing/2' decides so.  Always returns `ok'.
clear_gauges(State, Q) ->
    _ = pulsar_metrics:inflight_set(State, 0),
    _ = maybe_reset_queuing(State, Q),
    ok.
545

546
%% Zeroes the queuing gauges when the queue is empty or is not durable.
%% A non-empty durable queue keeps its gauge values.
maybe_reset_queuing(State, Q) ->
    IsEmpty = replayq:count(Q) =:= 0,
    IsDurable = is_replayq_durable(State, Q),
    case IsEmpty orelse not IsDurable of
        true ->
            pulsar_metrics:queuing_set(State, 0),
            pulsar_metrics:queuing_bytes_set(State, 0);
        false ->
            ok
    end.
557

558
%% A queue counts as durable when offload mode is not enabled and the
%% queue is not memory-only.
is_replayq_durable(State, Q) ->
    case State of
        #{replayq_offload_mode := true} ->
            false;
        _ ->
            not replayq:is_mem_only(Q)
    end.
562

563
%% Handles one parsed broker command (or any other event content routed
%% here by the state callbacks) and returns the gen_statem result.
-spec handle_response(_EventContent, state()) ->
          handler_result().
handle_response({connected, _ConnectedData},
                State0 = #{ sock := Sock
                          , opts := Opts
                          , producer_id := ProducerId
                          , request_id := RequestId
                          , partitiontopic := PartitionTopic
                          }) ->
    %% connection accepted: start keepalive pings and register the producer
    start_keepalive(),
    ?POORMAN(Sock, pulsar_socket:send_create_producer_packet(Sock, PartitionTopic, RequestId, ProducerId, Opts)),
    {keep_state, next_request_id(State0)};
handle_response({producer_success, #{producer_name := ProName}}, State) ->
    {next_state, connected, State#{producer_name := ProName}};
handle_response({pong, #{}}, _State) ->
    %% schedule the next keepalive ping
    start_keepalive(),
    keep_state_and_data;
handle_response({ping, #{}}, #{sock_pid := SockPid}) ->
    ok = pulsar_socket_writer:pong_async(SockPid),
    keep_state_and_data;
handle_response({close_producer, #{}}, State = #{partitiontopic := Topic}) ->
    log_error("Close producer: ~p~n", [Topic], State),
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
handle_response({send_receipt, Resp = #{sequence_id := SequenceId}}, State) ->
    #{ callback := Callback
     , inflight_calls := InflightCalls0
     , requests := Reqs
     , replayq := Q
     } = State,
    ?tp(pulsar_producer_recv_send_receipt, #{receipt => Resp}),
    Receipt = {ok, Resp},
    case maps:get(SequenceId, Reqs, undefined) of
        undefined ->
            %% no inflight request recorded under this sequence id
            _ = invoke_callback(Callback, Receipt),
            {keep_state, State};
        ?INFLIGHT_REQ(QAckRef, FromsToMessages, BatchSize) ->
            ok = replayq_ack(Q, QAckRef),
            NotifyOne =
                fun({undefined, {_TS, Messages}}) ->
                        %% producer-wide callback: invoked once per message
                        _ = invoke_callback(Callback, Receipt, length(Messages)),
                        ok;
                   ({?PER_REQ_CALLBACK(Fn, Args), {_TS, _Messages}}) ->
                        %% No need to count the messages, as we invoke
                        %% per-request callbacks once for the whole batch.
                        _ = invoke_callback({Fn, Args}, Receipt),
                        ok;
                   ({From, {_TS, _Messages}}) ->
                        %% synchronous caller waiting for a reply
                        gen_statem:reply(From, Receipt)
                end,
            lists:foreach(NotifyOne, FromsToMessages),
            InflightCalls = InflightCalls0 - BatchSize,
            pulsar_metrics:inflight_set(State, InflightCalls),
            NewState = State#{ requests := maps:remove(SequenceId, Reqs)
                             , inflight_calls := InflightCalls
                             },
            %% a slot was freed: nudge the sender
            {keep_state, NewState, [{next_event, internal, #maybe_send_to_pulsar{}}]}
    end;
handle_response({error, #{error := Error, message := Msg}}, State) ->
    log_error("Response error:~p, msg:~p~n", [Error, Msg], State),
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
handle_response(Msg, State) ->
    log_error("Receive unknown message:~p~n", [Msg], State),
    keep_state_and_data.
630

631
%% Hands a batch of messages to the socket writer for asynchronous
%% delivery under the given sequence id.
-spec send_batch_payload([{timestamp(), [pulsar:message()]}], sequence_id(), state()) -> ok.
send_batch_payload(Messages, SequenceId,
                   #{ sock_pid := SockPid
                    , partitiontopic := Topic
                    , producer_id := ProducerId
                    , producer_name := ProducerName
                    , opts := Opts
                    }) ->
    pulsar_socket_writer:send_batch_async(
      SockPid, Topic, Messages, SequenceId, ProducerId, ProducerName, Opts).
641

642
%% Hands a single (non-batched) message to the socket writer for
%% asynchronous delivery under the given sequence id.
-spec send_single_payload(pulsar:message(), sequence_id(), state()) -> ok.
send_single_payload(Message, SequenceId,
                    #{ sock_pid := SockPid
                     , partitiontopic := Topic
                     , producer_id := ProducerId
                     , producer_name := ProducerName
                     , opts := Opts
                     }) ->
    pulsar_socket_writer:send_single_async(
      SockPid, Topic, Message, SequenceId, ProducerId, ProducerName, Opts).
652

653
%% Schedules a `ping' message to this process in 30 seconds and returns
%% the timer reference.
start_keepalive() ->
    KeepaliveAfterMs = 30_000,
    erlang:send_after(KeepaliveAfterMs, self(), ping).
655

656
%% Advances `request_id' by one, wrapping back to 1 after `?MAX_REQ_ID'.
next_request_id(State = #{request_id := Current}) ->
    Next = case Current of
               ?MAX_REQ_ID -> 1;
               _ -> Current + 1
           end,
    State#{request_id := Next}.
660

661
%% Advances `sequence_id' by one, wrapping back to 1 after `?MAX_SEQ_ID'.
next_sequence_id(State = #{sequence_id := Current}) ->
    Next = case Current of
               ?MAX_SEQ_ID -> 1;
               _ -> Current + 1
           end,
    State#{sequence_id := Next}.
665

666
%% Log a formatted message at `debug' level (see do_log/4).
-spec log_debug(string(), [term()], state()) -> ok.
log_debug(Format, FormatArgs, ProducerState) ->
    do_log(debug, Format, FormatArgs, ProducerState).
58✔
669

670
%% Log a formatted message at `info' level (see do_log/4).
-spec log_info(string(), [term()], state()) -> ok.
log_info(Format, FormatArgs, ProducerState) ->
    do_log(info, Format, FormatArgs, ProducerState).
×
673

674
%% Log a formatted message at `warning' level (see do_log/4).
-spec log_warn(string(), [term()], state()) -> ok.
log_warn(Format, FormatArgs, ProducerState) ->
    do_log(warning, Format, FormatArgs, ProducerState).
2✔
677

678
%% Log a formatted message at `error' level (see do_log/4).
-spec log_error(string(), [term()], state()) -> ok.
log_error(Format, FormatArgs, ProducerState) ->
    do_log(error, Format, FormatArgs, ProducerState).
11✔
681

682
%% Common logging back-end: prefixes the format string with the producer tag
%% and the partition topic taken from the state, then logs through `logger'
%% under the `[pulsar, producer]' domain.
-spec do_log(atom(), string(), [term()], state()) -> ok.
do_log(Level, Format, Args, State) ->
    #{partitiontopic := Topic} = State,
    PrefixedFormat = "[pulsar-producer][~s] " ++ Format,
    Metadata = #{domain => [pulsar, producer]},
    logger:log(Level, PrefixedFormat, [Topic | Args], Metadata).
687

688
%% Invoke the callback for a single message; shorthand for
%% invoke_callback/3 with a batch length of 1.
-spec invoke_callback(callback(), callback_input()) -> ok.
invoke_callback(Callback, Resp) ->
    SingleMessage = 1,
    invoke_callback(Callback, Resp, SingleMessage).
3✔
691

692
%% Deliver `Resp' to the configured callback.
%%
%% * `undefined': nothing to do.
%% * `{M, F, A}': calls `M:F(Resp, A...)' once per message in the batch.
%% * 1-arity fun: called with `Resp' once per message in the batch.
%% * `{Fn, Args}' (per-request callback): called exactly once as
%%   `Fn(Args..., Resp)', regardless of how many messages were sent.
-spec invoke_callback(callback(), callback_input(), non_neg_integer()) -> ok.
invoke_callback(undefined, _Resp, _BatchLen) ->
    ok;
invoke_callback({M, F, A}, Resp, BatchLen) ->
    _ = [erlang:apply(M, F, [Resp] ++ A) || _ <- lists:seq(1, BatchLen)],
    ok;
invoke_callback(Callback, Resp, BatchLen) when is_function(Callback, 1) ->
    _ = [Callback(Resp) || _ <- lists:seq(1, BatchLen)],
    ok;
invoke_callback({Fn, Args}, Resp, _BatchLen) when is_function(Fn), is_list(Args) ->
    %% per-request callback: one invocation for the whole request
    apply(Fn, Args ++ [Resp]).
3✔
709

710
%% replayq sizer callback: the size of a queue item is its external term
%% format size as reported by `erlang:external_size/1'.
queue_item_sizer(QueueItem = ?Q_ITEM(_From, _Timestamp, _Messages)) ->
    erlang:external_size(QueueItem).
2,135✔
712

713
%% replayq marshaller callback, used in both directions:
%%
%% * a queue item is encoded with `term_to_binary/1';
%% * a binary is decoded back into a queue item.  When the decoded item
%%   carries a `{Pid, Tag}' caller reference whose process is not a live
%%   process on this node, the reference is replaced by `undefined'.
queue_item_marshaller(?Q_ITEM(_, _, _) = QueueItem) ->
    term_to_binary(QueueItem);
queue_item_marshaller(Encoded) when is_binary(Encoded) ->
    case binary_to_term(Encoded) of
        ?Q_ITEM({Pid, _Tag}, Ts, Msgs) = Decoded when is_pid(Pid) ->
            CallerIsReachable =
                node(Pid) =:= node() andalso erlang:is_process_alive(Pid),
            if
                CallerIsReachable -> Decoded;
                true -> ?Q_ITEM(undefined, Ts, Msgs)
            end;
        Decoded ->
            Decoded
    end.
727

728
%% Current Erlang system time in milliseconds.
now_ts() ->
    Unit = millisecond,
    erlang:system_time(Unit).
2,671✔
730

731
%% Build a queue item for the given caller reference and messages,
%% stamped with the current time (now_ts/0).
make_queue_item(From, Messages) ->
    EnqueuedAt = now_ts(),
    ?Q_ITEM(From, EnqueuedAt, Messages).
1,834✔
733

734
%% Append the given send requests to the replayq buffer, refresh the queuing
%% metrics and then hand over to handle_overflow/3 to drop whatever exceeds
%% the queue limits.
%%
%% When `drop_if_high_mem' is set, the queue is memory-only and
%% `load_ctl:is_high_mem/0' reports high memory, the overflow is raised to
%% at least the number of bytes that were just appended.
%% Returns the updated state.
enqueue_send_requests(Requests, State = #{replayq := Q0}) ->
    #{drop_if_high_mem := DropIfHighMem} = State,
    ToQueueItem = fun(?SEND_REQ(From, Messages)) ->
                          make_queue_item(From, Messages)
                  end,
    QItems = lists:map(ToQueueItem, Requests),
    SizeBefore = replayq:bytes(Q0),
    Q1 = replayq:append(Q0, QItems),
    SizeAfter = replayq:bytes(Q1),
    pulsar_metrics:queuing_set(State, replayq:count(Q1)),
    pulsar_metrics:queuing_bytes_set(State, SizeAfter),
    ?tp(pulsar_producer_send_requests_enqueued, #{requests => Requests}),
    QueueOverflow = replayq:overflow(Q1),
    IsHighMemOverflow =
        DropIfHighMem
        andalso replayq:is_mem_only(Q1)
        andalso load_ctl:is_high_mem(),
    Overflow =
        case IsHighMemOverflow of
            true -> max(QueueOverflow, SizeAfter - SizeBefore);
            false -> QueueOverflow
        end,
    handle_overflow(State#{replayq := Q1}, IsHighMemOverflow, Overflow).
1,832✔
759

760
%% Drop queued items when the buffer has overflowed.
%%
%% With no overflow (`Overflow =< 0') only the discard-log bookkeeping is
%% refreshed.  Otherwise items worth `Overflow' bytes are popped from the
%% queue (`at_least' that many bytes on a high-memory overflow, `at_most'
%% otherwise), acked right away, their callers are answered with
%% `{error, overflow}', and the dropped/queuing metrics are updated.
%% Returns the state holding the shrunk queue.
-spec handle_overflow(state(), _IsHighMemOverflow :: boolean(), _Overflow :: integer()) -> state().
handle_overflow(State, _IsHighMemOverflow, Overflow) when Overflow =< 0 ->
    %% nothing to drop
    ok = maybe_log_discard(State, _NumRequestsIncrement = 0),
    State;
handle_overflow(State0 = #{replayq := Q, callback := Callback}, IsHighMemOverflow, Overflow) ->
    BytesMode =
        case IsHighMemOverflow of
            true -> at_least;
            false -> at_most
        end,
    PopOpts = #{bytes_limit => {BytesMode, Overflow}, count_limit => 999999999},
    {NewQ, QAckRef, Popped} = replayq:pop(Q, PopOpts),
    ok = replayq_ack(NewQ, QAckRef),
    maybe_log_discard(State0, length(Popped)),
    Dropped = [{From, Msgs} || ?Q_ITEM(From, _EnqueuedAt, Msgs) <- Popped],
    reply_with_error(Dropped, Callback, {error, overflow}),
    %% the metric counts individual messages, not queue items
    NumMsgs = length([1 || {_, Msgs} <- Dropped, _ <- Msgs]),
    pulsar_metrics:dropped_queue_full_inc(State0, NumMsgs),
    pulsar_metrics:queuing_set(State0, replayq:count(NewQ)),
    pulsar_metrics:queuing_bytes_set(State0, replayq:bytes(NewQ)),
    State0#{replayq := NewQ}.
4✔
781

782
%% Account for `Increment' newly discarded requests and possibly log them.
%% Short-circuits to `ok' when there is nothing new to count and nothing
%% pending since the last log line; otherwise defers to maybe_log_discard/3
%% with the current bookkeeping map.
maybe_log_discard(State, Increment) ->
    Last = get_overflow_log_state(),
    #{ count_since_last_log := CountLast
     , total_count := TotalCount
     } = Last,
    if
        CountLast =:= TotalCount, Increment =:= 0 ->
            %% no change
            ok;
        true ->
            maybe_log_discard(State, Increment, Last)
    end.
793

794
%% Add `Increment' to the running total of discarded requests and, if more
%% than ?MIN_DISCARD_LOG_INTERVAL has elapsed since the last log line, emit
%% a warning with the number discarded since then and reset the baseline.
%% Otherwise only the running total is updated.
-spec maybe_log_discard(
        state(),
        non_neg_integer(),
        #{ last_log_inst => non_neg_integer()
         , count_since_last_log => non_neg_integer()
         , total_count => non_neg_integer()
         }) -> ok.
maybe_log_discard(State,
                  Increment,
                  #{ last_log_inst := LastInst
                   , count_since_last_log := CountLast
                   , total_count := TotalCount
                   }) ->
    NowInst = now_ts(),
    NewTotalCount = TotalCount + Increment,
    Delta = NewTotalCount - CountLast,
    IsLogDue = NowInst - LastInst > ?MIN_DISCARD_LOG_INTERVAL,
    NewStats =
        case IsLogDue of
            true ->
                log_warn("replayq dropped ~b overflowed messages", [Delta], State),
                #{ last_log_inst => NowInst
                 , count_since_last_log => NewTotalCount
                 , total_count => NewTotalCount
                 };
            false ->
                #{ last_log_inst => LastInst
                 , count_since_last_log => CountLast
                 , total_count => NewTotalCount
                 }
        end,
    put_overflow_log_state(NewStats).
823

824
%% Read the discard-log bookkeeping map from the process dictionary
%% (key ?buffer_overflow_discarded).  When nothing has been stored yet,
%% an all-zero map is returned.
-spec get_overflow_log_state() -> #{ last_log_inst => non_neg_integer()
                                   , count_since_last_log => non_neg_integer()
                                   , total_count => non_neg_integer()
                                   }.
get_overflow_log_state() ->
    case get(?buffer_overflow_discarded) of
        Stored when is_map(Stored) ->
            Stored;
        undefined ->
            #{ last_log_inst => 0
             , count_since_last_log => 0
             , total_count => 0
             }
    end.
838

839
%% Store the discard-log bookkeeping map in the process dictionary
%% (key ?buffer_overflow_discarded).  The map must carry all three counters.
-spec put_overflow_log_state(#{ last_log_inst => non_neg_integer()
                              , count_since_last_log => non_neg_integer()
                              , total_count => non_neg_integer()
                              }) -> ok.
put_overflow_log_state(Stats = #{ last_log_inst := _
                                , count_since_last_log := _
                                , total_count := _
                                }) ->
    _Previous = put(?buffer_overflow_discarded, Stats),
    ok.
8✔
849

850
%% Send the next chunk of queued items if, and only if, the queue is not
%% empty and the number of in-flight requests is below `max_inflight'.
%% Returns the (possibly updated) state.
maybe_send_to_pulsar(State) ->
    #{ replayq := Q
     , requests := Requests
     , max_inflight := MaxInflight
     } = State,
    QueuedCount = replayq:count(Q),
    InflightCount = map_size(Requests),
    if
        QueuedCount /= 0, InflightCount < MaxInflight ->
            do_send_to_pulsar(State);
        true ->
            State
    end.
863

864
%% Pop up to `batch_size' items (bounded by `max_batch_bytes') from the
%% replayq buffer and send them, then loop back into maybe_send_to_pulsar/1
%% so that sending continues while there is queued data and in-flight room.
%%
%% Items older than the configured `retention_period' are not sent: their
%% callers are answered through reply_expired_messages/2 and the `dropped'
%% metric is bumped.  What remains is handled in one of three ways:
%%
%% * nothing left: the popped chunk is acked immediately;
%% * exactly one caller entry and `batch_size' is 1: every message of that
%%   entry is sent on its own via send_single_messages/5;
%% * otherwise: all messages are flattened into one batch and sent under
%%   the current sequence id.
%%
%% Returns the updated state.
do_send_to_pulsar(State0) ->
    #{ batch_size := BatchSize
     , inflight_calls := InflightCalls0
     , sequence_id := SequenceId
     , requests := Requests0
     , replayq := Q
     , opts := ProducerOpts
     } = State0,
    MaxBatchBytes = maps:get(max_batch_bytes, ProducerOpts, ?DEFAULT_MAX_BATCH_BYTES),
    {NewQ, QAckRef, Items} = replayq:pop(Q, #{ count_limit => BatchSize
                                             , bytes_limit => MaxBatchBytes
                                             }),
    State1 = State0#{replayq := NewQ},
    pulsar_metrics:queuing_set(State0, replayq:count(NewQ)),
    pulsar_metrics:queuing_bytes_set(State0, replayq:bytes(NewQ)),
    RetentionPeriod = maps:get(retention_period, ProducerOpts, infinity),
    Now = now_ts(),
    %% Split the popped items into expired ones and ones to be sent.
    %% foldr keeps both result lists in the original queue order.
    {Expired, FromsToMessages} =
       lists:foldr(
         fun(?Q_ITEM(From, Timestamp, Msgs), {Expired, Acc}) ->
           case is_batch_expired(Timestamp, RetentionPeriod, Now) of
             true ->
               {[{From, Msgs} | Expired], Acc};
             false ->
               {Expired, [{From, {Timestamp, Msgs}} | Acc]}
           end
         end,
         {[], []},
         Items),
    reply_expired_messages(Expired, State1),
    %% NOTE: this counts expired queue items (caller entries), not the
    %% individual messages they contain.
    pulsar_metrics:dropped_inc(State1, length(Expired)),
    case FromsToMessages of
        [] ->
            %% all expired, immediately ack replayq batch and continue
            ok = replayq_ack(Q, QAckRef),
            maybe_send_to_pulsar(State1);
        [{From, {Timestamp, Msgs}}] when BatchSize =:= 1 ->
            %% Batching is effectively disabled: send each message of the
            %% single popped entry as its own request.  The in-flight
            %% counter grows by the number of messages sent.
            State2 = send_single_messages(From, Timestamp, QAckRef, State1, Msgs),
            InflightCalls = InflightCalls0 + length(Msgs),
            pulsar_metrics:inflight_set(State2, InflightCalls),
            State = State2#{inflight_calls := InflightCalls},
            maybe_send_to_pulsar(State);
        [_ | _] ->
            %% Batch multiple messages together
            FinalBatch = [Msg || {_From, {_Timestamp, Msgs}} <-
                                        FromsToMessages,
                                    Msg <- Msgs],
            FinalBatchSize = length(FinalBatch),
            send_batch_payload(FinalBatch, SequenceId, State0),
            %% Remember the request under its sequence id together with the
            %% replayq ack reference and the per-caller entries.
            Requests = Requests0#{SequenceId => ?INFLIGHT_REQ(QAckRef, FromsToMessages, FinalBatchSize)},
            InflightCalls = InflightCalls0 + FinalBatchSize,
            pulsar_metrics:inflight_set(State1, InflightCalls),
            State2 = State1#{requests := Requests, inflight_calls := InflightCalls},
            State = next_sequence_id(State2),
            maybe_send_to_pulsar(State)
    end.
920

921
%% Send every message of one caller entry as an individual (non-batched)
%% request, each under its own sequence id.
%%
%% Only the request for the LAST message carries the replayq ack reference
%% and the caller reference, so the queue is acked and the caller answered
%% once that final request is handled.  The requests for all preceding
%% messages are registered with neither.
%% Returns the state with the new in-flight requests and the advanced
%% sequence id.
%%
%% NOTE(review): there is no clause for an empty message list; presumably
%% callers never enqueue one -- confirm.
-spec send_single_messages(gen_statem:from() | per_request_callback_int() | undefined,
                           timestamp(), replayq:ack_ref(), state(), [pulsar:message()]) -> state().
send_single_messages(From, Timestamp, QAckRef, State0, [LastMsg]) ->
    #{sequence_id := SequenceId,
      requests := Requests0
     } = State0,
    send_single_payload(LastMsg, SequenceId, State0),
    %% Last message of the caller's batch: tie the replayq ack reference
    %% and the caller to this request.
    InflightReq = ?INFLIGHT_REQ(QAckRef, [{From, {Timestamp, [LastMsg]}}], 1),
    next_sequence_id(State0#{requests := Requests0#{SequenceId => InflightReq}});
send_single_messages(From, Timestamp, QAckRef, State0, [Msg | Rest]) ->
    #{sequence_id := SequenceId,
      requests := Requests0
     } = State0,
    send_single_payload(Msg, SequenceId, State0),
    %% Not the last message: no replayq ack reference and no caller.
    InflightReq = ?INFLIGHT_REQ(undefined, [{undefined, {Timestamp, [Msg]}}], 1),
    State = next_sequence_id(State0#{requests := Requests0#{SequenceId => InflightReq}}),
    send_single_messages(From, Timestamp, QAckRef, State, Rest).
943

944
%% Answer the callers of expired entries with `{error, expired}', using the
%% producer-level callback from the state where an entry has no caller.
-spec reply_expired_messages([{gen_statem:from() | per_request_callback_int() | undefined,
                               [pulsar:message()]}],
                             state()) -> ok.
reply_expired_messages(ExpiredItems, #{callback := ProducerCallback}) ->
    Reason = {error, expired},
    reply_with_error(ExpiredItems, ProducerCallback, Reason).
801✔
949

950
%% Report `Error' for each `{Caller, Msgs}' item:
%%
%% * no caller (`undefined'): the producer-level callback is invoked once
%%   per message;
%% * per-request callback: invoked once for the whole item;
%% * anything else is treated as a `gen_statem' caller and replied to.
-spec reply_with_error([{gen_statem:from() | per_request_callback_int() | undefined,
                         [pulsar:message()]}],
                       callback(), {error, expired | overflow}) -> ok.
reply_with_error(Items, Callback, Error) ->
    ReplyOne =
        fun({Caller, Msgs}) ->
                case Caller of
                    undefined ->
                        invoke_callback(Callback, Error, length(Msgs));
                    ?PER_REQ_CALLBACK(Fn, Args) ->
                        %% per-request callbacks fire once per request,
                        %% so the message count is irrelevant here
                        invoke_callback({Fn, Args}, Error);
                    From ->
                        gen_statem:reply(From, Error)
                end
        end,
    lists:foreach(ReplyOne, Items).
965

966
%% Gather further send requests already waiting in the mailbox, on top of
%% those in `Acc' (newest first), up to `Limit' requests in total.
%% Returns the requests oldest first.
collect_send_requests(Acc, Limit) ->
    do_collect_send_requests(Acc, length(Acc), Limit).
1,172✔
969

970
%% Worker for collect_send_requests/2.  Keeps pulling send requests from the
%% mailbox without blocking (`after 0') while fewer than `Limit' have been
%% collected; returns the accumulated requests in arrival order.
do_collect_send_requests(Acc, Count, Limit) when Count < Limit ->
    receive
        ?SEND_REQ(_, _) = Req ->
            do_collect_send_requests([Req | Acc], Count + 1, Limit)
    after
        0 ->
            %% mailbox holds no more send requests
            lists:reverse(Acc)
    end;
do_collect_send_requests(Acc, _Count, _Limit) ->
    %% limit reached
    lists:reverse(Acc).
980

981
%% Close the socket and stop the socket writer process, if there is a socket.
try_close_socket(#{sock := undefined}) ->
    ok;
try_close_socket(#{sock := Socket, sock_pid := WriterPid, opts := ProducerOpts}) ->
    %% N.B.: the order matters.  The writer may be blocked in a `send' call;
    %% closing the socket first makes that call return `einval' right away,
    %% which lets us terminate the writer (it also terminates itself on such
    %% `send' errors, but we make sure here).
    _ = pulsar_socket:close(Socket, ProducerOpts),
    ok = pulsar_socket_writer:stop(WriterPid).
8✔
991

992
%% Re-send every request that was in flight, in ascending sequence id order.
%%
%% Before re-sending, caller entries older than `retention_period' are
%% removed from each request and their callers are told `{error, expired}'.
%% A request whose entries have all expired is dropped and its replayq
%% chunk is acked; any other request is sent again under its original
%% sequence id (as a single message when `batch_size' is 1 and exactly one
%% message remains, as a batch otherwise) and re-registered with its
%% remaining entries and message count.
%%
%% Accounting: `inflight_calls' is counted in messages (it is incremented by
%% the number of messages when a request is first sent), so it is reduced
%% here by the number of expired MESSAGES.  The `dropped' metric keeps
%% counting expired caller entries, as it does when entries expire before
%% being sent.
%%
%% Returns the state with the surviving requests and the adjusted counter.
resend_sent_requests(State) ->
    ?tp(pulsar_producer_resend_sent_requests_enter, #{}),
    #{ inflight_calls := InflightCalls0
     , requests := Requests0
     , replayq := Q
     , opts := ProducerOpts
     , batch_size := BatchSize
     } = State,
    Now = now_ts(),
    RetentionPeriod = maps:get(retention_period, ProducerOpts, infinity),
    Requests1 = lists:keysort(1, maps:to_list(Requests0)),
    {Requests, DroppedEntries, DroppedMsgs} =
        lists:foldl(
          fun({SequenceId, ?INFLIGHT_REQ(QAckRef, FromsToMessages, _BatchSize)},
              {AccIn, DroppedEntriesAcc, DroppedMsgsAcc}) ->
               {Messages, Expired} =
                  lists:partition(
                    fun({_From, {Ts, _Msgs}}) ->
                      not is_batch_expired(Ts, RetentionPeriod, Now)
                    end,
                    FromsToMessages),
               lists:foreach(
                 fun({From, {_Ts, Msgs}}) ->
                   reply_expired_messages([{From, Msgs}], State)
                 end,
                 Expired),
               NumExpiredEntries = length(Expired),
               NumExpiredMsgs =
                   lists:sum([length(ExpiredMsgs) || {_, {_, ExpiredMsgs}} <- Expired]),
               Acc = case Messages of
                   [] ->
                       %% nothing left to resend for this request
                       ?tp(pulsar_producer_resend_all_expired, #{}),
                       ok = replayq_ack(Q, QAckRef),
                       AccIn;
                   [_ | _] ->
                       AllMessages = [Msg || {_, {_, RemainingMsgs}} <- Messages,
                                             Msg <- RemainingMsgs],
                       case {BatchSize, AllMessages} of
                           {1, [Msg1]} ->
                               %% Resend as single message (batch_size was 1)
                               send_single_payload(Msg1, SequenceId, State);
                           _ ->
                               %% Resend as batch
                               send_batch_payload(AllMessages, SequenceId, State)
                       end,
                       AccIn#{SequenceId => ?INFLIGHT_REQ(QAckRef, Messages, length(AllMessages))}
               end,
               {Acc, DroppedEntriesAcc + NumExpiredEntries, DroppedMsgsAcc + NumExpiredMsgs}
          end,
          {#{}, 0, 0},
          Requests1),
    %% in-flight calls are tracked per message, so subtract expired messages
    InflightCalls = InflightCalls0 - DroppedMsgs,
    pulsar_metrics:dropped_inc(State, DroppedEntries),
    pulsar_metrics:inflight_set(State, InflightCalls),
    State#{requests := Requests, inflight_calls := InflightCalls}.
29✔
1044

1045
%% Tell whether an entry stamped `Timestamp' has outlived `RetentionPeriod'
%% at time `Now'.  With an `infinity' retention period nothing ever expires.
is_batch_expired(Timestamp, RetentionPeriod, Now) ->
    case RetentionPeriod of
        infinity ->
            false;
        _ ->
            Deadline = Now - RetentionPeriod,
            Timestamp =< Deadline
    end.
9✔
1049

1050
%% NFD-normalize the string, URI-escape it with `pulsar_utils:escape_uri/1'
%% and return the result as a binary.
-spec escape(string()) -> binary().
escape(Str) ->
    Normalized = unicode:characters_to_nfd_list(Str),
    Escaped = pulsar_utils:escape_uri(Normalized),
    iolist_to_binary(Escaped).
9✔
1054

1055
%% Handle the reply to a topic lookup.
%%
%% * `{error, _}': log, close the socket and go back to idle to reconnect.
%% * ok, proxied through the service URL: ask the client manager for an
%%   alive Pulsar URL and continue with maybe_connect/2, keeping the broker
%%   URL from the reply as the proxy target.  A failure to obtain the URL
%%   (error reply, client process gone, or call timeout) closes the socket
%%   and schedules a reconnect.
%% * ok, not proxied: the broker URL from the reply is used directly as the
%%   URL to connect to, with no proxy target.
-spec handle_lookup_topic_reply(pulsar_client:lookup_topic_response(), state()) -> handler_result().
handle_lookup_topic_reply({error, Error}, State) ->
    log_error("error looking up topic: ~0p", [Error], State),
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
handle_lookup_topic_reply({ok, #{ proxy_through_service_url := true
                                , brokerServiceUrl := NewBrokerServiceURL
                                }}, State0) ->
    #{clientid := ClientId} = State0,
    ?tp(debug, pulsar_producer_lookup_alive_pulsar_url, #{}),
    log_debug("received topic lookup reply: ~0p", [#{proxy_through_service_url => true, broker_service_url => NewBrokerServiceURL}], State0),
    %% Only the call to the client manager is protected by the `try';
    %% the `of' branches run outside of it.
    try pulsar_client_manager:get_alive_pulsar_url(ClientId, ?GET_ALIVE_PULSAR_URL) of
        {ok, AlivePulsarURL} ->
            maybe_connect(#{ broker_service_url => NewBrokerServiceURL
                           , alive_pulsar_url => AlivePulsarURL
                           }, State0);
        {error, Reason} ->
            log_error("error getting pulsar alive URL: ~0p", [Reason], State0),
            try_close_socket(State0),
            ?NEXT_STATE_IDLE_RECONNECT(State0)
    catch
        %% the client manager process is not running
        exit:{noproc, _} ->
            log_error("client restarting; will retry to lookup topic later", [], State0),
            try_close_socket(State0),
            ?NEXT_STATE_IDLE_RECONNECT(State0);
        %% the call to the client manager timed out
        exit:{timeout, _} ->
            log_error("timeout calling client; will retry to lookup topic later", [], State0),
            try_close_socket(State0),
            ?NEXT_STATE_IDLE_RECONNECT(State0)
    end;
handle_lookup_topic_reply({ok, #{ proxy_through_service_url := false
                                , brokerServiceUrl := NewBrokerServiceURL
                                }},
                         State) ->
    log_debug("received topic lookup reply: ~0p",
              [#{proxy_through_service_url => false,
                 broker_service_url => NewBrokerServiceURL}], State),
    %% Not proxied: connect straight to the broker named in the reply.
    maybe_connect(#{ alive_pulsar_url => NewBrokerServiceURL
                   , broker_service_url => undefined
                   }, State).
1095

1096
%% Decide how to proceed after a topic lookup.
%%
%% The endpoint is the pair of the broker server parsed from
%% `alive_pulsar_url' and the proxy target `broker_service_url'.  If it is
%% the same as the one recorded in the state, connect right away.  If it has
%% changed, record the new endpoint, close the socket and go back to idle
%% to reconnect.
-spec maybe_connect(#{ alive_pulsar_url := string()
                     , broker_service_url := string() | undefined
                     }, state()) -> handler_result().
maybe_connect(#{ broker_service_url := NewBrokerServiceURL
               , alive_pulsar_url := AlivePulsarURL
               }, State0) ->
    #{ broker_server := OldBrokerServer
     , proxy_to_broker_url := OldBrokerServiceURL
     } = State0,
    {_Transport, NewBrokerServer} = pulsar_utils:parse_url(AlivePulsarURL),
    OldEndpoint = {OldBrokerServer, OldBrokerServiceURL},
    NewEndpoint = {NewBrokerServer, NewBrokerServiceURL},
    if
        OldEndpoint =:= NewEndpoint ->
            ConnectInfo = #{broker_server => NewBrokerServer,
                            service_url => NewBrokerServiceURL},
            log_debug("connecting to ~0p", [ConnectInfo], State0),
            do_connect(State0);
        true ->
            %% broker changed; reconnect.
            OldInfo = #{ broker_server => OldBrokerServer
                       , proxy_url => OldBrokerServiceURL
                       },
            NewInfo = #{ broker_server => NewBrokerServer
                       , proxy_url => NewBrokerServiceURL
                       },
            log_info("pulsar endpoint changed from ~0p to ~0p; reconnecting...",
                     [OldInfo, NewInfo],
                     State0),
            try_close_socket(State0),
            State = State0#{
                broker_server := NewBrokerServer,
                proxy_to_broker_url := NewBrokerServiceURL
            },
            ?NEXT_STATE_IDLE_RECONNECT(State)
    end.
1130

1131
%% Tell the state observer, if one is configured, about the producer's
%% state machine state.  The observer is `{Fun, Args}' and is called as
%% `Fun(ProducerState, Args...)'; its return value is ignored.
-spec notify_state_change(undefined | state_observer_callback(), statem()) -> ok.
notify_state_change(undefined, _ProducerState) ->
    ok;
notify_state_change({ObserverFun, ExtraArgs}, ProducerState) ->
    _Ignored = apply(ObserverFun, [ProducerState | ExtraArgs]),
    ok.
90✔
1137

1138
%% Ack a popped chunk on the replayq buffer.  An `undefined' ack reference
%% (used for requests that carry no replayq reference) is a no-op.
replayq_ack(Q, QAckRef) ->
    case QAckRef of
        undefined ->
            ok;
        _ ->
            ok = replayq:ack(Q, QAckRef)
    end.
651✔
1140

1141
-ifdef(TEST).
1142
-include_lib("eunit/include/eunit.hrl").
1143

1144
%% EUnit cases for the discard-log throttling in maybe_log_discard/2.
maybe_log_discard_test_() ->
    FakeState = #{partitiontopic => <<"partitiontopic">>},
    %% Seeds the bookkeeping with 2 discards, all of them already logged
    %% at `LastLogInst'.
    Seed = fun(LastLogInst) ->
                   ok = put_overflow_log_state(#{ last_log_inst => LastLogInst
                                                , count_since_last_log => 2
                                                , total_count => 2
                                                })
           end,
    [ {"no increment, empty dictionary", fun() -> maybe_log_discard(undefined, 0) end}
    , {"fake-last-old",
       fun() ->
         %% last log happened longer ago than the minimum interval
         LongAgo = now_ts() - ?MIN_DISCARD_LOG_INTERVAL - 1,
         ok = Seed(LongAgo),
         ok = maybe_log_discard(FakeState, 1),
         After = get_overflow_log_state(),
         ?assertMatch(#{count_since_last_log := 3, total_count := 3}, After),
         %% greater than the minimum interval because we just logged
         ?assert(maps:get(last_log_inst, After) - LongAgo > ?MIN_DISCARD_LOG_INTERVAL),
         ok
       end}
    , {"fake-last-fresh",
       fun() ->
         %% last log happened just now
         JustNow = now_ts(),
         ok = Seed(JustNow),
         ok = maybe_log_discard(FakeState, 2),
         After = get_overflow_log_state(),
         ?assertMatch(#{count_since_last_log := 2, total_count := 4}, After),
         %% less than the minimum interval because we didn't log and just accumulated
         ?assert(maps:get(last_log_inst, After) - JustNow < ?MIN_DISCARD_LOG_INTERVAL),
         ok
       end}
    ].
1175
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc