• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 843

03 Oct 2025 07:14PM UTC coverage: 72.247% (+0.07%) from 72.182%
843

push

github

web-flow
Merge pull request #78 from emqx/251003-fix-compile-warning

chore: fix format_status deprecation warnings with OTP_RELEASE macro

984 of 1362 relevant lines covered (72.25%)

259.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.14
/src/pulsar_producer.erl
1
%% Copyright (c) 2013-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_producer).
16

17
-behaviour(gen_statem).
18

19
-include_lib("kernel/include/inet.hrl").
20
-include("include/pulsar_producer_internal.hrl").
21
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
22

23
-export([ send/2
24
        , send/3
25
        , send_sync/2
26
        , send_sync/3
27
        , get_state/1
28
        ]).
29

30
-export([ start_link/4
31
        , idle/3
32
        , connecting/3
33
        , connected/3
34
        ]).
35

36
%% gen_statem API
37
-export([ callback_mode/0
38
        , init/1
39
        , terminate/3
40
        ]).
41

42
-if(?OTP_RELEASE >= 25).
43
-export([format_status/1]).
44
-else.
45
-export([format_status/2]).
46
-endif.
47

48
%% replayq API
49
-export([ queue_item_sizer/1
50
        , queue_item_marshaller/1
51
        ]).
52

53
-export([handle_response/2]).
54

55
%% for testing only
56
-ifdef(TEST).
57
-export([make_queue_item/2]).
58
-endif.
59

60
-type statem() :: idle | connecting | connected.
61
-type sequence_id() :: integer().
62
-type send_receipt() :: #{ sequence_id := sequence_id()
63
                         , producer_id := integer()
64
                         , highest_sequence_id := sequence_id()
65
                         , message_id := map()
66
                         , any() => term()
67
                         }.
68
-type timestamp() :: integer().
69
-type callback() :: undefined | mfa() | fun((map()) -> ok) | per_request_callback().
70
-type callback_input() :: {ok, send_receipt()} | {error, expired}.
71
-type config() :: #{ replayq_dir := string()
72
                   , replayq_max_total_bytes => pos_integer()
73
                   , replayq_seg_bytes => pos_integer()
74
                   , replayq_offload_mode => boolean()
75
                   , max_batch_bytes => pos_integer()
76
                   , producer_name => atom()
77
                   , clientid => atom()
78
                   , callback => callback()
79
                   , batch_size => non_neg_integer()
80
                   , drop_if_high_mem => boolean()
81
                   , max_inflight => pos_integer()
82
                   , retention_period => timeout()
83
                   }.
84
-export_type([ config/0
85
             ]).
86

87
-define(RECONNECT_TIMEOUT, 5_000).
88
-define(LOOKUP_TOPIC_TIMEOUT, 15_000).
89
-define(GET_ALIVE_PULSAR_URL, 5_000).
90

91
-define(MAX_REQ_ID, 4294836225).
92
-define(MAX_SEQ_ID, 18445618199572250625).
93

94
-define(DEFAULT_MAX_INFLIGHT, 10).
95

96
-define(DEFAULT_REPLAYQ_SEG_BYTES, 10 * 1024 * 1024).
97
-define(DEFAULT_REPLAYQ_LIMIT, 2_000_000_000).
98
-define(DEFAULT_MAX_BATCH_BYTES, 1_000_000).
99
-define(Q_ITEM(From, Ts, Messages), {From, Ts, Messages}).
100
-define(INFLIGHT_REQ(QAckRef, FromsToMessages, BatchSize), {inflight_req, QAckRef, FromsToMessages, BatchSize}).
101
-define(NEXT_STATE_IDLE_RECONNECT(State), {next_state, idle, State#{sock := undefined,
102
                                                                    sock_pid := undefined},
103
                                           [{state_timeout, ?RECONNECT_TIMEOUT, do_connect}]}).
104
-define(buffer_overflow_discarded, buffer_overflow_discarded).
105
-define(MIN_DISCARD_LOG_INTERVAL, timer:seconds(5)).
106
-define(PER_REQ_CALLBACK(Fn, Args), {callback, {Fn, Args}}).
107
-define(SOCK_ERR(SOCK, REASON), {socket_error, SOCK, REASON}).
108

109
%% poorman's error handling.
110
%% this is an extra safety to handle a previously missed tcp/ssl_error or tcp/ssl_closed event
111
-define(POORMAN(SOCK, EXPR),
112
        case (EXPR) of
113
            ok ->
114
                ok;
115
            {error, Reason} ->
116
                _ = self() ! ?SOCK_ERR(SOCK, Reason),
117
                ok
118
        end).
119

120
%% Calls/Casts/Infos
121
-record(maybe_send_to_pulsar, {}).
122

123
-type state_observer_callback() :: {function(), [term()]}.
124
-type state() :: #{
125
    batch_size := non_neg_integer(),
126
    broker_server := {binary(), pos_integer()},
127
    callback := undefined | mfa() | fun((map()) -> ok),
128
    clientid := atom(),
129
    drop_if_high_mem := boolean(),
130
    inflight_calls := non_neg_integer(),
131
    lookup_topic_request_ref := reference() | undefined,
132
    max_inflight := pos_integer(),
133
    opts := map(),
134
    parent_pid := undefined | pid(),
135
    partitiontopic := string(),
136
    producer_id := integer(),
137
    producer_name := atom(),
138
    proxy_to_broker_url := undefined | string(),
139
    replayq := replayq:q(),
140
    replayq_offload_mode := boolean(),
141
    request_id := integer(),
142
    requests := #{sequence_id() =>
143
                      ?INFLIGHT_REQ(
144
                         replayq:ack_ref(),
145
                         [{gen_statem:from() | undefined,
146
                           {timestamp(), [pulsar:message()]}}],
147
                         _BatchSize :: non_neg_integer()
148
                        )},
149
    sequence_id := sequence_id(),
150
    state_observer_callback := undefined | state_observer_callback(),
151
    sock := undefined | port(),
152
    sock_pid := undefined | pid(),
153
    telemetry_metadata := map()
154
}.
155
-type handler_result() :: gen_statem:event_handler_result(statem(), state()).
156
-type per_request_callback() :: {function(), [term()]}.
157
-type per_request_callback_int() :: ?PER_REQ_CALLBACK(function(), [term()]).
158
-type send_opts() :: #{callback_fn => per_request_callback()}.
159
-export_type([send_opts/0]).
160

161
callback_mode() -> [state_functions, state_enter].
22✔
162

163
start_link(PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts) ->
164
    SpawnOpts = [{spawn_opt, [{message_queue_data, off_heap}]}],
22✔
165
    gen_statem:start_link(?MODULE, {PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts}, SpawnOpts).
22✔
166

167
-spec send(gen_statem:server_ref(), [pulsar:message()]) -> {ok, pid()}.
168
send(Pid, Messages) ->
169
    send(Pid, Messages, _Opts = #{}).
×
170

171
-spec send(gen_statem:server_ref(), [pulsar:message()], send_opts()) -> {ok, pid()}.
172
send(Pid, Messages, Opts) ->
173
    From = case maps:get(callback_fn, Opts, undefined) of
1,816✔
174
               undefined -> undefined;
1,813✔
175
               {Fn, Args} when is_function(Fn) -> {callback, {Fn, Args}}
3✔
176
           end,
177
    erlang:send(Pid, ?SEND_REQ(From, Messages)),
1,816✔
178
    {ok, Pid}.
1,816✔
179

180
-spec send_sync(gen_statem:server_ref(), [pulsar:message()]) ->
181
          {ok, send_receipt()}
182
        | {error, producer_connecting
183
                | producer_disconnected
184
                | term()}.
185
send_sync(Pid, Messages) ->
186
    send_sync(Pid, Messages, 5_000).
×
187

188
-spec send_sync(gen_statem:server_ref(), [pulsar:message()], timeout()) ->
189
          {ok, send_receipt()}
190
        | {error, producer_connecting
191
                | producer_disconnected
192
                | term()}.
193
send_sync(Pid, Messages, Timeout) ->
194
    Caller = self(),
6✔
195
    MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
6✔
196
    %% Mimicking gen_statem's From, so the reply can be sent with
197
    %% `gen_statem:reply/2'
198
    From = {Caller, MRef},
6✔
199
    erlang:send(Pid, ?SEND_REQ(From, Messages)),
6✔
200
    receive
6✔
201
        {MRef, Response} ->
202
            erlang:demonitor(MRef, [flush]),
6✔
203
            Response;
6✔
204
        {'DOWN', MRef, process, Pid, Reason} ->
205
            error({producer_down, Reason})
×
206
    after
207
        Timeout ->
208
            erlang:demonitor(MRef, [flush]),
×
209
            receive
×
210
                {MRef, Response} ->
211
                    Response
×
212
            after
213
                0 ->
214
                    error(timeout)
×
215
            end
216
    end.
217

218
-spec get_state(pid()) -> statem().
219
get_state(Pid) ->
220
    gen_statem:call(Pid, get_state, 5_000).
2✔
221

222
%%--------------------------------------------------------------------
223
%% gen_statem callback
224
%%--------------------------------------------------------------------
225

226
-spec init({string(), string(), string() | undefined, config()}) ->
227
          gen_statem:init_result(statem(), state()).
228
init({PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts0}) ->
229
    process_flag(trap_exit, true),
22✔
230
    pulsar_utils:set_label({?MODULE, PartitionTopic}),
22✔
231
    {Transport, BrokerServer} = pulsar_utils:parse_url(Server),
22✔
232
    ProducerID = maps:get(producer_id, ProducerOpts0),
22✔
233
    Offload = maps:get(replayq_offload_mode, ProducerOpts0, false),
22✔
234
    ReplayqCfg0 =
22✔
235
        case maps:get(replayq_dir, ProducerOpts0, false) of
236
            false ->
237
                #{mem_only => true};
15✔
238
            BaseDir ->
239
                PartitionTopicPath = escape(PartitionTopic),
7✔
240
                Dir = filename:join([BaseDir, PartitionTopicPath]),
7✔
241
                SegBytes = maps:get(replayq_seg_bytes, ProducerOpts0, ?DEFAULT_REPLAYQ_SEG_BYTES),
7✔
242
                #{dir => Dir, seg_bytes => SegBytes, offload => Offload}
7✔
243
        end,
244
    MaxTotalBytes = maps:get(replayq_max_total_bytes, ProducerOpts0, ?DEFAULT_REPLAYQ_LIMIT),
22✔
245
    MaxBatchBytes = maps:get(max_batch_bytes, ProducerOpts0, ?DEFAULT_MAX_BATCH_BYTES),
22✔
246
    ReplayqCfg =
22✔
247
        ReplayqCfg0#{ sizer => fun ?MODULE:queue_item_sizer/1
248
                    , marshaller => fun ?MODULE:queue_item_marshaller/1
249
                    , max_total_bytes => MaxTotalBytes
250
                    },
251
    Q = replayq:open(ReplayqCfg),
22✔
252
    ProducerOpts1 = ProducerOpts0#{max_batch_bytes => MaxBatchBytes},
22✔
253
    %% drop replayq options, now that it's open.
254
    DropIfHighMem = maps:get(drop_if_high_mem, ProducerOpts1, false),
22✔
255
    MaxInflight = maps:get(max_inflight, ProducerOpts1, ?DEFAULT_MAX_INFLIGHT),
22✔
256
    ProducerOpts = maps:without([ replayq_dir
22✔
257
                                , replayq_seg_bytes
258
                                , replayq_offload_mode
259
                                , replayq_max_total_bytes
260
                                , drop_if_high_mem
261
                                , max_inflight
262
                                ],
263
                                ProducerOpts1),
264
    StateObserverCallback = maps:get(state_observer_callback, ProducerOpts0, undefined),
22✔
265
    ParentPid = maps:get(parent_pid, ProducerOpts, undefined),
22✔
266
    TelemetryMetadata0 = maps:get(telemetry_metadata, ProducerOpts0, #{}),
22✔
267
    TelemetryMetadata = maps:put(partition_topic, PartitionTopic, TelemetryMetadata0),
22✔
268
    State = #{
22✔
269
        batch_size => maps:get(batch_size, ProducerOpts, 0),
270
        broker_server => BrokerServer,
271
        callback => maps:get(callback, ProducerOpts, undefined),
272
        clientid => maps:get(clientid, ProducerOpts),
273
        drop_if_high_mem => DropIfHighMem,
274
        inflight_calls => 0,
275
        lookup_topic_request_ref => undefined,
276
        max_inflight => MaxInflight,
277
        opts => pulsar_utils:maybe_enable_ssl_opts(Transport, ProducerOpts),
278
        parent_pid => ParentPid,
279
        partitiontopic => PartitionTopic,
280
        producer_id => ProducerID,
281
        producer_name => maps:get(producer_name, ProducerOpts, pulsar_producer),
282
        proxy_to_broker_url => ProxyToBrokerUrl,
283
        replayq => Q,
284
        replayq_offload_mode => Offload,
285
        request_id => 1,
286
        requests => #{},
287
        sequence_id => 1,
288
        state_observer_callback => StateObserverCallback,
289
        sock => undefined,
290
        sock_pid => undefined,
291
        telemetry_metadata => TelemetryMetadata
292
    },
293
    pulsar_metrics:inflight_set(State, 0),
22✔
294
    pulsar_metrics:queuing_set(State, replayq:count(Q)),
22✔
295
    pulsar_metrics:queuing_bytes_set(State, replayq:bytes(Q)),
22✔
296
    {ok, idle, State, [{next_event, internal, do_connect}]}.
22✔
297

298
%% idle state
299
-spec idle(gen_statem:event_type(), _EventContent, state()) ->
300
          handler_result().
301
idle(enter, _OldState, _State = #{state_observer_callback := StateObserverCallback}) ->
302
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
29✔
303
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
29✔
304
    keep_state_and_data;
29✔
305
idle(internal, do_connect, State) ->
306
    refresh_urls_and_connect(State);
22✔
307
idle(state_timeout, do_connect, State) ->
308
    refresh_urls_and_connect(State);
10✔
309
idle(state_timeout, lookup_topic_timeout, State0) ->
310
    log_error("timed out waiting for lookup topic response", [], State0),
3✔
311
    State = State0#{lookup_topic_request_ref := undefined},
3✔
312
    ?NEXT_STATE_IDLE_RECONNECT(State);
3✔
313
idle({call, From}, get_state, _State) ->
314
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
×
315
idle({call, From}, _EventContent, _State) ->
316
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
×
317
idle(cast, _EventContent, _State) ->
318
    keep_state_and_data;
×
319
idle(info, ?SEND_REQ(_, _) = SendRequest, State0) ->
320
    State = enqueue_send_requests([SendRequest], State0),
658✔
321
    {keep_state, State};
658✔
322
idle(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
323
    State = State0#{lookup_topic_request_ref := undefined},
27✔
324
    erlang:demonitor(Ref, [flush]),
27✔
325
    handle_lookup_topic_reply(Reply, State);
27✔
326
idle(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
327
    {stop, Reason};
×
328
idle(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
329
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
×
330
    State = State0#{lookup_topic_request_ref := undefined},
×
331
    try_close_socket(State),
×
332
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
333
idle(internal, #maybe_send_to_pulsar{}, _State) ->
334
    %% Stale nudge
335
    keep_state_and_data;
×
336
idle(_EventType, _Event, _State) ->
337
    keep_state_and_data.
6✔
338

339
%% connecting state
340
-spec connecting(gen_statem:event_type(), _EventContent, state()) ->
341
          handler_result().
342
connecting(enter, _OldState, _State = #{state_observer_callback := StateObserverCallback}) ->
343
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
27✔
344
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
27✔
345
    keep_state_and_data;
27✔
346
connecting(state_timeout, do_connect, State) ->
347
    refresh_urls_and_connect(State);
×
348
connecting(state_timeout, lookup_topic_timeout, State0) ->
349
    log_error("timed out waiting for lookup topic response", [], State0),
×
350
    State = State0#{lookup_topic_request_ref := undefined},
×
351
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
352
connecting(info, ?SEND_REQ(_, _) = SendRequest, State0) ->
353
    State = enqueue_send_requests([SendRequest], State0),
×
354
    {keep_state, State};
×
355
connecting(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
356
    State = State0#{lookup_topic_request_ref := undefined},
×
357
    erlang:demonitor(Ref, [flush]),
×
358
    handle_lookup_topic_reply(Reply, State);
×
359
connecting(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
360
    {stop, Reason};
×
361
connecting(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
362
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
×
363
    State = State0#{lookup_topic_request_ref := undefined},
×
364
    try_close_socket(State),
×
365
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
366
connecting(info, {'EXIT', Sock, Reason}, State) when is_port(Sock) ->
367
    handle_socket_close(connecting, Sock, Reason, State);
×
368
connecting(info, {'EXIT', SockPid, Reason}, State) when is_pid(SockPid) ->
369
    handle_socket_close(connecting, SockPid, Reason, State);
×
370
connecting(info, {C, Sock}, State) when C =:= tcp_closed; C =:= ssl_closed ->
371
    handle_socket_close(connecting, Sock, closed, State);
×
372
connecting(info, {E, Sock, Reason}, State) when E =:= tcp_error; E =:= ssl_error ->
373
    handle_socket_close(connecting, Sock, Reason, State);
×
374
connecting(info, ?SOCK_ERR(Sock, Reason), State) ->
375
    handle_socket_close(connecting, Sock, Reason, State);
×
376
connecting(internal, #maybe_send_to_pulsar{}, _State) ->
377
    %% Stale nudge
378
    keep_state_and_data;
×
379
connecting(_EventType, {Inet, _, Bin}, State) when Inet == tcp; Inet == ssl ->
380
    Cmd = pulsar_protocol_frame:parse(Bin),
54✔
381
    ?MODULE:handle_response(Cmd, State);
54✔
382
connecting(info, Msg, State) ->
383
    log_info("[connecting] unknown message received ~p~n  ~p", [Msg, State], State),
×
384
    keep_state_and_data;
×
385
connecting({call, From}, get_state, _State) ->
386
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
×
387
connecting({call, From}, _EventContent, _State) ->
388
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
×
389
connecting(cast, _EventContent, _State) ->
390
   keep_state_and_data.
×
391

392
%% connected state
393
-spec connected(gen_statem:event_type(), _EventContent, state()) ->
394
          handler_result().
395
connected(enter, _OldState, State0 = #{state_observer_callback := StateObserverCallback}) ->
396
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
27✔
397
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
27✔
398
    State1 = resend_sent_requests(State0),
27✔
399
    State = maybe_send_to_pulsar(State1),
27✔
400
    {keep_state, State};
27✔
401
connected(state_timeout, do_connect, _State) ->
402
    keep_state_and_data;
×
403
connected(state_timeout, lookup_topic_timeout, State0) ->
404
    log_error("timed out waiting for lookup topic response", [], State0),
×
405
    %% todo: should demonitor reference
406
    State = State0#{lookup_topic_request_ref := undefined},
×
407
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
408
connected(info, ?SEND_REQ(_, _) = SendRequest, State0 = #{batch_size := BatchSize}) ->
409
    ?tp(pulsar_producer_send_req_enter, #{}),
1,164✔
410
    SendRequests = collect_send_requests([SendRequest], BatchSize),
1,164✔
411
    State1 = enqueue_send_requests(SendRequests, State0),
1,164✔
412
    State = maybe_send_to_pulsar(State1),
1,164✔
413
    ?tp(pulsar_producer_send_req_exit, #{}),
1,164✔
414
    {keep_state, State};
1,164✔
415
connected(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
416
    State = State0#{lookup_topic_request_ref := undefined},
×
417
    erlang:demonitor(Ref, [flush]),
×
418
    handle_lookup_topic_reply(Reply, State);
×
419
connected(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
420
    {stop, Reason};
×
421
connected(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
422
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
×
423
    State = State0#{lookup_topic_request_ref := undefined},
×
424
    try_close_socket(State),
×
425
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
426
connected(info, {'EXIT', Sock, Reason}, State) when is_port(Sock) ->
427
    handle_socket_close(connected, Sock, Reason, State);
1✔
428
connected(info, {'EXIT', SockPid, Reason}, State) when is_pid(SockPid) ->
429
    handle_socket_close(connected, SockPid, Reason, State);
×
430
connected(_EventType, {C, Sock}, State) when C =:= tcp_closed; C =:= ssl_closed ->
431
    handle_socket_close(connected, Sock, closed, State);
8✔
432
connected(_EventType, {E, Sock, Reason}, State) when E =:= tcp_error; E =:= ssl_error ->
433
    handle_socket_close(connected, Sock, Reason, State);
×
434
connected(_EventType, ?SOCK_ERR(Sock, Reason), State) ->
435
    handle_socket_close(connected, Sock, Reason, State);
×
436
connected(_EventType, {Inet, _, Bin}, State) when Inet == tcp; Inet == ssl ->
437
    Cmd = pulsar_protocol_frame:parse(Bin),
643✔
438
    ?MODULE:handle_response(Cmd, State);
643✔
439
connected(_EventType, ping, State = #{sock_pid := SockPid}) ->
440
    ok = pulsar_socket_writer:ping_async(SockPid),
6✔
441
    {keep_state, State};
6✔
442
connected(internal, #maybe_send_to_pulsar{}, State0) ->
443
    State = maybe_send_to_pulsar(State0),
634✔
444
    {keep_state, State};
634✔
445
connected({call, From}, get_state, _State) ->
446
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
2✔
447
connected({call, From}, _EventContent, _State) ->
448
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
×
449
connected(cast, _EventContent, _State) ->
450
    keep_state_and_data;
×
451
connected(_EventType, EventContent, State) ->
452
    ?MODULE:handle_response(EventContent, State).
×
453

454
handle_socket_close(StateName, SockPid, Reason, #{sock_pid := SockPid} = State) ->
455
    #{sock := Sock} = State,
×
456
    handle_socket_close(StateName, Sock, Reason, State);
×
457
handle_socket_close(StateName, Sock, Reason, #{sock := Sock} = State) ->
458
    ?tp("pulsar_socket_close", #{sock => Sock, reason => Reason}),
8✔
459
    log_error("connection_closed at_state: ~p, reason: ~p", [StateName, Reason], State),
8✔
460
    try_close_socket(State),
8✔
461
    ?NEXT_STATE_IDLE_RECONNECT(State);
7✔
462
handle_socket_close(_StateName, _Sock, _Reason, _State) ->
463
    %% stale close event
464
    keep_state_and_data.
1✔
465

466
-spec refresh_urls_and_connect(state()) -> handler_result().
467
refresh_urls_and_connect(State0) ->
468
    %% if Pulsar went down and then restarted later, we must issue a
469
    %% LookupTopic command again after reconnecting.
470
    %% https://pulsar.apache.org/docs/2.10.x/developing-binary-protocol/#topic-lookup
471
    %% > Topic lookup needs to be performed each time a client needs
472
    %% > to create or reconnect a producer or a consumer. Lookup is used
473
    %% > to discover which particular broker is serving the topic we are
474
    %% > about to use.
475
    %% Simply looking up the topic (even from a distinct connection)
476
    %% will "unblock" the topic so we may send messages to it.  The
477
    %% producer may be started only after that.
478
    #{ clientid := ClientId
32✔
479
     , partitiontopic := PartitionTopic
480
     } = State0,
481
    ?tp(debug, pulsar_producer_refresh_start, #{}),
32✔
482
    try pulsar_client_manager:lookup_topic_async(ClientId, PartitionTopic) of
32✔
483
        {ok, LookupTopicRequestRef} ->
484
            State = State0#{lookup_topic_request_ref := LookupTopicRequestRef},
32✔
485
            {keep_state, State, [{state_timeout, ?LOOKUP_TOPIC_TIMEOUT, lookup_topic_timeout}]}
32✔
486
    catch
487
        exit:{noproc, _} ->
488
            log_error("client restarting; will retry to lookup topic again later", [], State0),
×
489
            ?NEXT_STATE_IDLE_RECONNECT(State0)
×
490
    end.
491

492
-spec do_connect(state()) -> handler_result().
493
do_connect(State) ->
494
    #{ broker_server := {Host, Port}
27✔
495
     , opts := Opts
496
     , partitiontopic := PartitionTopic
497
     , proxy_to_broker_url := ProxyToBrokerUrl
498
     } = State,
499
    try pulsar_socket_writer:start_link(PartitionTopic, Host, Port, Opts) of
27✔
500
        {ok, {SockPid, Sock}} ->
501
            Opts1 = pulsar_utils:maybe_add_proxy_to_broker_url_opts(Opts, ProxyToBrokerUrl),
27✔
502
            ?POORMAN(Sock, pulsar_socket:send_connect_packet(Sock, Opts1)),
27✔
503
            {next_state, connecting, State#{sock := Sock, sock_pid := SockPid}};
27✔
504
        {error, Reason} ->
505
            log_error("error connecting: ~p", [Reason], State),
×
506
            try_close_socket(State),
×
507
            ?NEXT_STATE_IDLE_RECONNECT(State)
×
508
    catch
509
        Kind:Error:Stacktrace ->
510
            log_error("exception connecting: ~p -> ~p~n  ~p", [Kind, Error, Stacktrace], State),
×
511
            try_close_socket(State),
×
512
            ?NEXT_STATE_IDLE_RECONNECT(State)
×
513
    end.
514

515
-if(?OTP_RELEASE >= 25).
516
format_status(Status) ->
517
    maps:map(
2✔
518
      fun(data, Data0) ->
519
              censor_secrets(Data0);
2✔
520
         (_Key, Value)->
521
              Value
12✔
522
      end,
523
      Status).
524
-else.
525
%% `format_status/2' is deprecated as of OTP 25.0
526
format_status(_Opt, [_PDict, _State0, Data0]) ->
527
    Data = censor_secrets(Data0),
528
    [{data, [{"State", Data}]}].
529
-endif.
530

531
censor_secrets(Data0 = #{opts := Opts0 = #{conn_opts := ConnOpts0 = #{auth_data := _}}}) ->
532
    Data0#{opts := Opts0#{conn_opts := ConnOpts0#{auth_data := "******"}}};
×
533
censor_secrets(Data) ->
534
    Data.
2✔
535

536
terminate(_Reason, _StateName, State = #{replayq := Q}) ->
537
    ok = replayq:close(Q),
16✔
538
    ok = clear_gauges(State, Q),
16✔
539
    ok.
16✔
540

541
clear_gauges(State, Q) ->
542
    pulsar_metrics:inflight_set(State, 0),
16✔
543
    maybe_reset_queuing(State, Q),
16✔
544
    ok.
16✔
545

546
maybe_reset_queuing(State, Q) ->
547
    case {replayq:count(Q), is_replayq_durable(State, Q)} of
16✔
548
        {0, _} ->
549
            pulsar_metrics:queuing_set(State, 0),
16✔
550
            pulsar_metrics:queuing_bytes_set(State, 0);
16✔
551
        {_, false} ->
552
            pulsar_metrics:queuing_set(State, 0),
×
553
            pulsar_metrics:queuing_bytes_set(State, 0);
×
554
        {_, _} ->
555
            ok
×
556
    end.
557

558
is_replayq_durable(#{replayq_offload_mode := true}, _Q) ->
559
    false;
×
560
is_replayq_durable(_, Q) ->
561
    not replayq:is_mem_only(Q).
16✔
562

563
-spec handle_response(_EventContent, state()) ->
564
          handler_result().
565
handle_response({connected, _ConnectedData}, State0 = #{
566
        sock := Sock,
567
        opts := Opts,
568
        producer_id := ProducerId,
569
        request_id := RequestId,
570
        partitiontopic := PartitionTopic
571
    }) ->
572
    start_keepalive(),
27✔
573
    ?POORMAN(Sock, pulsar_socket:send_create_producer_packet(Sock, PartitionTopic, RequestId, ProducerId, Opts)),
27✔
574
    {keep_state, next_request_id(State0)};
27✔
575
handle_response({producer_success, #{producer_name := ProName}}, State) ->
576
    {next_state, connected, State#{producer_name := ProName}};
27✔
577
handle_response({pong, #{}}, _State) ->
578
    start_keepalive(),
3✔
579
    keep_state_and_data;
3✔
580
handle_response({ping, #{}}, #{sock_pid := SockPid}) ->
581
    ok = pulsar_socket_writer:pong_async(SockPid),
5✔
582
    keep_state_and_data;
5✔
583
handle_response({close_producer, #{}}, State = #{ partitiontopic := Topic
584
                                                }) ->
585
    log_error("Close producer: ~p~n", [Topic], State),
×
586
    try_close_socket(State),
×
587
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
588
handle_response({send_receipt, Resp = #{sequence_id := SequenceId}}, State) ->
589
    #{ callback := Callback
634✔
590
     , inflight_calls := InflightCalls0
591
     , requests := Reqs
592
     , replayq := Q
593
     } = State,
594
    ?tp(pulsar_producer_recv_send_receipt, #{receipt => Resp}),
634✔
595
    case maps:get(SequenceId, Reqs, undefined) of
634✔
596
        undefined ->
597
            _ = invoke_callback(Callback, {ok, Resp}),
×
598
            {keep_state, State};
×
599
        ?INFLIGHT_REQ(QAckRef, FromsToMessages, BatchSize) ->
600
            ok = replayq:ack(Q, QAckRef),
634✔
601
            lists:foreach(
634✔
602
              fun({undefined, {_TS, Messages}}) ->
603
                   BatchLen = length(Messages),
1,805✔
604
                   _ = invoke_callback(Callback, {ok, Resp}, BatchLen),
1,805✔
605
                   ok;
1,805✔
606
                 ({?PER_REQ_CALLBACK(Fn, Args), {_TS, _Messages}}) ->
607
                   %% No need to count the messages, as we invoke
608
                   %% per-request callbacks once for the whole batch.
609
                   _ = invoke_callback({Fn, Args}, {ok, Resp}),
1✔
610
                   ok;
1✔
611
                 ({From, {_TS, _Messages}}) ->
612
                   gen_statem:reply(From, {ok, Resp})
6✔
613
              end,
614
              FromsToMessages),
615
            InflightCalls = InflightCalls0 - BatchSize,
634✔
616
            pulsar_metrics:inflight_set(State, InflightCalls),
634✔
617
            Actions = [{next_event, internal, #maybe_send_to_pulsar{}}],
634✔
618
            NewState = State#{ requests := maps:remove(SequenceId, Reqs)
634✔
619
                             , inflight_calls := InflightCalls
620
                             },
621
            {keep_state, NewState, Actions}
634✔
622
    end;
623
handle_response({error, #{error := Error, message := Msg}}, State) ->
624
    log_error("Response error:~p, msg:~p~n", [Error, Msg], State),
×
625
    try_close_socket(State),
×
626
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
627
handle_response(Msg, State) ->
628
    log_error("Receive unknown message:~p~n", [Msg], State),
×
629
    keep_state_and_data.
×
630

631
-spec send_batch_payload([{timestamp(), [pulsar:message()]}], sequence_id(), state()) -> ok.
632
send_batch_payload(Messages, SequenceId, #{
633
            partitiontopic := Topic,
634
            producer_id := ProducerId,
635
            producer_name := ProducerName,
636
            sock_pid := SockPid,
637
            opts := Opts
638
        }) ->
639
    pulsar_socket_writer:send_batch_async(SockPid, Topic, Messages, SequenceId,
799✔
640
                                          ProducerId, ProducerName, Opts).
641

642
start_keepalive() ->
643
    erlang:send_after(30_000, self(), ping).
30✔
644

645
next_request_id(State = #{request_id := ?MAX_REQ_ID}) ->
646
    State#{request_id := 1};
×
647
next_request_id(State = #{request_id := RequestId}) ->
648
    State#{request_id := RequestId + 1}.
27✔
649

650
next_sequence_id(State = #{sequence_id := ?MAX_SEQ_ID}) ->
651
    State#{sequence_id := 1};
×
652
next_sequence_id(State = #{sequence_id := SequenceId}) ->
653
    State#{sequence_id := SequenceId + 1}.
788✔
654

655
-spec log_debug(string(), [term()], state()) -> ok.
656
log_debug(Fmt, Args, State) ->
657
    do_log(debug, Fmt, Args, State).
54✔
658

659
-spec log_info(string(), [term()], state()) -> ok.
660
log_info(Fmt, Args, State) ->
661
    do_log(info, Fmt, Args, State).
×
662

663
-spec log_warn(string(), [term()], state()) -> ok.
664
log_warn(Fmt, Args, State) ->
665
    do_log(warning, Fmt, Args, State).
2✔
666

667
-spec log_error(string(), [term()], state()) -> ok.
668
log_error(Fmt, Args, State) ->
669
    do_log(error, Fmt, Args, State).
11✔
670

671
-spec do_log(atom(), string(), [term()], state()) -> ok.
672
do_log(Level, Format, Args, State) ->
673
    #{partitiontopic := PartitionTopic} = State,
67✔
674
    logger:log(Level, "[pulsar-producer][~s] " ++ Format,
67✔
675
               [PartitionTopic | Args], #{domain => [pulsar, producer]}).
676

677
-spec invoke_callback(callback(), callback_input()) -> ok.
678
invoke_callback(Callback, Resp) ->
679
    invoke_callback(Callback, Resp, _BatchLen = 1).
3✔
680

681
-spec invoke_callback(callback(), callback_input(), non_neg_integer()) -> ok.
682
invoke_callback(_Callback = undefined, _Resp, _BatchLen) ->
683
    ok;
×
684
invoke_callback({M, F, A}, Resp, BatchLen) ->
685
    lists:foreach(
1,812✔
686
      fun(_) ->
687
        erlang:apply(M, F, [Resp] ++ A)
2,259✔
688
      end,  lists:seq(1, BatchLen));
689
invoke_callback(Callback, Resp, BatchLen) when is_function(Callback, 1) ->
690
    lists:foreach(
×
691
      fun(_) ->
692
        Callback(Resp)
×
693
      end,  lists:seq(1, BatchLen));
694
invoke_callback({Fn, Args}, Resp, _BatchLen) when is_function(Fn), is_list(Args) ->
695
    %% for per-request callbacks, we invoke it only once, regardless
696
    %% of how many messages were sent.
697
    apply(Fn, Args ++ [Resp]).
3✔
698

699
queue_item_sizer(?Q_ITEM(_CallId, _Ts, _Batch) = Item) ->
700
    erlang:external_size(Item).
2,125✔
701

702
queue_item_marshaller(?Q_ITEM(_, _, _) = I) ->
703
  term_to_binary(I);
508✔
704
queue_item_marshaller(Bin) when is_binary(Bin) ->
705
  case binary_to_term(Bin) of
304✔
706
      Item = ?Q_ITEM({Pid, _Tag}, Ts, Msgs) when is_pid(Pid) ->
707
          case node(Pid) =:= node() andalso erlang:is_process_alive(Pid) of
2✔
708
              true ->
709
                  Item;
1✔
710
              false ->
711
                  ?Q_ITEM(undefined, Ts, Msgs)
1✔
712
          end;
713
      Item ->
714
          Item
302✔
715
  end.
716

717
now_ts() ->
718
    erlang:system_time(millisecond).
2,649✔
719

720
make_queue_item(From, Messages) ->
721
    ?Q_ITEM(From, now_ts(), Messages).
1,824✔
722

723
enqueue_send_requests(Requests, State = #{replayq := Q}) ->
724
    #{drop_if_high_mem := DropIfHighMem} = State,
1,822✔
725
    QItems = lists:map(
1,822✔
726
               fun(?SEND_REQ(From, Messages)) ->
727
                 make_queue_item(From, Messages)
1,822✔
728
               end,
729
               Requests),
730
    BytesBefore = replayq:bytes(Q),
1,822✔
731
    NewQ = replayq:append(Q, QItems),
1,822✔
732
    BytesAfter = replayq:bytes(NewQ),
1,822✔
733
    pulsar_metrics:queuing_set(State, replayq:count(NewQ)),
1,822✔
734
    pulsar_metrics:queuing_bytes_set(State, BytesAfter),
1,822✔
735
    ?tp(pulsar_producer_send_requests_enqueued, #{requests => Requests}),
1,822✔
736
    Overflow0 = replayq:overflow(NewQ),
1,822✔
737
    IsHighMemOverflow =
1,822✔
738
        DropIfHighMem
1,822✔
739
        andalso replayq:is_mem_only(NewQ)
×
740
        andalso load_ctl:is_high_mem(),
×
741
    Overflow = case IsHighMemOverflow of
1,822✔
742
        true ->
743
            max(Overflow0, BytesAfter - BytesBefore);
×
744
        false ->
745
            Overflow0
1,822✔
746
    end,
747
    handle_overflow(State#{replayq := NewQ}, IsHighMemOverflow, Overflow).
1,822✔
748

749
-spec handle_overflow(state(), _IsHighMemOverflow :: boolean(), _Overflow :: integer()) -> state().
750
handle_overflow(State, _IsHighMemOverflow, Overflow) when Overflow =< 0 ->
751
    %% no overflow
752
    ok = maybe_log_discard(State, _NumRequestsIncrement = 0),
1,818✔
753
    State;
1,818✔
754
handle_overflow(State0 = #{replayq := Q, callback := Callback}, IsHighMemOverflow, Overflow) ->
755
    BytesMode = case IsHighMemOverflow of
4✔
756
        true -> at_least;
×
757
        false -> at_most
4✔
758
    end,
759
    {NewQ, QAckRef, Items0} =
4✔
760
        replayq:pop(Q, #{bytes_limit => {BytesMode, Overflow}, count_limit => 999999999}),
761
    ok = replayq:ack(NewQ, QAckRef),
4✔
762
    maybe_log_discard(State0, length(Items0)),
4✔
763
    Items = [{From, Msgs} || ?Q_ITEM(From, _Now, Msgs) <- Items0],
4✔
764
    reply_with_error(Items, Callback, {error, overflow}),
4✔
765
    NumMsgs = length([1 || {_, Msgs} <- Items, _ <- Msgs]),
4✔
766
    pulsar_metrics:dropped_queue_full_inc(State0, NumMsgs),
4✔
767
    pulsar_metrics:queuing_set(State0, replayq:count(NewQ)),
4✔
768
    pulsar_metrics:queuing_bytes_set(State0, replayq:bytes(NewQ)),
4✔
769
    State0#{replayq := NewQ}.
4✔
770

771
maybe_log_discard(State, Increment) ->
772
    Last = get_overflow_log_state(),
1,825✔
773
    #{ count_since_last_log := CountLast
1,825✔
774
     , total_count := TotalCount
775
     } = Last,
776
    case CountLast =:= TotalCount andalso Increment =:= 0 of
1,825✔
777
        true -> %% no change
778
            ok;
1,819✔
779
        false ->
780
            maybe_log_discard(State, Increment, Last)
6✔
781
    end.
782

783
-spec maybe_log_discard(
784
        state(),
785
        non_neg_integer(),
786
        #{ last_log_inst => non_neg_integer()
787
         , count_since_last_log => non_neg_integer()
788
         , total_count => non_neg_integer()
789
         }) -> ok.
790
maybe_log_discard(State,
791
                  Increment,
792
                  #{ last_log_inst := LastInst
793
                   , count_since_last_log := CountLast
794
                   , total_count := TotalCount
795
                   }) ->
796
    NowInst = now_ts(),
6✔
797
    NewTotalCount = TotalCount + Increment,
6✔
798
    Delta = NewTotalCount - CountLast,
6✔
799
    case NowInst - LastInst > ?MIN_DISCARD_LOG_INTERVAL of
6✔
800
        true ->
801
            log_warn("replayq dropped ~b overflowed messages", [Delta], State),
2✔
802
            put_overflow_log_state(#{ last_log_inst => NowInst
2✔
803
                                    , count_since_last_log => NewTotalCount
804
                                    , total_count => NewTotalCount
805
                                    });
806
        false ->
807
            put_overflow_log_state(#{ last_log_inst => LastInst
4✔
808
                                    , count_since_last_log => CountLast
809
                                    , total_count => NewTotalCount
810
                                    })
811
    end.
812

813
-spec get_overflow_log_state() -> #{ last_log_inst => non_neg_integer()
814
                                   , count_since_last_log => non_neg_integer()
815
                                   , total_count => non_neg_integer()
816
                                   }.
817
get_overflow_log_state() ->
818
    case get(?buffer_overflow_discarded) of
1,827✔
819
        undefined ->
820
            #{ last_log_inst => 0
1,820✔
821
             , count_since_last_log => 0
822
             , total_count => 0
823
             };
824
        Stats = #{} ->
825
            Stats
7✔
826
    end.
827

828
-spec put_overflow_log_state(#{ last_log_inst => non_neg_integer()
829
                              , count_since_last_log => non_neg_integer()
830
                              , total_count => non_neg_integer()
831
                              }) -> ok.
832
put_overflow_log_state(#{ last_log_inst := _LastInst
833
                        , count_since_last_log := _CountLast
834
                        , total_count := _TotalCount
835
                        } = Stats) ->
836
    put(?buffer_overflow_discarded, Stats),
8✔
837
    ok.
8✔
838

839
maybe_send_to_pulsar(State) ->
840
    #{ replayq := Q
2,615✔
841
     , requests := Requests
842
     , max_inflight := MaxInflight
843
     } = State,
844
    HasQueued = replayq:count(Q) /= 0,
2,615✔
845
    HasAvailableInflight = map_size(Requests) < MaxInflight,
2,615✔
846
    case HasQueued andalso HasAvailableInflight of
2,615✔
847
        true ->
848
            do_send_to_pulsar(State);
790✔
849
        false ->
850
            State
1,825✔
851
    end.
852

853
do_send_to_pulsar(State0) ->
854
    #{ batch_size := BatchSize
790✔
855
     , inflight_calls := InflightCalls0
856
     , sequence_id := SequenceId
857
     , requests := Requests0
858
     , replayq := Q
859
     , opts := ProducerOpts
860
     } = State0,
861
    MaxBatchBytes = maps:get(max_batch_bytes, ProducerOpts, ?DEFAULT_MAX_BATCH_BYTES),
790✔
862
    {NewQ, QAckRef, Items} = replayq:pop(Q, #{ count_limit => BatchSize
790✔
863
                                             , bytes_limit => MaxBatchBytes
864
                                             }),
865
    State1 = State0#{replayq := NewQ},
790✔
866
    pulsar_metrics:queuing_set(State0, replayq:count(NewQ)),
790✔
867
    pulsar_metrics:queuing_bytes_set(State0, replayq:bytes(NewQ)),
790✔
868
    RetentionPeriod = maps:get(retention_period, ProducerOpts, infinity),
790✔
869
    Now = now_ts(),
790✔
870
    {Expired, FromsToMessages} =
790✔
871
       lists:foldr(
872
         fun(?Q_ITEM(From, Timestamp, Msgs), {Expired, Acc}) ->
873
           case is_batch_expired(Timestamp, RetentionPeriod, Now) of
1,970✔
874
             true ->
875
               {[{From, Msgs} | Expired], Acc};
4✔
876
             false ->
877
               {Expired, [{From, {Timestamp, Msgs}} | Acc]}
1,966✔
878
           end
879
         end,
880
         {[], []},
881
         Items),
882
    reply_expired_messages(Expired, State1),
790✔
883
    pulsar_metrics:dropped_inc(State1, length(Expired)),
790✔
884
    case FromsToMessages of
790✔
885
        [] ->
886
            %% all expired, immediately ack replayq batch and continue
887
            ok = replayq:ack(Q, QAckRef),
2✔
888
            maybe_send_to_pulsar(State1);
2✔
889
        [_ | _] ->
890
            FinalBatch = [Msg || {_From, {_Timestamp, Msgs}} <-
788✔
891
                                     FromsToMessages,
788✔
892
                                 Msg <- Msgs],
1,966✔
893
            FinalBatchSize = length(FinalBatch),
788✔
894
            send_batch_payload(FinalBatch, SequenceId, State0),
788✔
895
            Requests = Requests0#{SequenceId => ?INFLIGHT_REQ(QAckRef, FromsToMessages, FinalBatchSize)},
788✔
896
            InflightCalls = InflightCalls0 + FinalBatchSize,
788✔
897
            pulsar_metrics:inflight_set(State1, InflightCalls),
788✔
898
            State2 = State1#{requests := Requests, inflight_calls := InflightCalls},
788✔
899
            State = next_sequence_id(State2),
788✔
900
            maybe_send_to_pulsar(State)
788✔
901
    end.
902

903
-spec reply_expired_messages([{gen_statem:from() | per_request_callback_int() | undefined,
904
                               [pulsar:message()]}],
905
                             state()) -> ok.
906
reply_expired_messages(Expired, #{callback := Callback}) ->
907
    reply_with_error(Expired, Callback, {error, expired}).
791✔
908

909
-spec reply_with_error([{gen_statem:from() | per_request_callback_int() | undefined,
910
                         [pulsar:message()]}],
911
                       callback(), {error, expired | overflow}) -> ok.
912
reply_with_error(Items, Callback, Error) ->
913
    lists:foreach(
795✔
914
      fun({undefined, Msgs}) ->
915
              invoke_callback(Callback, Error, length(Msgs));
7✔
916
         ({?PER_REQ_CALLBACK(Fn, Args), _Msgs}) ->
917
              %% No need to count the messages, as we invoke
918
              %% per-request callbacks once for the whole batch.
919
              invoke_callback({Fn, Args}, Error);
2✔
920
         ({From, _Msgs}) ->
921
              gen_statem:reply(From, Error)
×
922
      end,
923
      Items).
924

925
collect_send_requests(Acc, Limit) ->
926
    Count = length(Acc),
1,164✔
927
    do_collect_send_requests(Acc, Count, Limit).
1,164✔
928

929
do_collect_send_requests(Acc, Count, Limit) when Count >= Limit ->
930
    lists:reverse(Acc);
×
931
do_collect_send_requests(Acc, Count, Limit) ->
932
    receive
1,164✔
933
        ?SEND_REQ(_, _) = Req ->
934
            do_collect_send_requests([Req | Acc], Count + 1, Limit)
×
935
    after
936
        0 ->
937
            lists:reverse(Acc)
1,164✔
938
    end.
939

940
try_close_socket(#{sock := undefined}) ->
941
    ok;
×
942
try_close_socket(#{sock := Sock, sock_pid := SockPid, opts := Opts}) ->
943
    %% N.B.: it's important to first close the socket and then terminate the writer
944
    %% process.  The writer may be blocked in a `send' call, and closing the socket first
945
    %% will make the call return `einval' immediately, allowing us to terminate it (it
946
    %% also terminates itself on such `send' errors, but we make sure here).
947
    _ = pulsar_socket:close(Sock, Opts),
8✔
948
    ok = pulsar_socket_writer:stop(SockPid),
8✔
949
    ok.
7✔
950

951
resend_sent_requests(State) ->
952
    ?tp(pulsar_producer_resend_sent_requests_enter, #{}),
27✔
953
    #{ inflight_calls := InflightCalls0
27✔
954
     , requests := Requests0
955
     , replayq := Q
956
     , opts := ProducerOpts
957
     } = State,
958
    Now = now_ts(),
27✔
959
    RetentionPeriod = maps:get(retention_period, ProducerOpts, infinity),
27✔
960
    Requests1 = lists:keysort(1, maps:to_list(Requests0)),
27✔
961
    {Requests, Dropped} =
27✔
962
        lists:foldl(
963
          fun({SequenceId, ?INFLIGHT_REQ(QAckRef, FromsToMessages, _BatchSize)}, {AccIn, DroppedAcc}) ->
964
               {Messages, Expired} =
12✔
965
                  lists:partition(
966
                    fun({_From, {Ts, _Msgs}}) ->
967
                      not is_batch_expired(Ts, RetentionPeriod, Now)
12✔
968
                    end,
969
                    FromsToMessages),
970
               lists:foreach(
12✔
971
                 fun({From, {_Ts, Msgs}}) ->
972
                   reply_expired_messages([{From, Msgs}], State)
1✔
973
                 end,
974
                 Expired),
975
               Dropped = length(Expired),
12✔
976
               Acc = case Messages of
12✔
977
                   [] ->
978
                       ?tp(pulsar_producer_resend_all_expired, #{}),
1✔
979
                       ok = replayq:ack(Q, QAckRef),
1✔
980
                       AccIn;
1✔
981
                   [_ | _] ->
982
                       send_batch_payload([Msg || {_From, {_Ts, Msgs}} <- Messages,
11✔
983
                                                  Msg <- Msgs],
11✔
984
                                          SequenceId, State),
985
                       AccIn#{SequenceId => ?INFLIGHT_REQ(QAckRef, Messages, length(Messages))}
11✔
986
               end,
987
               {Acc, DroppedAcc + Dropped}
12✔
988
          end,
989
          {#{}, 0},
990
          Requests1),
991
    InflightCalls = InflightCalls0 - Dropped,
27✔
992
    pulsar_metrics:dropped_inc(State, Dropped),
27✔
993
    pulsar_metrics:inflight_set(State, InflightCalls),
27✔
994
    State#{requests := Requests, inflight_calls := InflightCalls}.
27✔
995

996
is_batch_expired(_Timestamp, infinity = _RetentionPeriod, _Now) ->
997
    false;
1,973✔
998
is_batch_expired(Timestamp, RetentionPeriod, Now) ->
999
    Timestamp =< Now - RetentionPeriod.
9✔
1000

1001
-spec escape(string()) -> binary().
1002
escape(Str) ->
1003
    NormalizedStr = unicode:characters_to_nfd_list(Str),
7✔
1004
    iolist_to_binary(pulsar_utils:escape_uri(NormalizedStr)).
7✔
1005

1006
-spec handle_lookup_topic_reply(pulsar_client:lookup_topic_response(), state()) -> handler_result().
1007
handle_lookup_topic_reply({error, Error}, State) ->
1008
    log_error("error looking up topic: ~0p", [Error], State),
×
1009
    try_close_socket(State),
×
1010
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
1011
handle_lookup_topic_reply({ok, #{ proxy_through_service_url := true
1012
                                , brokerServiceUrl := NewBrokerServiceURL
1013
                                }}, State0) ->
1014
    #{clientid := ClientId} = State0,
27✔
1015
    ?tp(debug, pulsar_producer_lookup_alive_pulsar_url, #{}),
27✔
1016
    log_debug("received topic lookup reply: ~0p", [#{proxy_through_service_url => true, broker_service_url => NewBrokerServiceURL}], State0),
27✔
1017
    try pulsar_client_manager:get_alive_pulsar_url(ClientId, ?GET_ALIVE_PULSAR_URL) of
27✔
1018
        {ok, AlivePulsarURL} ->
1019
            maybe_connect(#{ broker_service_url => NewBrokerServiceURL
27✔
1020
                           , alive_pulsar_url => AlivePulsarURL
1021
                           }, State0);
1022
        {error, Reason} ->
1023
            log_error("error getting pulsar alive URL: ~0p", [Reason], State0),
×
1024
            try_close_socket(State0),
×
1025
            ?NEXT_STATE_IDLE_RECONNECT(State0)
×
1026
    catch
1027
        exit:{noproc, _} ->
1028
            log_error("client restarting; will retry to lookup topic later", [], State0),
×
1029
            try_close_socket(State0),
×
1030
            ?NEXT_STATE_IDLE_RECONNECT(State0);
×
1031
        exit:{timeout, _} ->
1032
            log_error("timeout calling client; will retry to lookup topic later", [], State0),
×
1033
            try_close_socket(State0),
×
1034
            ?NEXT_STATE_IDLE_RECONNECT(State0)
×
1035
    end;
1036
handle_lookup_topic_reply({ok, #{ proxy_through_service_url := false
1037
                                , brokerServiceUrl := NewBrokerServiceURL
1038
                                }},
1039
                         State) ->
1040
    log_debug("received topic lookup reply: ~0p",
×
1041
              [#{proxy_through_service_url => false,
1042
                 broker_service_url => NewBrokerServiceURL}], State),
1043
    maybe_connect(#{ alive_pulsar_url => NewBrokerServiceURL
×
1044
                   , broker_service_url => undefined
1045
                   }, State).
1046

1047
-spec maybe_connect(#{ alive_pulsar_url := string()
1048
                     , broker_service_url := string() | undefined
1049
                     }, state()) -> handler_result().
1050
maybe_connect(#{ broker_service_url := NewBrokerServiceURL
1051
               , alive_pulsar_url := AlivePulsarURL
1052
               }, State0) ->
1053
    #{ broker_server := OldBrokerServer
27✔
1054
     , proxy_to_broker_url := OldBrokerServiceURL
1055
     } = State0,
1056
    {_Transport, NewBrokerServer} = pulsar_utils:parse_url(AlivePulsarURL),
27✔
1057
    case {OldBrokerServer, OldBrokerServiceURL} =:= {NewBrokerServer, NewBrokerServiceURL} of
27✔
1058
        true ->
1059
            log_debug("connecting to ~0p",
27✔
1060
                      [#{broker_server => NewBrokerServer,
1061
                         service_url => NewBrokerServiceURL}], State0),
1062
            do_connect(State0);
27✔
1063
        false ->
1064
            %% broker changed; reconnect.
1065
            log_info("pulsar endpoint changed from ~0p to ~0p; reconnecting...",
×
1066
                      [ #{ broker_server => OldBrokerServer
1067
                         , proxy_url => OldBrokerServiceURL
1068
                         }
1069
                      , #{ broker_server => NewBrokerServer
1070
                         , proxy_url => NewBrokerServiceURL
1071
                         }
1072
                      ],
1073
                      State0),
1074
            try_close_socket(State0),
×
1075
            State = State0#{
×
1076
                broker_server := NewBrokerServer,
1077
                proxy_to_broker_url := NewBrokerServiceURL
1078
            },
1079
            ?NEXT_STATE_IDLE_RECONNECT(State)
×
1080
    end.
1081

1082
-spec notify_state_change(undefined | state_observer_callback(), statem()) -> ok.
1083
notify_state_change(undefined, _ProducerState) ->
1084
    ok;
×
1085
notify_state_change({Fun, Args}, ProducerState) ->
1086
    _ = apply(Fun, [ProducerState | Args]),
83✔
1087
    ok.
83✔
1088

1089
-ifdef(TEST).
1090
-include_lib("eunit/include/eunit.hrl").
1091

1092
maybe_log_discard_test_() ->
1093
    [ {"no increment, empty dictionary", fun() -> maybe_log_discard(undefined, 0) end}
2✔
1094
    , {"fake-last-old",
1095
       fun() ->
1096
         Inst0 = now_ts() - ?MIN_DISCARD_LOG_INTERVAL - 1,
1✔
1097
         ok = put_overflow_log_state(#{ last_log_inst => Inst0
1✔
1098
                                      , count_since_last_log => 2
1099
                                      , total_count => 2
1100
                                      }),
1101
         ok = maybe_log_discard(#{partitiontopic => <<"partitiontopic">>}, 1),
1✔
1102
         Stats = get_overflow_log_state(),
1✔
1103
         ?assertMatch(#{count_since_last_log := 3, total_count := 3}, Stats),
1✔
1104
         %% greater than the minimum interval because we just logged
1105
         ?assert(maps:get(last_log_inst, Stats) - Inst0 > ?MIN_DISCARD_LOG_INTERVAL),
1✔
1106
         ok
1✔
1107
       end}
1108
    , {"fake-last-fresh",
1109
       fun() ->
1110
         Inst0 = now_ts(),
1✔
1111
         ok = put_overflow_log_state(#{ last_log_inst => Inst0
1✔
1112
                                      , count_since_last_log => 2
1113
                                      , total_count => 2
1114
                                      }),
1115
         ok = maybe_log_discard(#{partitiontopic => <<"partitiontopic">>}, 2),
1✔
1116
         Stats = get_overflow_log_state(),
1✔
1117
         ?assertMatch(#{count_since_last_log := 2, total_count := 4}, Stats),
1✔
1118
         %% less than the minimum interval because we didn't log and just accumulated
1119
         ?assert(maps:get(last_log_inst, Stats) - Inst0 < ?MIN_DISCARD_LOG_INTERVAL),
1✔
1120
         ok
1✔
1121
       end}
1122
    ].
1123
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc