• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 833

28 Feb 2025 12:48PM UTC coverage: 72.601% (+0.3%) from 72.261%
833

push

github

web-flow
Merge pull request #75 from thalesmg/20250227-oom-drop

fix(producer): handle `drop_if_high_mem` option

74 of 94 new or added lines in 4 files covered. (78.72%)

4 existing lines in 2 files now uncovered.

991 of 1365 relevant lines covered (72.6%)

261.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.22
/src/pulsar_producer.erl
1
%% Copyright (c) 2013-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_producer).
16

17
-behaviour(gen_statem).
18

19
-include_lib("kernel/include/inet.hrl").
20
-include("include/pulsar_producer_internal.hrl").
21
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
22

23
-export([ send/2
24
        , send/3
25
        , send_sync/2
26
        , send_sync/3
27
        , get_state/1
28
        ]).
29

30
-export([ start_link/4
31
        , idle/3
32
        , connecting/3
33
        , connected/3
34
        ]).
35

36
%% gen_statem API
37
-export([ callback_mode/0
38
        , init/1
39
        , terminate/3
40
        , format_status/1
41
        , format_status/2
42
        ]).
43

44
%% replayq API
45
-export([ queue_item_sizer/1
46
        , queue_item_marshaller/1
47
        ]).
48

49
-export([handle_response/2]).
50

51
%% for testing only
52
-ifdef(TEST).
53
-export([make_queue_item/2]).
54
-endif.
55

56
-type statem() :: idle | connecting | connected.
57
-type sequence_id() :: integer().
58
-type send_receipt() :: #{ sequence_id := sequence_id()
59
                         , producer_id := integer()
60
                         , highest_sequence_id := sequence_id()
61
                         , message_id := map()
62
                         , any() => term()
63
                         }.
64
-type timestamp() :: integer().
65
-type callback() :: undefined | mfa() | fun((map()) -> ok) | per_request_callback().
66
-type callback_input() :: {ok, send_receipt()} | {error, expired}.
67
-type config() :: #{ replayq_dir := string()
68
                   , replayq_max_total_bytes => pos_integer()
69
                   , replayq_seg_bytes => pos_integer()
70
                   , replayq_offload_mode => boolean()
71
                   , max_batch_bytes => pos_integer()
72
                   , producer_name => atom()
73
                   , clientid => atom()
74
                   , callback => callback()
75
                   , batch_size => non_neg_integer()
76
                   , drop_if_high_mem => boolean()
77
                   , max_inflight => pos_integer()
78
                   , retention_period => timeout()
79
                   }.
80
-export_type([ config/0
81
             ]).
82

83
-define(RECONNECT_TIMEOUT, 5_000).
84
-define(LOOKUP_TOPIC_TIMEOUT, 15_000).
85
-define(GET_ALIVE_PULSAR_URL, 5_000).
86

87
-define(MAX_REQ_ID, 4294836225).
88
-define(MAX_SEQ_ID, 18445618199572250625).
89

90
-define(DEFAULT_MAX_INFLIGHT, 10).
91

92
-define(DEFAULT_REPLAYQ_SEG_BYTES, 10 * 1024 * 1024).
93
-define(DEFAULT_REPLAYQ_LIMIT, 2_000_000_000).
94
-define(DEFAULT_MAX_BATCH_BYTES, 1_000_000).
95
-define(Q_ITEM(From, Ts, Messages), {From, Ts, Messages}).
96
-define(INFLIGHT_REQ(QAckRef, FromsToMessages, BatchSize), {inflight_req, QAckRef, FromsToMessages, BatchSize}).
97
-define(NEXT_STATE_IDLE_RECONNECT(State), {next_state, idle, State#{sock := undefined,
98
                                                                    sock_pid := undefined},
99
                                           [{state_timeout, ?RECONNECT_TIMEOUT, do_connect}]}).
100
-define(buffer_overflow_discarded, buffer_overflow_discarded).
101
-define(MIN_DISCARD_LOG_INTERVAL, timer:seconds(5)).
102
-define(PER_REQ_CALLBACK(Fn, Args), {callback, {Fn, Args}}).
103
-define(SOCK_ERR(SOCK, REASON), {socket_error, SOCK, REASON}).
104

105
%% poorman's error handling.
106
%% this is an extra safety to handle a previously missed tcp/ssl_error or tcp/ssl_closed event
107
-define(POORMAN(SOCK, EXPR),
108
        case (EXPR) of
109
            ok ->
110
                ok;
111
            {error, Reason} ->
112
                _ = self() ! ?SOCK_ERR(SOCK, Reason),
113
                ok
114
        end).
115

116
%% Calls/Casts/Infos
117
-record(maybe_send_to_pulsar, {}).
118

119
-type state_observer_callback() :: {function(), [term()]}.
120
-type state() :: #{
121
    batch_size := non_neg_integer(),
122
    broker_server := {binary(), pos_integer()},
123
    callback := undefined | mfa() | fun((map()) -> ok),
124
    clientid := atom(),
125
    drop_if_high_mem := boolean(),
126
    inflight_calls := non_neg_integer(),
127
    lookup_topic_request_ref := reference() | undefined,
128
    max_inflight := pos_integer(),
129
    opts := map(),
130
    parent_pid := undefined | pid(),
131
    partitiontopic := string(),
132
    producer_id := integer(),
133
    producer_name := atom(),
134
    proxy_to_broker_url := undefined | string(),
135
    replayq := replayq:q(),
136
    replayq_offload_mode := boolean(),
137
    request_id := integer(),
138
    requests := #{sequence_id() =>
139
                      ?INFLIGHT_REQ(
140
                         replayq:ack_ref(),
141
                         [{gen_statem:from() | undefined,
142
                           {timestamp(), [pulsar:message()]}}],
143
                         _BatchSize :: non_neg_integer()
144
                        )},
145
    sequence_id := sequence_id(),
146
    state_observer_callback := undefined | state_observer_callback(),
147
    sock := undefined | port(),
148
    sock_pid := undefined | pid(),
149
    telemetry_metadata := map()
150
}.
151
-type handler_result() :: gen_statem:event_handler_result(statem(), state()).
152
-type per_request_callback() :: {function(), [term()]}.
153
-type per_request_callback_int() :: ?PER_REQ_CALLBACK(function(), [term()]).
154
-type send_opts() :: #{callback_fn => per_request_callback()}.
155
-export_type([send_opts/0]).
156

157
%% gen_statem callback: one callback function per state name, plus an
%% `enter' call whenever a state is entered.
callback_mode() ->
    [state_functions, state_enter].
158

159
%% Starts and links a producer state machine for one partition topic.
%% The process is spawned with `message_queue_data' set to `off_heap'.
start_link(PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts) ->
    InitArg = {PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts},
    StartOpts = [{spawn_opt, [{message_queue_data, off_heap}]}],
    gen_statem:start_link(?MODULE, InitArg, StartOpts).
162

163
%% Asynchronously sends `Messages' with no per-request options.
%% Equivalent to `send(Pid, Messages, #{})'.
-spec send(gen_statem:server_ref(), [pulsar:message()]) -> {ok, pid()}.
send(Pid, Messages) ->
    NoOpts = #{},
    send(Pid, Messages, NoOpts).
166

167
%% Asynchronously hands `Messages' to the producer process as a plain
%% message and returns immediately with `{ok, Pid}'.
%%
%% When `Opts' carries `callback_fn => {Fun, Args}', the request is tagged
%% with that per-request callback; otherwise it is tagged `undefined'.
%% Any other `callback_fn' value raises `case_clause'.
-spec send(gen_statem:server_ref(), [pulsar:message()], send_opts()) -> {ok, pid()}.
send(Pid, Messages, Opts) ->
    CallbackFn = maps:get(callback_fn, Opts, undefined),
    From =
        case CallbackFn of
            {Fn, Args} when is_function(Fn) ->
                ?PER_REQ_CALLBACK(Fn, Args);
            undefined ->
                undefined
        end,
    erlang:send(Pid, ?SEND_REQ(From, Messages)),
    {ok, Pid}.
175

176
%% Synchronous send with the default timeout of 5 seconds.
%% See `send_sync/3' for the reply and failure behaviour.
-spec send_sync(gen_statem:server_ref(), [pulsar:message()]) ->
          {ok, send_receipt()}
        | {error, producer_connecting
                | producer_disconnected
                | term()}.
send_sync(Pid, Messages) ->
    DefaultTimeout = 5_000,
    send_sync(Pid, Messages, DefaultTimeout).
183

184
%% Sends `Messages' to the producer and blocks until it replies.
%%
%% Returns whatever the producer replies with.  Raises
%% `error({producer_down, Reason})' if the producer terminates before
%% replying, and `error(timeout)' if no reply arrives within `Timeout'.
-spec send_sync(gen_statem:server_ref(), [pulsar:message()], timeout()) ->
          {ok, send_receipt()}
        | {error, producer_connecting
                | producer_disconnected
                | term()}.
send_sync(Pid, Messages, Timeout) ->
    Caller = self(),
    %% The monitor reference is also a reply alias; `reply_demonitor'
    %% removes the monitor as soon as a reply is delivered through it.
    MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
    %% Mimicking gen_statem's From, so the reply can be sent with
    %% `gen_statem:reply/2'
    From = {Caller, MRef},
    erlang:send(Pid, ?SEND_REQ(From, Messages)),
    receive
        {MRef, Response} ->
            erlang:demonitor(MRef, [flush]),
            Response;
        %% The monitored-object field is deliberately not matched against
        %% `Pid': when the producer is addressed by registered name that
        %% field is `{Name, Node}', and matching on `Pid' would make the
        %% caller miss the 'DOWN' message and wait for the full timeout.
        %% `MRef' alone identifies the monitor.
        {'DOWN', MRef, process, _Object, Reason} ->
            error({producer_down, Reason})
    after
        Timeout ->
            erlang:demonitor(MRef, [flush]),
            %% A reply may have raced with the timeout; prefer it over
            %% failing the call.
            receive
                {MRef, LateResponse} ->
                    LateResponse
            after
                0 ->
                    error(timeout)
            end
    end.
213

214
%% Asks the producer for the name of its current state
%% (`idle', `connecting' or `connected'), waiting at most 5 seconds.
-spec get_state(pid()) -> statem().
get_state(Pid) ->
    CallTimeout = 5_000,
    gen_statem:call(Pid, get_state, CallTimeout).
217

218
%%--------------------------------------------------------------------
219
%% gen_statem callback
220
%%--------------------------------------------------------------------
221

222
%% gen_statem `init/1' callback.
%%
%% Traps exits, opens the replayq buffer (memory-only unless `replayq_dir'
%% is configured), builds the initial state, initialises the metrics
%% gauges and starts in `idle' with an immediate internal `do_connect'
%% event.  `producer_id' and `clientid' are mandatory options.
-spec init({string(), string(), string() | undefined, config()}) ->
          gen_statem:init_result(statem(), state()).
init({PartitionTopic, Server, ProxyToBrokerUrl, Opts0}) ->
    process_flag(trap_exit, true),
    pulsar_utils:set_label({?MODULE, PartitionTopic}),
    {Transport, BrokerServer} = pulsar_utils:parse_url(Server),
    ProducerID = maps:get(producer_id, Opts0),
    Offload = maps:get(replayq_offload_mode, Opts0, false),
    QCfgBase =
        case maps:get(replayq_dir, Opts0, false) of
            false ->
                #{mem_only => true};
            BaseDir ->
                %% one sub-directory per partition topic
                TopicPath = escape(PartitionTopic),
                QDir = filename:join([BaseDir, TopicPath]),
                SegBytes = maps:get(replayq_seg_bytes, Opts0, ?DEFAULT_REPLAYQ_SEG_BYTES),
                #{dir => QDir, seg_bytes => SegBytes, offload => Offload}
        end,
    MaxTotalBytes = maps:get(replayq_max_total_bytes, Opts0, ?DEFAULT_REPLAYQ_LIMIT),
    MaxBatchBytes = maps:get(max_batch_bytes, Opts0, ?DEFAULT_MAX_BATCH_BYTES),
    Q = replayq:open(QCfgBase#{ sizer => fun ?MODULE:queue_item_sizer/1
                              , marshaller => fun ?MODULE:queue_item_marshaller/1
                              , max_total_bytes => MaxTotalBytes
                              }),
    Opts1 = Opts0#{max_batch_bytes => MaxBatchBytes},
    DropIfHighMem = maps:get(drop_if_high_mem, Opts1, false),
    MaxInflight = maps:get(max_inflight, Opts1, ?DEFAULT_MAX_INFLIGHT),
    %% The replayq is open and the values above are extracted, so these
    %% keys are dropped from the options kept in the state.
    ConsumedKeys = [ replayq_dir
                   , replayq_seg_bytes
                   , replayq_offload_mode
                   , replayq_max_total_bytes
                   , drop_if_high_mem
                   , max_inflight
                   ],
    Opts = maps:without(ConsumedKeys, Opts1),
    StateObserverCallback = maps:get(state_observer_callback, Opts0, undefined),
    ParentPid = maps:get(parent_pid, Opts, undefined),
    TelemetryMetadata =
        maps:put(partition_topic, PartitionTopic, maps:get(telemetry_metadata, Opts0, #{})),
    BatchSize = maps:get(batch_size, Opts, 0),
    Callback = maps:get(callback, Opts, undefined),
    ClientId = maps:get(clientid, Opts),
    ConnOpts = pulsar_utils:maybe_enable_ssl_opts(Transport, Opts),
    ProducerName = maps:get(producer_name, Opts, pulsar_producer),
    State = #{
        batch_size => BatchSize,
        broker_server => BrokerServer,
        callback => Callback,
        clientid => ClientId,
        drop_if_high_mem => DropIfHighMem,
        inflight_calls => 0,
        lookup_topic_request_ref => undefined,
        max_inflight => MaxInflight,
        opts => ConnOpts,
        parent_pid => ParentPid,
        partitiontopic => PartitionTopic,
        producer_id => ProducerID,
        producer_name => ProducerName,
        proxy_to_broker_url => ProxyToBrokerUrl,
        replayq => Q,
        replayq_offload_mode => Offload,
        request_id => 1,
        requests => #{},
        sequence_id => 1,
        state_observer_callback => StateObserverCallback,
        sock => undefined,
        sock_pid => undefined,
        telemetry_metadata => TelemetryMetadata
    },
    %% Gauges start from the buffer's current contents.
    pulsar_metrics:inflight_set(State, 0),
    pulsar_metrics:queuing_set(State, replayq:count(Q)),
    pulsar_metrics:queuing_bytes_set(State, replayq:bytes(Q)),
    {ok, idle, State, [{next_event, internal, do_connect}]}.
293

294
%% idle state
295
%% `idle' state callback.
%%
%% Starts topic lookups (`do_connect'), buffers send requests into the
%% queue, and hands lookup replies to `handle_lookup_topic_reply/2'.
%% Lookup timeouts and a lost client schedule a reconnect after
%% ?RECONNECT_TIMEOUT.  Any other event is ignored.
-spec idle(gen_statem:event_type(), _EventContent, state()) ->
          handler_result().
idle(enter, _OldState, _State = #{state_observer_callback := StateObserverCallback}) ->
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
    keep_state_and_data;
idle(internal, do_connect, State) ->
    refresh_urls_and_connect(State);
idle(state_timeout, do_connect, State) ->
    refresh_urls_and_connect(State);
idle(state_timeout, lookup_topic_timeout, State0) ->
    log_error("timed out waiting for lookup topic response", [], State0),
    %% The reply and 'DOWN' clauses below treat the request reference as
    %% a monitor; release it here as well so that an abandoned lookup
    %% does not leave a monitor behind.
    Ref = maps:get(lookup_topic_request_ref, State0, undefined),
    _ = is_reference(Ref) andalso erlang:demonitor(Ref, [flush]),
    State = State0#{lookup_topic_request_ref := undefined},
    ?NEXT_STATE_IDLE_RECONNECT(State);
idle({call, From}, get_state, _State) ->
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
idle({call, From}, _EventContent, _State) ->
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
idle(cast, _EventContent, _State) ->
    keep_state_and_data;
idle(info, ?SEND_REQ(_, _) = SendRequest, State0) ->
    %% Not connected: only buffer the request.
    State = enqueue_send_requests([SendRequest], State0),
    {keep_state, State};
idle(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
    State = State0#{lookup_topic_request_ref := undefined},
    erlang:demonitor(Ref, [flush]),
    handle_lookup_topic_reply(Reply, State);
idle(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
    {stop, Reason};
idle(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
    State = State0#{lookup_topic_request_ref := undefined},
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
idle(internal, #maybe_send_to_pulsar{}, _State) ->
    %% Stale nudge
    keep_state_and_data;
idle(_EventType, _Event, _State) ->
    keep_state_and_data.
334

335
%% connecting state
336
%% `connecting' state callback.
%%
%% Entered once the socket writer is started (see `do_connect/1').
%% Incoming tcp/ssl data is parsed and passed to `handle_response/2',
%% send requests are only buffered, and socket close or error events go
%% through `handle_socket_close/4'.
-spec connecting(gen_statem:event_type(), _EventContent, state()) ->
          handler_result().
connecting(enter, _OldState, _State = #{state_observer_callback := StateObserverCallback}) ->
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
    keep_state_and_data;
connecting(state_timeout, do_connect, State) ->
    refresh_urls_and_connect(State);
connecting(state_timeout, lookup_topic_timeout, State0) ->
    log_error("timed out waiting for lookup topic response", [], State0),
    %% The reply and 'DOWN' clauses below treat the request reference as
    %% a monitor; release it here as well so that an abandoned lookup
    %% does not leave a monitor behind.
    Ref = maps:get(lookup_topic_request_ref, State0, undefined),
    _ = is_reference(Ref) andalso erlang:demonitor(Ref, [flush]),
    State = State0#{lookup_topic_request_ref := undefined},
    ?NEXT_STATE_IDLE_RECONNECT(State);
connecting(info, ?SEND_REQ(_, _) = SendRequest, State0) ->
    %% Not connected yet: only buffer the request.
    State = enqueue_send_requests([SendRequest], State0),
    {keep_state, State};
connecting(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
    State = State0#{lookup_topic_request_ref := undefined},
    erlang:demonitor(Ref, [flush]),
    handle_lookup_topic_reply(Reply, State);
connecting(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
    {stop, Reason};
connecting(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
    State = State0#{lookup_topic_request_ref := undefined},
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
connecting(info, {'EXIT', Sock, Reason}, State) when is_port(Sock) ->
    handle_socket_close(connecting, Sock, Reason, State);
connecting(info, {'EXIT', SockPid, Reason}, State) when is_pid(SockPid) ->
    handle_socket_close(connecting, SockPid, Reason, State);
connecting(info, {C, Sock}, State) when C =:= tcp_closed; C =:= ssl_closed ->
    handle_socket_close(connecting, Sock, closed, State);
connecting(info, {E, Sock, Reason}, State) when E =:= tcp_error; E =:= ssl_error ->
    handle_socket_close(connecting, Sock, Reason, State);
connecting(info, ?SOCK_ERR(Sock, Reason), State) ->
    handle_socket_close(connecting, Sock, Reason, State);
connecting(internal, #maybe_send_to_pulsar{}, _State) ->
    %% Stale nudge
    keep_state_and_data;
connecting(_EventType, {Inet, _, Bin}, State) when Inet == tcp; Inet == ssl ->
    Cmd = pulsar_protocol_frame:parse(Bin),
    ?MODULE:handle_response(Cmd, State);
connecting(info, Msg, State) ->
    log_info("[connecting] unknown message received ~p~n  ~p", [Msg, State], State),
    keep_state_and_data;
connecting({call, From}, get_state, _State) ->
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
connecting({call, From}, _EventContent, _State) ->
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
connecting(cast, _EventContent, _State) ->
    keep_state_and_data.
387

388
%% connected state
389
%% `connected' state callback.
%%
%% On entry, previously sent requests are resent and buffered ones are
%% flushed.  Send requests are collected (up to `batch_size'), enqueued
%% and sent; incoming tcp/ssl data is parsed and passed to
%% `handle_response/2'; `ping' triggers a keep-alive ping on the socket
%% writer.  Socket close or error events go through
%% `handle_socket_close/4'.  Unrecognised events fall through to
%% `handle_response/2'.
-spec connected(gen_statem:event_type(), _EventContent, state()) ->
          handler_result().
connected(enter, _OldState, State0 = #{state_observer_callback := StateObserverCallback}) ->
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
    State1 = resend_sent_requests(State0),
    State = maybe_send_to_pulsar(State1),
    {keep_state, State};
connected(state_timeout, do_connect, _State) ->
    keep_state_and_data;
connected(state_timeout, lookup_topic_timeout, State0) ->
    log_error("timed out waiting for lookup topic response", [], State0),
    %% The reply and 'DOWN' clauses below treat the request reference as
    %% a monitor; release it here as well so that an abandoned lookup
    %% does not leave a monitor behind.
    Ref = maps:get(lookup_topic_request_ref, State0, undefined),
    _ = is_reference(Ref) andalso erlang:demonitor(Ref, [flush]),
    State = State0#{lookup_topic_request_ref := undefined},
    ?NEXT_STATE_IDLE_RECONNECT(State);
connected(info, ?SEND_REQ(_, _) = SendRequest, State0 = #{batch_size := BatchSize}) ->
    ?tp(pulsar_producer_send_req_enter, #{}),
    SendRequests = collect_send_requests([SendRequest], BatchSize),
    State1 = enqueue_send_requests(SendRequests, State0),
    State = maybe_send_to_pulsar(State1),
    ?tp(pulsar_producer_send_req_exit, #{}),
    {keep_state, State};
connected(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
    State = State0#{lookup_topic_request_ref := undefined},
    erlang:demonitor(Ref, [flush]),
    handle_lookup_topic_reply(Reply, State);
connected(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
    {stop, Reason};
connected(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
    State = State0#{lookup_topic_request_ref := undefined},
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
connected(info, {'EXIT', Sock, Reason}, State) when is_port(Sock) ->
    handle_socket_close(connected, Sock, Reason, State);
connected(info, {'EXIT', SockPid, Reason}, State) when is_pid(SockPid) ->
    handle_socket_close(connected, SockPid, Reason, State);
connected(_EventType, {C, Sock}, State) when C =:= tcp_closed; C =:= ssl_closed ->
    handle_socket_close(connected, Sock, closed, State);
connected(_EventType, {E, Sock, Reason}, State) when E =:= tcp_error; E =:= ssl_error ->
    handle_socket_close(connected, Sock, Reason, State);
connected(_EventType, ?SOCK_ERR(Sock, Reason), State) ->
    handle_socket_close(connected, Sock, Reason, State);
connected(_EventType, {Inet, _, Bin}, State) when Inet == tcp; Inet == ssl ->
    Cmd = pulsar_protocol_frame:parse(Bin),
    ?MODULE:handle_response(Cmd, State);
connected(_EventType, ping, State = #{sock_pid := SockPid}) ->
    ok = pulsar_socket_writer:ping_async(SockPid),
    {keep_state, State};
connected(internal, #maybe_send_to_pulsar{}, State0) ->
    State = maybe_send_to_pulsar(State0),
    {keep_state, State};
connected({call, From}, get_state, _State) ->
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
connected({call, From}, _EventContent, _State) ->
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
connected(cast, _EventContent, _State) ->
    keep_state_and_data;
connected(_EventType, EventContent, State) ->
    ?MODULE:handle_response(EventContent, State).
449

450
%% Reacts to a close/error/exit event of the socket or its writer process.
%%
%% An event naming the current writer pid is translated to the current
%% socket and handled again.  An event naming the current socket is
%% logged, the socket is closed and a reconnect is scheduled from `idle'.
%% Anything else is a stale event and is ignored.
handle_socket_close(StateName, SockOrPid, Reason, State) ->
    case State of
        #{sock_pid := SockOrPid} ->
            #{sock := CurrentSock} = State,
            handle_socket_close(StateName, CurrentSock, Reason, State);
        #{sock := SockOrPid} ->
            ?tp("pulsar_socket_close", #{sock => SockOrPid, reason => Reason}),
            log_error("connection_closed at_state: ~p, reason: ~p", [StateName, Reason], State),
            try_close_socket(State),
            ?NEXT_STATE_IDLE_RECONNECT(State);
        _ ->
            %% stale close event
            keep_state_and_data
    end.
461

462
%% Issues an asynchronous topic lookup through the client manager and
%% arms the lookup state timeout.
%%
%% If Pulsar went down and was restarted later, a LookupTopic command
%% must be issued again after reconnecting:
%% https://pulsar.apache.org/docs/2.10.x/developing-binary-protocol/#topic-lookup
%% > Topic lookup needs to be performed each time a client needs
%% > to create or reconnect a producer or a consumer. Lookup is used
%% > to discover which particular broker is serving the topic we are
%% > about to use.
%% Simply looking up the topic (even from a distinct connection)
%% will "unblock" the topic so we may send messages to it.  The
%% producer may be started only after that.
%%
%% If the client manager process is not running (`noproc'), a reconnect
%% is scheduled instead.
-spec refresh_urls_and_connect(state()) -> handler_result().
refresh_urls_and_connect(State0) ->
    #{ clientid := ClientId
     , partitiontopic := PartitionTopic
     } = State0,
    ?tp(debug, pulsar_producer_refresh_start, #{}),
    try pulsar_client_manager:lookup_topic_async(ClientId, PartitionTopic) of
        {ok, RequestRef} ->
            TimeoutAction = {state_timeout, ?LOOKUP_TOPIC_TIMEOUT, lookup_topic_timeout},
            {keep_state, State0#{lookup_topic_request_ref := RequestRef}, [TimeoutAction]}
    catch
        exit:{noproc, _} ->
            log_error("client restarting; will retry to lookup topic again later", [], State0),
            ?NEXT_STATE_IDLE_RECONNECT(State0)
    end.
487

488
%% Starts the socket writer for the broker and sends the CONNECT packet.
%%
%% On success the state machine moves to `connecting' with the new
%% socket and writer pid stored in the state.  On an error result or any
%% exception the failure is logged, the socket is closed and a reconnect
%% is scheduled from `idle'.
-spec do_connect(state()) -> handler_result().
do_connect(State) ->
    #{ broker_server := {Host, Port}
     , opts := Opts
     , partitiontopic := PartitionTopic
     , proxy_to_broker_url := ProxyToBrokerUrl
     } = State,
    try pulsar_socket_writer:start_link(PartitionTopic, Host, Port, Opts) of
        {ok, {WriterPid, Socket}} ->
            ConnectOpts = pulsar_utils:maybe_add_proxy_to_broker_url_opts(Opts, ProxyToBrokerUrl),
            %% a failed send is turned into a socket error message to self
            ?POORMAN(Socket, pulsar_socket:send_connect_packet(Socket, ConnectOpts)),
            {next_state, connecting, State#{sock := Socket, sock_pid := WriterPid}};
        {error, ConnectError} ->
            log_error("error connecting: ~p", [ConnectError], State),
            try_close_socket(State),
            ?NEXT_STATE_IDLE_RECONNECT(State)
    catch
        Class:Exception:Stack ->
            log_error("exception connecting: ~p -> ~p~n  ~p", [Class, Exception, Stack], State),
            try_close_socket(State),
            ?NEXT_STATE_IDLE_RECONNECT(State)
    end.
510

511
%% gen_statem `format_status/1' callback: returns `Status' with secrets
%% in the `data' entry censored; every other entry is left untouched.
format_status(Status) ->
    Censor =
        fun(Key, Value) ->
                case Key of
                    data -> censor_secrets(Value);
                    _ -> Value
                end
        end,
    maps:map(Censor, Status).
519

520
%% `format_status/2' is deprecated as of OTP 25.0; it is kept for older
%% releases.  Returns the state data with secrets censored.
format_status(_Opt, [_PDict, _State0, Data0]) ->
    [{data, [{"State", censor_secrets(Data0)}]}].
524

525
%% Replaces `auth_data' under `opts -> conn_opts' with "******".
%% Data without that nested key is returned unchanged.
censor_secrets(Data) ->
    case Data of
        #{opts := Opts = #{conn_opts := ConnOpts = #{auth_data := _}}} ->
            Censored = ConnOpts#{auth_data := "******"},
            Data#{opts := Opts#{conn_opts := Censored}};
        _ ->
            Data
    end.
529

530
%% gen_statem `terminate/3' callback: closes the replayq buffer and
%% clears the metrics gauges.
terminate(_Reason, _StateName, #{replayq := Q} = State) ->
    ok = replayq:close(Q),
    clear_gauges(State, Q).
534

535
%% Sets the inflight gauge to zero and, where appropriate, resets the
%% queuing gauges (see `maybe_reset_queuing/2').  Always returns `ok'.
clear_gauges(State, Queue) ->
    _ = pulsar_metrics:inflight_set(State, 0),
    _ = maybe_reset_queuing(State, Queue),
    ok.
539

540
%% Resets the queuing gauges to zero when the buffer is empty or when it
%% is not durable.  A non-empty durable buffer keeps its gauges.
maybe_reset_queuing(State, Q) ->
    IsEmpty = replayq:count(Q) =:= 0,
    IsDurable = is_replayq_durable(State, Q),
    if
        IsEmpty; not IsDurable ->
            pulsar_metrics:queuing_set(State, 0),
            pulsar_metrics:queuing_bytes_set(State, 0);
        true ->
            ok
    end.
551

552
%% Tells whether the buffer counts as durable: never in offload mode,
%% otherwise exactly when the replayq is not memory-only.
is_replayq_durable(State, Q) ->
    case State of
        #{replayq_offload_mode := true} ->
            false;
        _ ->
            not replayq:is_mem_only(Q)
    end.
556

557
%% Handles one parsed broker command (or any unrecognised event).
%%
%% - `connected': schedule the keep-alive ping and request producer creation.
%% - `producer_success': adopt the given producer name, move to `connected'.
%% - `pong' / `ping': keep-alive bookkeeping.
%% - `close_producer' / `error': log, close the socket, schedule a reconnect.
%% - `send_receipt': ack the batch in replayq, notify callers/callbacks,
%%   update the inflight gauge and nudge the next send.
%% - anything else: log and ignore.
-spec handle_response(_EventContent, state()) ->
          handler_result().
handle_response({connected, _ConnectedData}, State0 = #{
        sock := Sock,
        opts := Opts,
        producer_id := ProducerId,
        request_id := RequestId,
        partitiontopic := PartitionTopic
    }) ->
    start_keepalive(),
    ?POORMAN(Sock, pulsar_socket:send_create_producer_packet(Sock, PartitionTopic, RequestId, ProducerId, Opts)),
    {keep_state, next_request_id(State0)};
handle_response({producer_success, #{producer_name := AssignedName}}, State) ->
    {next_state, connected, State#{producer_name := AssignedName}};
handle_response({pong, #{}}, _State) ->
    start_keepalive(),
    keep_state_and_data;
handle_response({ping, #{}}, #{sock_pid := SockPid}) ->
    ok = pulsar_socket_writer:pong_async(SockPid),
    keep_state_and_data;
handle_response({close_producer, #{}}, State = #{partitiontopic := Topic}) ->
    log_error("Close producer: ~p~n", [Topic], State),
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
handle_response({send_receipt, Receipt = #{sequence_id := SeqId}}, State) ->
    #{ callback := Callback
     , inflight_calls := InflightBefore
     , requests := Requests
     , replayq := Q
     } = State,
    ?tp(pulsar_producer_recv_send_receipt, #{receipt => Receipt}),
    Reply = {ok, Receipt},
    case maps:get(SeqId, Requests, undefined) of
        ?INFLIGHT_REQ(QAckRef, FromsToMessages, BatchSize) ->
            ok = replayq:ack(Q, QAckRef),
            Notify =
                fun({undefined, {_TS, Messages}}) ->
                        %% plain async send: the producer-level callback
                        %% runs once per message
                        _ = invoke_callback(Callback, Reply, length(Messages)),
                        ok;
                   ({?PER_REQ_CALLBACK(Fn, Args), {_TS, _Messages}}) ->
                        %% No need to count the messages, as we invoke
                        %% per-request callbacks once for the whole batch.
                        _ = invoke_callback({Fn, Args}, Reply),
                        ok;
                   ({From, {_TS, _Messages}}) ->
                        %% synchronous caller waiting for a reply
                        gen_statem:reply(From, Reply)
                end,
            lists:foreach(Notify, FromsToMessages),
            InflightAfter = InflightBefore - BatchSize,
            pulsar_metrics:inflight_set(State, InflightAfter),
            NewState = State#{ requests := maps:remove(SeqId, Requests)
                             , inflight_calls := InflightAfter
                             },
            {keep_state, NewState, [{next_event, internal, #maybe_send_to_pulsar{}}]};
        undefined ->
            %% receipt for a request that is not tracked as inflight
            _ = invoke_callback(Callback, Reply),
            {keep_state, State}
    end;
handle_response({error, #{error := Error, message := Msg}}, State) ->
    log_error("Response error:~p, msg:~p~n", [Error, Msg], State),
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State);
handle_response(Msg, State) ->
    log_error("Receive unknown message:~p~n", [Msg], State),
    keep_state_and_data.
624

625
%% Hands one batch to the socket writer for asynchronous sending under
%% the given sequence id.
-spec send_batch_payload([{timestamp(), [pulsar:message()]}], sequence_id(), state()) -> ok.
send_batch_payload(Messages, SequenceId, #{
            partitiontopic := PartitionTopic,
            producer_id := ProducerId,
            producer_name := ProducerName,
            sock_pid := WriterPid,
            opts := Opts
        }) ->
    pulsar_socket_writer:send_batch_async(
      WriterPid, PartitionTopic, Messages, SequenceId, ProducerId, ProducerName, Opts).
635

636
%% Schedules a `ping' message to this process in 30 seconds and returns
%% the timer reference.
start_keepalive() ->
    KeepaliveMs = 30_000,
    erlang:send_after(KeepaliveMs, self(), ping).
638

639
%% Advances `request_id' by one, wrapping to 1 after ?MAX_REQ_ID.
next_request_id(State = #{request_id := Current}) ->
    Next =
        case Current of
            ?MAX_REQ_ID -> 1;
            _ -> Current + 1
        end,
    State#{request_id := Next}.
643

644
%% Advances `sequence_id' by one, wrapping to 1 after ?MAX_SEQ_ID.
next_sequence_id(State = #{sequence_id := Current}) ->
    Next =
        case Current of
            ?MAX_SEQ_ID -> 1;
            _ -> Current + 1
        end,
    State#{sequence_id := Next}.
648

649
%% Logs at `debug' level, prefixed with the partition topic (see `do_log/4').
-spec log_debug(string(), [term()], state()) -> ok.
log_debug(Format, FormatArgs, State) ->
    do_log(debug, Format, FormatArgs, State).
652

653
%% Logs at `info' level, prefixed with the partition topic (see `do_log/4').
-spec log_info(string(), [term()], state()) -> ok.
log_info(Format, FormatArgs, State) ->
    do_log(info, Format, FormatArgs, State).
656

657
%% Logs at `warning' level, prefixed with the partition topic (see `do_log/4').
-spec log_warn(string(), [term()], state()) -> ok.
log_warn(Format, FormatArgs, State) ->
    do_log(warning, Format, FormatArgs, State).
660

661
%% Logs at `error' level through do_log/4.
-spec log_error(string(), [term()], state()) -> ok.
log_error(Format, Params, ProducerState) ->
    do_log(error, Format, Params, ProducerState).
12✔
664

665
%% Common logging entry point: prefixes the format string with the producer
%% tag and the partition topic read from the state, and emits the event via
%% `logger:log/4' under the `[pulsar, producer]' domain.
-spec do_log(atom(), string(), [term()], state()) -> ok.
do_log(Level, Format, Args, State) ->
    #{partitiontopic := Topic} = State,
    PrefixedFormat = "[pulsar-producer][~s] " ++ Format,
    Metadata = #{domain => [pulsar, producer]},
    logger:log(Level, PrefixedFormat, [Topic | Args], Metadata).
670

671
%% Invokes the callback for a single message; shorthand for
%% invoke_callback/3 with a batch length of 1.
-spec invoke_callback(callback(), callback_input()) -> ok.
invoke_callback(Callback, Resp) ->
    SingleMessage = 1,
    invoke_callback(Callback, Resp, SingleMessage).
3✔
674

675
%% Delivers `Resp' to the configured callback.
%%
%%  * `undefined': nothing to call.
%%  * `{M, F, A}': `M:F(Resp, A...)' is applied once per message in the batch.
%%  * arity-1 fun: called with `Resp' once per message in the batch.
%%  * `{Fn, Args}': per-request callback, applied exactly once with `Resp'
%%    appended to `Args', no matter how many messages were sent.
-spec invoke_callback(callback(), callback_input(), non_neg_integer()) -> ok.
invoke_callback(undefined, _Resp, _BatchLen) ->
    ok;
invoke_callback({Mod, Fun, ExtraArgs}, Resp, BatchLen) ->
    _ = [erlang:apply(Mod, Fun, [Resp | ExtraArgs]) || _ <- lists:seq(1, BatchLen)],
    ok;
invoke_callback(Callback, Resp, BatchLen) when is_function(Callback, 1) ->
    _ = [Callback(Resp) || _ <- lists:seq(1, BatchLen)],
    ok;
invoke_callback({Fn, Args}, Resp, _BatchLen) when is_function(Fn), is_list(Args) ->
    %% Per-request callbacks fire only once for the whole batch.
    erlang:apply(Fn, Args ++ [Resp]).
3✔
692

693
%% replayq sizer callback: the size of a queue item is the number of bytes
%% its external term format encoding would take.
queue_item_sizer(?Q_ITEM(_CallId, _Ts, _Batch) = QItem) ->
    SizeInBytes = erlang:external_size(QItem),
    SizeInBytes.
2,125✔
695

696
%% replayq marshaller callback.
%%
%% A queue item is encoded with `term_to_binary/1'.  A binary is decoded
%% back into a term; when the decoded item refers to a caller pid that is
%% not a live process on this node, the caller reference is replaced by
%% `undefined'.
queue_item_marshaller(?Q_ITEM(_, _, _) = QItem) ->
    term_to_binary(QItem);
queue_item_marshaller(Encoded) when is_binary(Encoded) ->
    drop_stale_caller(binary_to_term(Encoded)).

%% Clears the caller of a decoded queue item unless it is a live local pid.
drop_stale_caller(?Q_ITEM({Caller, _Tag}, Ts, Msgs) = QItem) when is_pid(Caller) ->
    IsReachable = node(Caller) =:= node() andalso erlang:is_process_alive(Caller),
    case IsReachable of
        false -> ?Q_ITEM(undefined, Ts, Msgs);
        true -> QItem
    end;
drop_stale_caller(QItem) ->
    QItem.
710

711
%% Current Erlang system time in milliseconds.
now_ts() ->
    NowMs = erlang:system_time(millisecond),
    NowMs.
2,649✔
713

714
%% Wraps a caller reference and its messages into a queue item stamped with
%% the current time.
make_queue_item(From, Messages) ->
    EnqueuedAt = now_ts(),
    ?Q_ITEM(From, EnqueuedAt, Messages).
1,824✔
716

717
%% Appends the given send requests to the replayq buffer, refreshes the
%% queuing metrics and then lets handle_overflow/3 discard whatever no
%% longer fits.
%%
%% When `drop_if_high_mem' is set, the queue is memory-only and
%% `load_ctl:is_high_mem()' reports true, the overflow is raised to at
%% least the number of bytes that were just appended.
enqueue_send_requests(Requests, State = #{replayq := Q}) ->
    #{drop_if_high_mem := DropIfHighMem} = State,
    ToQueueItem = fun(?SEND_REQ(From, Messages)) ->
                          make_queue_item(From, Messages)
                  end,
    QItems = lists:map(ToQueueItem, Requests),
    BytesBefore = replayq:bytes(Q),
    NewQ = replayq:append(Q, QItems),
    BytesAfter = replayq:bytes(NewQ),
    pulsar_metrics:queuing_set(State, replayq:count(NewQ)),
    pulsar_metrics:queuing_bytes_set(State, BytesAfter),
    ?tp(pulsar_producer_send_requests_enqueued, #{requests => Requests}),
    QueueOverflow = replayq:overflow(NewQ),
    IsHighMemOverflow =
        DropIfHighMem andalso replayq:is_mem_only(NewQ) andalso load_ctl:is_high_mem(),
    Overflow =
        case IsHighMemOverflow of
            false ->
                QueueOverflow;
            true ->
                AppendedBytes = BytesAfter - BytesBefore,
                max(QueueOverflow, AppendedBytes)
        end,
    handle_overflow(State#{replayq := NewQ}, IsHighMemOverflow, Overflow).
1,822✔
742

743
%% Discards queued items when the buffer overflows.
%%
%% With no overflow (`Overflow =< 0') the state is returned unchanged after
%% giving the rate-limited discard logger a chance to flush.  Otherwise
%% items are popped from the queue and acked immediately, their callers
%% receive `{error, overflow}', and the drop/queuing metrics are updated.
%% The pop is limited to `{at_least, Overflow}' bytes in the high-memory
%% case and `{at_most, Overflow}' bytes otherwise.
-spec handle_overflow(state(), _IsHighMemOverflow :: boolean(), _Overflow :: integer()) -> state().
handle_overflow(State, _IsHighMemOverflow, Overflow) when Overflow =< 0 ->
    %% no overflow
    ok = maybe_log_discard(State, _NumRequestsIncrement = 0),
    State;
handle_overflow(State0 = #{replayq := Q, callback := Callback}, IsHighMemOverflow, Overflow) ->
    BytesLimit = case IsHighMemOverflow of
        true -> {at_least, Overflow};
        false -> {at_most, Overflow}
    end,
    PopOpts = #{bytes_limit => BytesLimit, count_limit => 999999999},
    {NewQ, QAckRef, Popped} = replayq:pop(Q, PopOpts),
    ok = replayq:ack(NewQ, QAckRef),
    maybe_log_discard(State0, length(Popped)),
    Items = [{From, Msgs} || ?Q_ITEM(From, _Now, Msgs) <- Popped],
    reply_with_error(Items, Callback, {error, overflow}),
    NumMsgs = lists:sum([length(Msgs) || {_, Msgs} <- Items]),
    pulsar_metrics:dropped_queue_full_inc(State0, NumMsgs),
    pulsar_metrics:queuing_set(State0, replayq:count(NewQ)),
    pulsar_metrics:queuing_bytes_set(State0, replayq:bytes(NewQ)),
    State0#{replayq := NewQ}.
4✔
764

765
%% Entry point of the rate-limited discard logger.  Does nothing when there
%% is no new discard to account for (`Increment' is 0) and everything
%% counted so far has already been logged; otherwise defers to
%% maybe_log_discard/3 with the stored counters.
maybe_log_discard(State, Increment) ->
    Stats = get_overflow_log_state(),
    #{ count_since_last_log := Logged
     , total_count := Total
     } = Stats,
    if
        Logged =:= Total, Increment =:= 0 ->
            %% no change
            ok;
        true ->
            maybe_log_discard(State, Increment, Stats)
    end.
776

777
%% Adds `Increment' to the running total of discarded items and, if more
%% than ?MIN_DISCARD_LOG_INTERVAL has elapsed since the last log entry,
%% emits a warning with the number discarded since then and resets the
%% "since last log" marker.  Otherwise only the total is updated.
-spec maybe_log_discard(
        state(),
        non_neg_integer(),
        #{ last_log_inst => non_neg_integer()
         , count_since_last_log => non_neg_integer()
         , total_count => non_neg_integer()
         }) -> ok.
maybe_log_discard(State,
                  Increment,
                  #{ last_log_inst := LastLogAt
                   , count_since_last_log := LoggedCount
                   , total_count := PrevTotal
                   }) ->
    Now = now_ts(),
    Total = PrevTotal + Increment,
    Unlogged = Total - LoggedCount,
    Elapsed = Now - LastLogAt,
    NextStats =
        case Elapsed > ?MIN_DISCARD_LOG_INTERVAL of
            true ->
                log_warn("replayq dropped ~b overflowed messages", [Unlogged], State),
                #{ last_log_inst => Now
                 , count_since_last_log => Total
                 , total_count => Total
                 };
            false ->
                #{ last_log_inst => LastLogAt
                 , count_since_last_log => LoggedCount
                 , total_count => Total
                 }
        end,
    put_overflow_log_state(NextStats).
806

807
%% Reads the discard-logging counters from the process dictionary, falling
%% back to all-zero counters when nothing has been stored yet.
-spec get_overflow_log_state() -> #{ last_log_inst => non_neg_integer()
                                   , count_since_last_log => non_neg_integer()
                                   , total_count => non_neg_integer()
                                   }.
get_overflow_log_state() ->
    case erlang:get(?buffer_overflow_discarded) of
        Stored when is_map(Stored) ->
            Stored;
        undefined ->
            #{ last_log_inst => 0
             , count_since_last_log => 0
             , total_count => 0
             }
    end.
821

822
%% Stores the discard-logging counters in the process dictionary.  The map
%% must carry all three counter keys.
-spec put_overflow_log_state(#{ last_log_inst => non_neg_integer()
                              , count_since_last_log => non_neg_integer()
                              , total_count => non_neg_integer()
                              }) -> ok.
put_overflow_log_state(Counters = #{ last_log_inst := _
                                   , count_since_last_log := _
                                   , total_count := _
                                   }) ->
    _ = erlang:put(?buffer_overflow_discarded, Counters),
    ok.
8✔
832

833
%% Sends the next batch when there is something queued and the number of
%% in-flight requests is still below `max_inflight'; otherwise returns the
%% state untouched.
maybe_send_to_pulsar(State) ->
    #{ replayq := Q
     , requests := Requests
     , max_inflight := MaxInflight
     } = State,
    QueuedCount = replayq:count(Q),
    InflightCount = map_size(Requests),
    if
        QueuedCount /= 0 andalso InflightCount < MaxInflight ->
            do_send_to_pulsar(State);
        true ->
            State
    end.
846

847
%% Pops one batch from the replayq buffer (bounded by `batch_size' items and
%% `max_batch_bytes' bytes), answers the calls whose retention period has
%% elapsed with an expiry error, and sends the remaining messages as a
%% single batch under the current sequence id.
%%
%% The sent batch is recorded in `requests' together with the replayq ack
%% reference, so that it can be acked once its receipt arrives.  If every
%% popped item had expired, the batch is acked right away.  In both cases
%% maybe_send_to_pulsar/1 is called again to keep draining the queue.
do_send_to_pulsar(State0) ->
    #{ batch_size := BatchSize
     , inflight_calls := InflightCalls0
     , sequence_id := SequenceId
     , requests := Requests0
     , replayq := Q
     , opts := ProducerOpts
     } = State0,
    MaxBatchBytes = maps:get(max_batch_bytes, ProducerOpts, ?DEFAULT_MAX_BATCH_BYTES),
    PopOpts = #{count_limit => BatchSize, bytes_limit => MaxBatchBytes},
    {NewQ, QAckRef, Items} = replayq:pop(Q, PopOpts),
    State1 = State0#{replayq := NewQ},
    pulsar_metrics:queuing_set(State0, replayq:count(NewQ)),
    pulsar_metrics:queuing_bytes_set(State0, replayq:bytes(NewQ)),
    RetentionPeriod = maps:get(retention_period, ProducerOpts, infinity),
    Now = now_ts(),
    {ExpiredItems, LiveItems} =
        lists:partition(
          fun(?Q_ITEM(_From, Timestamp, _Msgs)) ->
                  is_batch_expired(Timestamp, RetentionPeriod, Now)
          end,
          Items),
    Expired = [{From, Msgs} || ?Q_ITEM(From, _Timestamp, Msgs) <- ExpiredItems],
    FromsToMessages = [{From, {Timestamp, Msgs}} || ?Q_ITEM(From, Timestamp, Msgs) <- LiveItems],
    reply_expired_messages(Expired, State1),
    pulsar_metrics:dropped_inc(State1, length(Expired)),
    case FromsToMessages of
        [_ | _] ->
            FinalBatch = lists:append([Msgs || {_From, {_Timestamp, Msgs}} <- FromsToMessages]),
            FinalBatchSize = length(FinalBatch),
            send_batch_payload(FinalBatch, SequenceId, State0),
            InflightReq = ?INFLIGHT_REQ(QAckRef, FromsToMessages, FinalBatchSize),
            Requests = Requests0#{SequenceId => InflightReq},
            InflightCalls = InflightCalls0 + FinalBatchSize,
            pulsar_metrics:inflight_set(State1, InflightCalls),
            State2 = State1#{requests := Requests, inflight_calls := InflightCalls},
            maybe_send_to_pulsar(next_sequence_id(State2));
        [] ->
            %% all expired, immediately ack replayq batch and continue
            ok = replayq:ack(Q, QAckRef),
            maybe_send_to_pulsar(State1)
    end.
896

897
%% Answers every given call with `{error, expired}', using the producer's
%% configured callback where the call has no reply target of its own.
-spec reply_expired_messages([{gen_statem:from() | per_request_callback_int() | undefined,
                               [pulsar:message()]}],
                             state()) -> ok.
reply_expired_messages(Expired, #{callback := Callback}) ->
    Reason = {error, expired},
    reply_with_error(Expired, Callback, Reason).
791✔
902

903
%% Reports `Error' to the originator of every item:
%%
%%  * `undefined' origin: the producer-wide callback is invoked once per
%%    message of the item;
%%  * per-request callback: invoked once for the whole item, so the
%%    messages need not be counted;
%%  * anything else is treated as a `gen_statem' caller and replied to.
-spec reply_with_error([{gen_statem:from() | per_request_callback_int() | undefined,
                         [pulsar:message()]}],
                       callback(), {error, expired | overflow}) -> ok.
reply_with_error(Items, Callback, Error) ->
    Notify =
        fun({undefined, Msgs}) ->
                invoke_callback(Callback, Error, length(Msgs));
           ({?PER_REQ_CALLBACK(Fn, Args), _Msgs}) ->
                invoke_callback({Fn, Args}, Error);
           ({From, _Msgs}) ->
                gen_statem:reply(From, Error)
        end,
    lists:foreach(Notify, Items).
918

919
%% Drains pending send requests from the mailbox on top of the ones already
%% in `Acc' (most recent first), up to `Limit' requests in total.
collect_send_requests(Acc, Limit) ->
    do_collect_send_requests(Acc, length(Acc), Limit).
1,162✔
922

923
%% Worker loop of collect_send_requests/2.  Stops as soon as `Limit' is
%% reached or no send request is immediately available in the mailbox, and
%% returns the collected requests in arrival order.
do_collect_send_requests(Acc, Count, Limit) ->
    case Count >= Limit of
        true ->
            lists:reverse(Acc);
        false ->
            receive
                ?SEND_REQ(_, _) = Req ->
                    do_collect_send_requests([Req | Acc], Count + 1, Limit)
            after 0 ->
                lists:reverse(Acc)
            end
    end.
933

934
%% Closes the producer's socket and stops its writer process; a no-op when
%% there is no socket.
try_close_socket(#{sock := undefined}) ->
    ok;
try_close_socket(#{sock := Socket, sock_pid := WriterPid, opts := Opts}) ->
    %% N.B.: the order matters: close the socket first, then stop the writer.
    %% The writer may be blocked in a `send' call; closing the socket makes
    %% that call return `einval' immediately, so the writer can be terminated
    %% (it also terminates itself on such `send' errors, but we make sure
    %% here).
    _ = pulsar_socket:close(Socket, Opts),
    ok = pulsar_socket_writer:stop(WriterPid),
    ok.
7✔
944

945
%% Re-sends every in-flight request in ascending sequence id order.
%%
%% Calls whose retention period has elapsed are answered with an expiry
%% error and removed from their request; a request left without any live
%% call is acked in replayq and forgotten.  The surviving calls of each
%% request are sent again under the same sequence id.
%%
%% Bookkeeping is kept in the same unit as do_send_to_pulsar/1 uses: the
%% batch size stored in each in-flight request is the number of *messages*
%% in the re-sent batch, and `inflight_calls' is reduced by the number of
%% expired *messages*.  The `dropped' metric is still incremented per
%% expired call, as do_send_to_pulsar/1 does.
resend_sent_requests(State) ->
    ?tp(pulsar_producer_resend_sent_requests_enter, #{}),
    #{ inflight_calls := InflightCalls0
     , requests := Requests0
     , replayq := Q
     , opts := ProducerOpts
     } = State,
    Now = now_ts(),
    RetentionPeriod = maps:get(retention_period, ProducerOpts, infinity),
    Requests1 = lists:keysort(1, maps:to_list(Requests0)),
    {Requests, DroppedCalls, DroppedMsgs} =
        lists:foldl(
          fun({SequenceId, ?INFLIGHT_REQ(QAckRef, FromsToMessages, _BatchSize)},
              {AccIn, DroppedCallsAcc, DroppedMsgsAcc}) ->
               {Messages, Expired} =
                  lists:partition(
                    fun({_From, {Ts, _Msgs}}) ->
                      not is_batch_expired(Ts, RetentionPeriod, Now)
                    end,
                    FromsToMessages),
               lists:foreach(
                 fun({From, {_Ts, Msgs}}) ->
                   reply_expired_messages([{From, Msgs}], State)
                 end,
                 Expired),
               NumExpiredMsgs = lists:sum([length(Msgs) || {_From, {_Ts, Msgs}} <- Expired]),
               Acc = case Messages of
                   [] ->
                       ?tp(pulsar_producer_resend_all_expired, #{}),
                       ok = replayq:ack(Q, QAckRef),
                       AccIn;
                   [_ | _] ->
                       Batch = [Msg || {_From, {_Ts, Msgs}} <- Messages,
                                       Msg <- Msgs],
                       send_batch_payload(Batch, SequenceId, State),
                       %% batch size counts messages, not calls
                       AccIn#{SequenceId => ?INFLIGHT_REQ(QAckRef, Messages, length(Batch))}
               end,
               {Acc, DroppedCallsAcc + length(Expired), DroppedMsgsAcc + NumExpiredMsgs}
          end,
          {#{}, 0, 0},
          Requests1),
    InflightCalls = InflightCalls0 - DroppedMsgs,
    pulsar_metrics:dropped_inc(State, DroppedCalls),
    pulsar_metrics:inflight_set(State, InflightCalls),
    State#{requests := Requests, inflight_calls := InflightCalls}.
27✔
989

990
%% True when a batch stamped `Timestamp' is at least `RetentionPeriod' old
%% at instant `Now'.  With an `infinity' retention period nothing expires.
is_batch_expired(_Timestamp, infinity, _Now) ->
    false;
is_batch_expired(Timestamp, RetentionPeriod, Now) ->
    OldestAllowed = Now - RetentionPeriod,
    Timestamp =< OldestAllowed.
9✔
994

995
%% Normalizes the string to Unicode NFD, escapes it with
%% `pulsar_utils:escape_uri/1' and returns the result as a binary.
-spec escape(string()) -> binary().
escape(Str) ->
    Escaped = pulsar_utils:escape_uri(unicode:characters_to_nfd_list(Str)),
    iolist_to_binary(Escaped).
7✔
999

1000
%% Reacts to the reply of a topic lookup.
%%
%%  * Without proxying, the returned broker URL is used directly as the
%%    endpoint to connect to.
%%  * When the broker must be reached through the service URL, the client
%%    manager is first asked for an alive Pulsar URL; the returned broker
%%    URL is kept as the proxy target.
%%  * On a lookup error, or when the client manager cannot be reached or
%%    returns an error, the socket is closed and the producer goes back to
%%    idle to reconnect later.
-spec handle_lookup_topic_reply(pulsar_client:lookup_topic_response(), state()) -> handler_result().
handle_lookup_topic_reply({ok, #{ proxy_through_service_url := false
                                , brokerServiceUrl := BrokerURL
                                }},
                         State) ->
    LookupInfo = #{proxy_through_service_url => false,
                   broker_service_url => BrokerURL},
    log_debug("received topic lookup reply: ~0p", [LookupInfo], State),
    maybe_connect(#{ alive_pulsar_url => BrokerURL
                   , broker_service_url => undefined
                   }, State);
handle_lookup_topic_reply({ok, #{ proxy_through_service_url := true
                                , brokerServiceUrl := BrokerURL
                                }}, State) ->
    #{clientid := ClientId} = State,
    ?tp(debug, pulsar_producer_lookup_alive_pulsar_url, #{}),
    LookupInfo = #{proxy_through_service_url => true, broker_service_url => BrokerURL},
    log_debug("received topic lookup reply: ~0p", [LookupInfo], State),
    try pulsar_client_manager:get_alive_pulsar_url(ClientId, ?GET_ALIVE_PULSAR_URL) of
        {error, Reason} ->
            log_error("error getting pulsar alive URL: ~0p", [Reason], State),
            try_close_socket(State),
            ?NEXT_STATE_IDLE_RECONNECT(State);
        {ok, AliveURL} ->
            maybe_connect(#{ broker_service_url => BrokerURL
                           , alive_pulsar_url => AliveURL
                           }, State)
    catch
        exit:{timeout, _} ->
            log_error("timeout calling client; will retry to lookup topic later", [], State),
            try_close_socket(State),
            ?NEXT_STATE_IDLE_RECONNECT(State);
        exit:{noproc, _} ->
            log_error("client restarting; will retry to lookup topic later", [], State),
            try_close_socket(State),
            ?NEXT_STATE_IDLE_RECONNECT(State)
    end;
handle_lookup_topic_reply({error, Error}, State) ->
    log_error("error looking up topic: ~0p", [Error], State),
    try_close_socket(State),
    ?NEXT_STATE_IDLE_RECONNECT(State).
1040

1041
%% Connects to the looked-up endpoint if it is the one the producer already
%% targets.  If either the broker server or the proxy URL changed, the new
%% endpoint is stored in the state, the socket is closed and the producer
%% returns to idle so that it reconnects against the new endpoint.
-spec maybe_connect(#{ alive_pulsar_url := string()
                     , broker_service_url := string() | undefined
                     }, state()) -> handler_result().
maybe_connect(#{ broker_service_url := NewProxyURL
               , alive_pulsar_url := AlivePulsarURL
               }, State0) ->
    #{ broker_server := OldServer
     , proxy_to_broker_url := OldProxyURL
     } = State0,
    {_Transport, NewServer} = pulsar_utils:parse_url(AlivePulsarURL),
    IsSameEndpoint = {OldServer, OldProxyURL} =:= {NewServer, NewProxyURL},
    case IsSameEndpoint of
        false ->
            %% broker changed; reconnect.
            Previous = #{broker_server => OldServer, proxy_url => OldProxyURL},
            Current = #{broker_server => NewServer, proxy_url => NewProxyURL},
            log_info("pulsar endpoint changed from ~0p to ~0p; reconnecting...",
                     [Previous, Current],
                     State0),
            try_close_socket(State0),
            State = State0#{
                broker_server := NewServer,
                proxy_to_broker_url := NewProxyURL
            },
            ?NEXT_STATE_IDLE_RECONNECT(State);
        true ->
            Target = #{broker_server => NewServer, service_url => NewProxyURL},
            log_debug("connecting to ~0p", [Target], State0),
            do_connect(State0)
    end.
1075

1076
%% Tells the optional state observer about the producer's state by applying
%% its fun to the state followed by the observer's own arguments.  The
%% observer's return value is ignored.
-spec notify_state_change(undefined | state_observer_callback(), statem()) -> ok.
notify_state_change(undefined, _ProducerState) ->
    ok;
notify_state_change({ObserverFun, ObserverArgs}, ProducerState) ->
    _ = erlang:apply(ObserverFun, [ProducerState | ObserverArgs]),
    ok.
83✔
1082

1083
-ifdef(TEST).
1084
-include_lib("eunit/include/eunit.hrl").
1085

1086
%% EUnit generator for the rate-limited discard logger: a no-op call, a call
%% long after the previous log entry (must log and reset the counters) and a
%% call right after one (must only accumulate).
maybe_log_discard_test_() ->
    FakeState = #{partitiontopic => <<"partitiontopic">>},
    NoIncrement =
        fun() -> maybe_log_discard(undefined, 0) end,
    LastLogOld =
        fun() ->
          OldInst = now_ts() - ?MIN_DISCARD_LOG_INTERVAL - 1,
          ok = put_overflow_log_state(#{ last_log_inst => OldInst
                                       , count_since_last_log => 2
                                       , total_count => 2
                                       }),
          ok = maybe_log_discard(FakeState, 1),
          After = get_overflow_log_state(),
          ?assertMatch(#{count_since_last_log := 3, total_count := 3}, After),
          %% greater than the minimum interval because we just logged
          ?assert(maps:get(last_log_inst, After) - OldInst > ?MIN_DISCARD_LOG_INTERVAL),
          ok
        end,
    LastLogFresh =
        fun() ->
          FreshInst = now_ts(),
          ok = put_overflow_log_state(#{ last_log_inst => FreshInst
                                       , count_since_last_log => 2
                                       , total_count => 2
                                       }),
          ok = maybe_log_discard(FakeState, 2),
          After = get_overflow_log_state(),
          ?assertMatch(#{count_since_last_log := 2, total_count := 4}, After),
          %% less than the minimum interval because we didn't log and just accumulated
          ?assert(maps:get(last_log_inst, After) - FreshInst < ?MIN_DISCARD_LOG_INTERVAL),
          ok
        end,
    [ {"no increment, empty dictionary", NoIncrement}
    , {"fake-last-old", LastLogOld}
    , {"fake-last-fresh", LastLogFresh}
    ].
1117
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc