• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 773

29 Nov 2024 08:57PM UTC coverage: 71.844% (-2.0%) from 73.807%
773

push

github

web-flow
Merge pull request #68 from thalesmg/fix-lookup-redirect-mkII

fix(client): handle `Redirect` in `LookupTopicResponse`

189 of 261 new or added lines in 8 files covered. (72.41%)

29 existing lines in 3 files now uncovered.

939 of 1307 relevant lines covered (71.84%)

313.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.11
/src/pulsar_producer.erl
1
%% Copyright (c) 2013-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_producer).
16

17
-behaviour(gen_statem).
18

19
-include_lib("kernel/include/inet.hrl").
20
-include("include/pulsar_producer_internal.hrl").
21
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
22

23
-export([ send/2
24
        , send/3
25
        , send_sync/2
26
        , send_sync/3
27
        , get_state/1
28
        ]).
29

30
-export([ start_link/4
31
        , idle/3
32
        , connecting/3
33
        , connected/3
34
        ]).
35

36
%% gen_statem API
37
-export([ callback_mode/0
38
        , init/1
39
        , terminate/3
40
        , format_status/1
41
        , format_status/2
42
        ]).
43

44
%% replayq API
45
-export([ queue_item_sizer/1
46
        , queue_item_marshaller/1
47
        ]).
48

49
%% for testing only
50
-ifdef(TEST).
51
-export([make_queue_item/2]).
52
-endif.
53

54
-type statem() :: idle | connecting | connected.
55
-type sequence_id() :: integer().
56
-type send_receipt() :: #{ sequence_id := sequence_id()
57
                         , producer_id := integer()
58
                         , highest_sequence_id := sequence_id()
59
                         , message_id := map()
60
                         , any() => term()
61
                         }.
62
-type timestamp() :: integer().
63
-type callback() :: undefined | mfa() | fun((map()) -> ok) | per_request_callback().
64
-type callback_input() :: {ok, send_receipt()} | {error, expired}.
65
-type config() :: #{ replayq_dir := string()
66
                   , replayq_max_total_bytes => pos_integer()
67
                   , replayq_seg_bytes => pos_integer()
68
                   , replayq_offload_mode => boolean()
69
                   , max_batch_bytes => pos_integer()
70
                   , producer_name => atom()
71
                   , clientid => atom()
72
                   , callback => callback()
73
                   , batch_size => non_neg_integer()
74
                   , retention_period => timeout()
75
                   }.
76
-export_type([ config/0
77
             ]).
78

79
-define(RECONNECT_TIMEOUT, 5_000).
80
-define(LOOKUP_TOPIC_TIMEOUT, 15_000).
81
-define(GET_ALIVE_PULSAR_URL, 5_000).
82

83
-define(MAX_REQ_ID, 4294836225).
84
-define(MAX_SEQ_ID, 18445618199572250625).
85

86
-define(DEFAULT_REPLAYQ_SEG_BYTES, 10 * 1024 * 1024).
87
-define(DEFAULT_REPLAYQ_LIMIT, 2_000_000_000).
88
-define(DEFAULT_MAX_BATCH_BYTES, 1_000_000).
89
-define(Q_ITEM(From, Ts, Messages), {From, Ts, Messages}).
90
-define(INFLIGHT_REQ(QAckRef, FromsToMessages, BatchSize), {inflight_req, QAckRef, FromsToMessages, BatchSize}).
91
-define(NEXT_STATE_IDLE_RECONNECT(State), {next_state, idle, State#{sock := undefined},
92
                                           [{state_timeout, ?RECONNECT_TIMEOUT, do_connect}]}).
93
-define(buffer_overflow_discarded, buffer_overflow_discarded).
94
-define(MIN_DISCARD_LOG_INTERVAL, timer:seconds(5)).
95
-define(PER_REQ_CALLBACK(Fn, Args), {callback, {Fn, Args}}).
96
-define(SOCK_ERR(SOCK, REASON), {socket_error, SOCK, REASON}).
97

98
%% poorman's error handling.
99
%% this is an extra safety to handle a previously missed tcp/ssl_error or tcp/ssl_closed event
100
-define(POORMAN(SOCK, EXPR),
101
        case (EXPR) of
102
            ok ->
103
                ok;
104
            {error, Reason} ->
105
                _ = self() ! ?SOCK_ERR(SOCK, Reason),
106
                ok
107
        end).
108

109
-type state_observer_callback() :: {function(), [term()]}.
110
-type state() :: #{
111
                   batch_size := non_neg_integer(),
112
                   broker_server := {binary(), pos_integer()},
113
                   callback := undefined | mfa() | fun((map()) -> ok),
114
                   clientid := atom(),
115
                   inflight_calls := non_neg_integer(),
116
                   last_bin := binary(),
117
                   lookup_topic_request_ref := reference() | undefined,
118
                   opts := map(),
119
                   parent_pid := undefined | pid(),
120
                   partitiontopic := string(),
121
                   producer_id := integer(),
122
                   producer_name := atom(),
123
                   proxy_to_broker_url := undefined | string(),
124
                   replayq := replayq:q(),
125
                   replayq_offload_mode := boolean(),
126
                   request_id := integer(),
127
                   requests := #{sequence_id() =>
128
                                     ?INFLIGHT_REQ(
129
                                        replayq:ack_ref(),
130
                                        [{gen_statem:from() | undefined,
131
                                          {timestamp(), [pulsar:message()]}}],
132
                                        _BatchSize :: non_neg_integer()
133
                                       )},
134
                   sequence_id := sequence_id(),
135
                   state_observer_callback := undefined | state_observer_callback(),
136
                   sock := undefined | port(),
137
                   telemetry_metadata := map()
138
                  }.
139
-type handler_result() :: gen_statem:event_handler_result(statem(), state()).
140
-type per_request_callback() :: {function(), [term()]}.
141
-type per_request_callback_int() :: ?PER_REQ_CALLBACK(function(), [term()]).
142
-type send_opts() :: #{callback_fn => per_request_callback()}.
143
-export_type([send_opts/0]).
144

145
callback_mode() -> [state_functions, state_enter].
22✔
146

147
start_link(PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts) ->
148
    gen_statem:start_link(?MODULE, {PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts}, []).
22✔
149

150
-spec send(gen_statem:server_ref(), [pulsar:message()]) -> {ok, pid()}.
151
send(Pid, Messages) ->
152
    send(Pid, Messages, _Opts = #{}).
×
153

154
-spec send(gen_statem:server_ref(), [pulsar:message()], send_opts()) -> {ok, pid()}.
155
send(Pid, Messages, Opts) ->
156
    From = case maps:get(callback_fn, Opts, undefined) of
1,816✔
157
               undefined -> undefined;
1,813✔
158
               {Fn, Args} when is_function(Fn) -> {callback, {Fn, Args}}
3✔
159
           end,
160
    erlang:send(Pid, ?SEND_REQ(From, Messages)),
1,816✔
161
    {ok, Pid}.
1,816✔
162

163
-spec send_sync(gen_statem:server_ref(), [pulsar:message()]) ->
164
          {ok, send_receipt()}
165
        | {error, producer_connecting
166
                | producer_disconnected
167
                | term()}.
168
send_sync(Pid, Messages) ->
169
    send_sync(Pid, Messages, 5_000).
×
170

171
-spec send_sync(gen_statem:server_ref(), [pulsar:message()], timeout()) ->
172
          {ok, send_receipt()}
173
        | {error, producer_connecting
174
                | producer_disconnected
175
                | term()}.
176
send_sync(Pid, Messages, Timeout) ->
177
    Caller = self(),
6✔
178
    MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
6✔
179
    %% Mimicking gen_statem's From, so the reply can be sent with
180
    %% `gen_statem:reply/2'
181
    From = {Caller, MRef},
6✔
182
    erlang:send(Pid, ?SEND_REQ(From, Messages)),
6✔
183
    receive
6✔
184
        {MRef, Response} ->
185
            erlang:demonitor(MRef, [flush]),
6✔
186
            Response;
6✔
187
        {'DOWN', MRef, process, Pid, Reason} ->
188
            error({producer_down, Reason})
×
189
    after
190
        Timeout ->
191
            erlang:demonitor(MRef, [flush]),
×
192
            receive
×
193
                {MRef, Response} ->
194
                    Response
×
195
            after
196
                0 ->
197
                    error(timeout)
×
198
            end
199
    end.
200

201
-spec get_state(pid()) -> statem().
202
get_state(Pid) ->
203
    gen_statem:call(Pid, get_state, 5_000).
2✔
204

205
%%--------------------------------------------------------------------
206
%% gen_statem callback
207
%%--------------------------------------------------------------------
208

209
-spec init({string(), string(), string() | undefined, config()}) ->
210
          gen_statem:init_result(statem(), state()).
211
init({PartitionTopic, Server, ProxyToBrokerUrl, ProducerOpts0}) ->
212
    process_flag(trap_exit, true),
22✔
213
    {Transport, BrokerServer} = pulsar_utils:parse_url(Server),
22✔
214
    ProducerID = maps:get(producer_id, ProducerOpts0),
22✔
215
    Offload = maps:get(replayq_offload_mode, ProducerOpts0, false),
22✔
216
    ReplayqCfg0 =
22✔
217
        case maps:get(replayq_dir, ProducerOpts0, false) of
218
            false ->
219
                #{mem_only => true};
15✔
220
            BaseDir ->
221
                PartitionTopicPath = escape(PartitionTopic),
7✔
222
                Dir = filename:join([BaseDir, PartitionTopicPath]),
7✔
223
                SegBytes = maps:get(replayq_seg_bytes, ProducerOpts0, ?DEFAULT_REPLAYQ_SEG_BYTES),
7✔
224
                #{dir => Dir, seg_bytes => SegBytes, offload => Offload}
7✔
225
        end,
226
    MaxTotalBytes = maps:get(replayq_max_total_bytes, ProducerOpts0, ?DEFAULT_REPLAYQ_LIMIT),
22✔
227
    MaxBatchBytes = maps:get(max_batch_bytes, ProducerOpts0, ?DEFAULT_MAX_BATCH_BYTES),
22✔
228
    ReplayqCfg =
22✔
229
        ReplayqCfg0#{ sizer => fun ?MODULE:queue_item_sizer/1
230
                    , marshaller => fun ?MODULE:queue_item_marshaller/1
231
                    , max_total_bytes => MaxTotalBytes
232
                    },
233
    Q = replayq:open(ReplayqCfg),
22✔
234
    ProducerOpts1 = ProducerOpts0#{max_batch_bytes => MaxBatchBytes},
22✔
235
    %% drop replayq options, now that it's open.
236
    ProducerOpts = maps:without([ replayq_dir
22✔
237
                                , replayq_seg_bytes
238
                                , replayq_offload_mode
239
                                , replayq_max_total_bytes
240
                                ],
241
                                ProducerOpts1),
242
    StateObserverCallback = maps:get(state_observer_callback, ProducerOpts0, undefined),
22✔
243
    ParentPid = maps:get(parent_pid, ProducerOpts, undefined),
22✔
244
    TelemetryMetadata0 = maps:get(telemetry_metadata, ProducerOpts0, #{}),
22✔
245
    TelemetryMetadata = maps:put(partition_topic, PartitionTopic, TelemetryMetadata0),
22✔
246
    State = #{
22✔
247
        batch_size => maps:get(batch_size, ProducerOpts, 0),
248
        broker_server => BrokerServer,
249
        callback => maps:get(callback, ProducerOpts, undefined),
250
        clientid => maps:get(clientid, ProducerOpts),
251
        inflight_calls => 0,
252
        last_bin => <<>>,
253
        lookup_topic_request_ref => undefined,
254
        opts => pulsar_utils:maybe_enable_ssl_opts(Transport, ProducerOpts),
255
        parent_pid => ParentPid,
256
        partitiontopic => PartitionTopic,
257
        producer_id => ProducerID,
258
        producer_name => maps:get(producer_name, ProducerOpts, pulsar_producer),
259
        proxy_to_broker_url => ProxyToBrokerUrl,
260
        replayq => Q,
261
        replayq_offload_mode => Offload,
262
        request_id => 1,
263
        requests => #{},
264
        sequence_id => 1,
265
        state_observer_callback => StateObserverCallback,
266
        sock => undefined,
267
        telemetry_metadata => TelemetryMetadata
268
    },
269
    pulsar_metrics:inflight_set(State, 0),
22✔
270
    pulsar_metrics:queuing_set(State, replayq:count(Q)),
22✔
271
    pulsar_metrics:queuing_bytes_set(State, replayq:bytes(Q)),
22✔
272
    {ok, idle, State, [{next_event, internal, do_connect}]}.
22✔
273

274
%% idle state
275
-spec idle(gen_statem:event_type(), _EventContent, state()) ->
276
          handler_result().
277
idle(enter, _OldState, _State = #{state_observer_callback := StateObserverCallback}) ->
278
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
30✔
279
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
30✔
280
    keep_state_and_data;
30✔
281
idle(internal, do_connect, State) ->
282
    refresh_urls_and_connect(State);
22✔
283
idle(state_timeout, do_connect, State) ->
284
    refresh_urls_and_connect(State);
11✔
285
idle(state_timeout, lookup_topic_timeout, State0) ->
286
    log_error("timed out waiting for lookup topic response", [], State0),
4✔
287
    State = State0#{lookup_topic_request_ref := undefined},
4✔
288
    ?NEXT_STATE_IDLE_RECONNECT(State);
4✔
289
idle({call, From}, get_state, _State) ->
290
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
×
291
idle({call, From}, _EventContent, _State) ->
292
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
×
293
idle(cast, _EventContent, _State) ->
294
    keep_state_and_data;
×
295
idle(info, ?SEND_REQ(_, _) = SendRequest, State0) ->
296
    State = enqueue_send_requests([SendRequest], State0),
653✔
297
    {keep_state, State};
653✔
298
idle(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
299
    State = State0#{lookup_topic_request_ref := undefined},
27✔
300
    erlang:demonitor(Ref, [flush]),
27✔
301
    handle_lookup_topic_reply(Reply, State);
27✔
302
idle(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
303
    {stop, Reason};
×
304
idle(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
305
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
×
306
    State = State0#{lookup_topic_request_ref := undefined},
×
307
    try_close_socket(State),
×
308
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
309
idle(_EventType, _Event, _State) ->
310
    keep_state_and_data.
7✔
311

312
%% connecting state
313
-spec connecting(gen_statem:event_type(), _EventContent, state()) ->
314
          handler_result().
315
connecting(enter, _OldState, _State = #{state_observer_callback := StateObserverCallback}) ->
316
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
27✔
317
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
27✔
318
    keep_state_and_data;
27✔
319
connecting(state_timeout, do_connect, State) ->
320
    refresh_urls_and_connect(State);
×
321
connecting(state_timeout, lookup_topic_timeout, State0) ->
322
    log_error("timed out waiting for lookup topic response", [], State0),
×
323
    State = State0#{lookup_topic_request_ref := undefined},
×
324
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
325
connecting(info, ?SEND_REQ(_, _) = SendRequest, State0) ->
326
    State = enqueue_send_requests([SendRequest], State0),
1✔
327
    {keep_state, State};
1✔
328
connecting(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
329
    State = State0#{lookup_topic_request_ref := undefined},
×
330
    erlang:demonitor(Ref, [flush]),
×
331
    handle_lookup_topic_reply(Reply, State);
×
332
connecting(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
333
    {stop, Reason};
×
334
connecting(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
335
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
×
336
    State = State0#{lookup_topic_request_ref := undefined},
×
337
    try_close_socket(State),
×
338
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
339
connecting(info, {'EXIT', Sock, Reason}, State) when is_port(Sock) ->
340
    handle_socket_close(connecting, Sock, Reason, State);
×
341
connecting(info, {C, Sock}, State) when C =:= tcp_closed; C =:= ssl_closed ->
342
    handle_socket_close(connecting, Sock, closed, State);
×
343
connecting(info, {E, Sock, Reason}, State) when E =:= tcp_error; E =:= ssl_error ->
344
    handle_socket_close(connecting, Sock, Reason, State);
×
345
connecting(info, ?SOCK_ERR(Sock, Reason), State) ->
346
    handle_socket_close(connecting, Sock, Reason, State);
×
347
connecting(_EventType, {Inet, _, Bin}, State) when Inet == tcp; Inet == ssl ->
348
    {Cmd, _} = pulsar_protocol_frame:parse(Bin),
54✔
349
    handle_response(Cmd, State);
54✔
350
connecting(info, Msg, State) ->
351
    log_info("[connecting] unknown message received ~p~n  ~p", [Msg, State], State),
×
352
    keep_state_and_data;
×
353
connecting({call, From}, get_state, _State) ->
354
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
×
355
connecting({call, From}, _EventContent, _State) ->
356
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
×
357
connecting(cast, _EventContent, _State) ->
358
   keep_state_and_data.
×
359

360
%% connected state
361
-spec connected(gen_statem:event_type(), _EventContent, state()) ->
362
          handler_result().
363
connected(enter, _OldState, State0 = #{state_observer_callback := StateObserverCallback}) ->
364
    ?tp(debug, pulsar_producer_state_enter, #{state => ?FUNCTION_NAME, previous => _OldState}),
27✔
365
    notify_state_change(StateObserverCallback, ?FUNCTION_NAME),
27✔
366
    State1 = resend_sent_requests(State0),
27✔
367
    State = maybe_send_to_pulsar(State1),
27✔
368
    {keep_state, State};
27✔
369
connected(state_timeout, do_connect, _State) ->
370
    keep_state_and_data;
×
371
connected(state_timeout, lookup_topic_timeout, State0) ->
372
    log_error("timed out waiting for lookup topic response", [], State0),
×
373
    %% todo: should demonitor reference
374
    State = State0#{lookup_topic_request_ref := undefined},
×
375
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
376
connected(info, ?SEND_REQ(_, _) = SendRequest, State0 = #{batch_size := BatchSize}) ->
377
    ?tp(pulsar_producer_send_req_enter, #{}),
1,168✔
378
    SendRequests = collect_send_requests([SendRequest], BatchSize),
1,168✔
379
    State1 = enqueue_send_requests(SendRequests, State0),
1,168✔
380
    State = maybe_send_to_pulsar(State1),
1,168✔
381
    ?tp(pulsar_producer_send_req_exit, #{}),
1,168✔
382
    {keep_state, State};
1,168✔
383
connected(info, {Ref, Reply}, State0 = #{lookup_topic_request_ref := Ref}) ->
384
    State = State0#{lookup_topic_request_ref := undefined},
×
385
    erlang:demonitor(Ref, [flush]),
×
386
    handle_lookup_topic_reply(Reply, State);
×
387
connected(info, {'EXIT', ParentPid, Reason}, #{parent_pid := ParentPid}) when is_pid(ParentPid) ->
388
    {stop, Reason};
×
389
connected(info, {'DOWN', Ref, process, _Pid, Reason}, State0 = #{lookup_topic_request_ref := Ref}) ->
390
    log_error("client down; will retry connection later; reason: ~0p", [Reason], State0),
×
391
    State = State0#{lookup_topic_request_ref := undefined},
×
392
    try_close_socket(State),
×
393
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
394
connected(info, {'EXIT', Sock, Reason}, State) when is_port(Sock) ->
395
    handle_socket_close(connected, Sock, Reason, State);
1✔
396
connected(_EventType, {C, Sock}, State) when C =:= tcp_closed; C =:= ssl_closed ->
397
    handle_socket_close(connected, Sock, closed, State);
8✔
398
connected(_EventType, {E, Sock, Reason}, State) when E =:= tcp_error; E =:= ssl_error ->
399
    handle_socket_close(connected, Sock, Reason, State);
×
400
connected(_EventType, ?SOCK_ERR(Sock, Reason), State) ->
401
    handle_socket_close(connected, Sock, Reason, State);
×
402
connected(_EventType, {Inet, _, Bin}, State = #{last_bin := LastBin})
403
        when Inet == tcp; Inet == ssl ->
404
    parse(pulsar_protocol_frame:parse(<<LastBin/binary, Bin/binary>>), State);
699✔
405
connected(_EventType, ping, State = #{sock := Sock, opts := Opts}) ->
406
    ?POORMAN(Sock, pulsar_socket:ping(Sock, Opts)),
6✔
407
    {keep_state, State};
6✔
408
connected({call, From}, get_state, _State) ->
409
    {keep_state_and_data, [{reply, From, ?FUNCTION_NAME}]};
2✔
410
connected({call, From}, _EventContent, _State) ->
411
    {keep_state_and_data, [{reply, From, {error, unknown_call}}]};
×
412
connected(cast, _EventContent, _State) ->
413
    keep_state_and_data;
×
414
connected(_EventType, EventContent, State) ->
415
    handle_response(EventContent, State).
×
416

417
handle_socket_close(StateName, Sock, Reason, #{sock := Sock} = State) ->
418
    ?tp("pulsar_socket_close", #{sock => Sock, reason => Reason}),
8✔
419
    log_error("connection_closed at_state: ~p, reason: ~p", [StateName, Reason], State),
8✔
420
    try_close_socket(State),
8✔
421
    ?NEXT_STATE_IDLE_RECONNECT(State);
8✔
422
handle_socket_close(_StateName, _Sock, _Reason, _State) ->
423
    %% stale close event
424
    keep_state_and_data.
1✔
425

426
-spec refresh_urls_and_connect(state()) -> handler_result().
427
refresh_urls_and_connect(State0) ->
428
    %% if Pulsar went down and then restarted later, we must issue a
429
    %% LookupTopic command again after reconnecting.
430
    %% https://pulsar.apache.org/docs/2.10.x/developing-binary-protocol/#topic-lookup
431
    %% > Topic lookup needs to be performed each time a client needs
432
    %% > to create or reconnect a producer or a consumer. Lookup is used
433
    %% > to discover which particular broker is serving the topic we are
434
    %% > about to use.
435
    %% Simply looking up the topic (even from a distinct connection)
436
    %% will "unblock" the topic so we may send messages to it.  The
437
    %% producer may be started only after that.
438
    #{ clientid := ClientId
33✔
439
     , partitiontopic := PartitionTopic
440
     } = State0,
441
    ?tp(debug, pulsar_producer_refresh_start, #{}),
33✔
442
    try pulsar_client_manager:lookup_topic_async(ClientId, PartitionTopic) of
33✔
443
        {ok, LookupTopicRequestRef} ->
444
            State = State0#{lookup_topic_request_ref := LookupTopicRequestRef},
33✔
445
            {keep_state, State, [{state_timeout, ?LOOKUP_TOPIC_TIMEOUT, lookup_topic_timeout}]}
33✔
446
    catch
447
        exit:{noproc, _} ->
448
            log_error("client restarting; will retry to lookup topic again later", [], State0),
×
449
            ?NEXT_STATE_IDLE_RECONNECT(State0)
×
450
    end.
451

452
-spec do_connect(state()) -> handler_result().
453
do_connect(State) ->
454
    #{ broker_server := {Host, Port}
27✔
455
     , opts := Opts
456
     , proxy_to_broker_url := ProxyToBrokerUrl
457
     } = State,
458
    try pulsar_socket:connect(Host, Port, Opts) of
27✔
459
        {ok, Sock} ->
460
            Opts1 = pulsar_utils:maybe_add_proxy_to_broker_url_opts(Opts, ProxyToBrokerUrl),
27✔
461
            ?POORMAN(Sock, pulsar_socket:send_connect_packet(Sock, Opts1)),
27✔
462
            {next_state, connecting, State#{sock := Sock}};
27✔
463
        {error, Reason} ->
UNCOV
464
            log_error("error connecting: ~p", [Reason], State),
×
UNCOV
465
            try_close_socket(State),
×
UNCOV
466
            ?NEXT_STATE_IDLE_RECONNECT(State)
×
467
    catch
468
        Kind:Error:Stacktrace ->
469
            log_error("exception connecting: ~p -> ~p~n  ~p", [Kind, Error, Stacktrace], State),
×
470
            try_close_socket(State),
×
471
            ?NEXT_STATE_IDLE_RECONNECT(State)
×
472
    end.
473

474
format_status(Status) ->
475
    maps:map(
1✔
476
      fun(data, Data0) ->
477
              censor_secrets(Data0);
1✔
478
         (_Key, Value)->
479
              Value
6✔
480
      end,
481
      Status).
482

483
%% `format_status/2' is deprecated as of OTP 25.0
484
format_status(_Opt, [_PDict, _State0, Data0]) ->
485
    Data = censor_secrets(Data0),
×
486
    [{data, [{"State", Data}]}].
×
487

488
censor_secrets(Data0 = #{opts := Opts0 = #{conn_opts := ConnOpts0 = #{auth_data := _}}}) ->
489
    Data0#{opts := Opts0#{conn_opts := ConnOpts0#{auth_data := "******"}}};
×
490
censor_secrets(Data) ->
491
    Data.
1✔
492

493
terminate(_Reason, _StateName, State = #{replayq := Q}) ->
494
    ok = replayq:close(Q),
17✔
495
    ok = clear_gauges(State, Q),
16✔
496
    ok.
16✔
497

498
clear_gauges(State, Q) ->
499
    pulsar_metrics:inflight_set(State, 0),
16✔
500
    maybe_reset_queuing(State, Q),
16✔
501
    ok.
16✔
502

503
maybe_reset_queuing(State, Q) ->
504
    case {replayq:count(Q), is_replayq_durable(State, Q)} of
16✔
505
        {0, _} ->
506
            pulsar_metrics:queuing_set(State, 0),
16✔
507
            pulsar_metrics:queuing_bytes_set(State, 0);
16✔
508
        {_, false} ->
509
            pulsar_metrics:queuing_set(State, 0),
×
510
            pulsar_metrics:queuing_bytes_set(State, 0);
×
511
        {_, _} ->
512
            ok
×
513
    end.
514

515
is_replayq_durable(#{replayq_offload_mode := true}, _Q) ->
516
    false;
×
517
is_replayq_durable(_, Q) ->
518
    not replayq:is_mem_only(Q).
16✔
519

520
parse({incomplete, Bin}, State) ->
521
    {keep_state, State#{last_bin := Bin}};
×
522
parse({Cmd, <<>>}, State) ->
523
    handle_response(Cmd, State#{last_bin := <<>>});
699✔
524
parse({Cmd, LastBin}, State) ->
525
    State2 = case handle_response(Cmd, State) of
336✔
526
        keep_state_and_data -> State;
×
527
        {_, State1 = #{}} -> State1;
336✔
528
        {_, _NextState, State1 = #{}} -> State1;
×
529
        {_, _NextState, State1 = #{}, _Actions} -> State1
×
530
    end,
531
    parse(pulsar_protocol_frame:parse(LastBin), State2).
336✔
532

533
-spec handle_response(_EventContent, state()) ->
534
          handler_result().
535
handle_response({connected, _ConnectedData}, State0 = #{
536
        sock := Sock,
537
        opts := Opts,
538
        producer_id := ProducerId,
539
        request_id := RequestId,
540
        partitiontopic := PartitionTopic
541
    }) ->
542
    start_keepalive(),
27✔
543
    ?POORMAN(Sock, pulsar_socket:send_create_producer_packet(Sock, PartitionTopic, RequestId, ProducerId, Opts)),
27✔
544
    {keep_state, next_request_id(State0)};
27✔
545
handle_response({producer_success, #{producer_name := ProName}}, State) ->
546
    {next_state, connected, State#{producer_name := ProName}};
27✔
547
handle_response({pong, #{}}, _State) ->
548
    start_keepalive(),
3✔
549
    keep_state_and_data;
3✔
550
handle_response({ping, #{}}, #{sock := Sock, opts := Opts}) ->
551
    ?POORMAN(Sock, pulsar_socket:pong(Sock, Opts)),
5✔
552
    keep_state_and_data;
5✔
553
handle_response({close_producer, #{}}, State = #{ partitiontopic := Topic
554
                                                }) ->
555
    log_error("Close producer: ~p~n", [Topic], State),
×
556
    try_close_socket(State),
×
557
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
558
handle_response({send_receipt, Resp = #{sequence_id := SequenceId}}, State) ->
559
    #{ callback := Callback
1,027✔
560
     , inflight_calls := InflightCalls0
561
     , requests := Reqs
562
     , replayq := Q
563
     } = State,
564
    ?tp(pulsar_producer_recv_send_receipt, #{receipt => Resp}),
1,027✔
565
    case maps:get(SequenceId, Reqs, undefined) of
1,027✔
566
        undefined ->
567
            _ = invoke_callback(Callback, {ok, Resp}),
×
568
            {keep_state, State};
×
569
        ?INFLIGHT_REQ(QAckRef, FromsToMessages, BatchSize) ->
570
            ok = replayq:ack(Q, QAckRef),
1,027✔
571
            lists:foreach(
1,027✔
572
              fun({undefined, {_TS, Messages}}) ->
573
                   BatchLen = length(Messages),
1,806✔
574
                   _ = invoke_callback(Callback, {ok, Resp}, BatchLen),
1,806✔
575
                   ok;
1,806✔
576
                 ({?PER_REQ_CALLBACK(Fn, Args), {_TS, _Messages}}) ->
577
                   %% No need to count the messages, as we invoke
578
                   %% per-request callbacks once for the whole batch.
579
                   _ = invoke_callback({Fn, Args}, {ok, Resp}),
1✔
580
                   ok;
1✔
581
                 ({From, {_TS, _Messages}}) ->
582
                   gen_statem:reply(From, {ok, Resp})
6✔
583
              end,
584
              FromsToMessages),
585
            InflightCalls = InflightCalls0 - BatchSize,
1,027✔
586
            pulsar_metrics:inflight_set(State, InflightCalls),
1,027✔
587
            {keep_state, State#{ requests := maps:remove(SequenceId, Reqs)
1,027✔
588
                               , inflight_calls := InflightCalls
589
                               }}
590
    end;
591
handle_response({error, #{error := Error, message := Msg}}, State) ->
592
    log_error("Response error:~p, msg:~p~n", [Error, Msg], State),
×
593
    try_close_socket(State),
×
594
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
595
handle_response(Msg, State) ->
596
    log_error("Receive unknown message:~p~n", [Msg], State),
×
597
    keep_state_and_data.
×
598

599
-spec send_batch_payload([{timestamp(), [pulsar:message()]}], sequence_id(), state()) -> ok.
600
send_batch_payload(Messages, SequenceId, #{
601
            partitiontopic := Topic,
602
            producer_id := ProducerId,
603
            producer_name := ProducerName,
604
            sock := Sock,
605
            opts := Opts
606
        }) ->
607
    ?POORMAN(Sock, pulsar_socket:send_batch_message_packet(Sock, Topic, Messages, SequenceId, ProducerId, ProducerName, Opts)).
1,581✔
608

609
start_keepalive() ->
610
    erlang:send_after(30_000, self(), ping).
30✔
611

612
next_request_id(State = #{request_id := ?MAX_REQ_ID}) ->
613
    State#{request_id := 1};
×
614
next_request_id(State = #{request_id := RequestId}) ->
615
    State#{request_id := RequestId + 1}.
27✔
616

617
next_sequence_id(State = #{sequence_id := ?MAX_SEQ_ID}) ->
618
    State#{sequence_id := 1};
×
619
next_sequence_id(State = #{sequence_id := SequenceId}) ->
620
    State#{sequence_id := SequenceId + 1}.
1,180✔
621

622
-spec log_debug(string(), [term()], state()) -> ok.
623
log_debug(Fmt, Args, State) ->
624
    do_log(debug, Fmt, Args, State).
54✔
625

626
-spec log_info(string(), [term()], state()) -> ok.
627
log_info(Fmt, Args, State) ->
628
    do_log(info, Fmt, Args, State).
×
629

630
-spec log_warn(string(), [term()], state()) -> ok.
631
log_warn(Fmt, Args, State) ->
632
    do_log(warning, Fmt, Args, State).
2✔
633

634
-spec log_error(string(), [term()], state()) -> ok.
635
log_error(Fmt, Args, State) ->
636
    do_log(error, Fmt, Args, State).
12✔
637

638
-spec do_log(atom(), string(), [term()], state()) -> ok.
639
do_log(Level, Format, Args, State) ->
640
    #{partitiontopic := PartitionTopic} = State,
68✔
641
    logger:log(Level, "[pulsar-producer][~s] " ++ Format,
68✔
642
               [PartitionTopic | Args], #{domain => [pulsar, producer]}).
643

644
-spec invoke_callback(callback(), callback_input()) -> ok.
645
invoke_callback(Callback, Resp) ->
646
    invoke_callback(Callback, Resp, _BatchLen = 1).
3✔
647

648
-spec invoke_callback(callback(), callback_input(), non_neg_integer()) -> ok.
649
invoke_callback(_Callback = undefined, _Resp, _BatchLen) ->
650
    ok;
×
651
invoke_callback({M, F, A}, Resp, BatchLen) ->
652
    lists:foreach(
1,813✔
653
      fun(_) ->
654
        erlang:apply(M, F, [Resp] ++ A)
2,260✔
655
      end,  lists:seq(1, BatchLen));
656
invoke_callback(Callback, Resp, BatchLen) when is_function(Callback, 1) ->
657
    lists:foreach(
×
658
      fun(_) ->
659
        Callback(Resp)
×
660
      end,  lists:seq(1, BatchLen));
661
invoke_callback({Fn, Args}, Resp, _BatchLen) when is_function(Fn), is_list(Args) ->
662
    %% for per-request callbacks, we invoke it only once, regardless
663
    %% of how many messages were sent.
664
    apply(Fn, Args ++ [Resp]).
3✔
665

666
queue_item_sizer(?Q_ITEM(_CallId, _Ts, _Batch) = Item) ->
667
    erlang:external_size(Item).
2,125✔
668

669
queue_item_marshaller(?Q_ITEM(_, _, _) = I) ->
670
  term_to_binary(I);
508✔
671
queue_item_marshaller(Bin) when is_binary(Bin) ->
672
  case binary_to_term(Bin) of
304✔
673
      Item = ?Q_ITEM({Pid, _Tag}, Ts, Msgs) when is_pid(Pid) ->
674
          case node(Pid) =:= node() andalso erlang:is_process_alive(Pid) of
2✔
675
              true ->
676
                  Item;
1✔
677
              false ->
678
                  ?Q_ITEM(undefined, Ts, Msgs)
1✔
679
          end;
680
      Item ->
681
          Item
302✔
682
  end.
683

684
now_ts() ->
685
    erlang:system_time(millisecond).
3,041✔
686

687
make_queue_item(From, Messages) ->
688
    ?Q_ITEM(From, now_ts(), Messages).
1,824✔
689

690
enqueue_send_requests(Requests, State = #{replayq := Q}) ->
691
    QItems = lists:map(
1,822✔
692
               fun(?SEND_REQ(From, Messages)) ->
693
                 make_queue_item(From, Messages)
1,822✔
694
               end,
695
               Requests),
696
    NewQ = replayq:append(Q, QItems),
1,822✔
697
    pulsar_metrics:queuing_set(State, replayq:count(NewQ)),
1,822✔
698
    pulsar_metrics:queuing_bytes_set(State, replayq:bytes(NewQ)),
1,822✔
699
    ?tp(pulsar_producer_send_requests_enqueued, #{requests => Requests}),
1,822✔
700
    Overflow = replayq:overflow(NewQ),
1,822✔
701
    handle_overflow(State#{replayq := NewQ}, Overflow).
1,822✔
702

703
-spec handle_overflow(state(), integer()) -> state().
704
handle_overflow(State, Overflow) when Overflow =< 0 ->
705
    %% no overflow
706
    ok = maybe_log_discard(State, _NumRequestsIncrement = 0),
1,818✔
707
    State;
1,818✔
708
handle_overflow(State0 = #{replayq := Q, callback := Callback}, Overflow) ->
709
    {NewQ, QAckRef, Items0} =
4✔
710
        replayq:pop(Q, #{bytes_limit => Overflow, count_limit => 999999999}),
711
    ok = replayq:ack(NewQ, QAckRef),
4✔
712
    maybe_log_discard(State0, length(Items0)),
4✔
713
    Items = [{From, Msgs} || ?Q_ITEM(From, _Now, Msgs) <- Items0],
4✔
714
    reply_with_error(Items, Callback, {error, overflow}),
4✔
715
    NumMsgs = length([1 || {_, Msgs} <- Items, _ <- Msgs]),
4✔
716
    pulsar_metrics:dropped_queue_full_inc(State0, NumMsgs),
4✔
717
    pulsar_metrics:queuing_set(State0, replayq:count(NewQ)),
4✔
718
    pulsar_metrics:queuing_bytes_set(State0, replayq:bytes(NewQ)),
4✔
719
    State0#{replayq := NewQ}.
4✔
720

721
maybe_log_discard(State, Increment) ->
722
    Last = get_overflow_log_state(),
1,825✔
723
    #{ count_since_last_log := CountLast
1,825✔
724
     , total_count := TotalCount
725
     } = Last,
726
    case CountLast =:= TotalCount andalso Increment =:= 0 of
1,825✔
727
        true -> %% no change
728
            ok;
1,819✔
729
        false ->
730
            maybe_log_discard(State, Increment, Last)
6✔
731
    end.
732

733
-spec maybe_log_discard(
734
        state(),
735
        non_neg_integer(),
736
        #{ last_log_inst => non_neg_integer()
737
         , count_since_last_log => non_neg_integer()
738
         , total_count => non_neg_integer()
739
         }) -> ok.
740
maybe_log_discard(State,
741
                  Increment,
742
                  #{ last_log_inst := LastInst
743
                   , count_since_last_log := CountLast
744
                   , total_count := TotalCount
745
                   }) ->
746
    NowInst = now_ts(),
6✔
747
    NewTotalCount = TotalCount + Increment,
6✔
748
    Delta = NewTotalCount - CountLast,
6✔
749
    case NowInst - LastInst > ?MIN_DISCARD_LOG_INTERVAL of
6✔
750
        true ->
751
            log_warn("replayq dropped ~b overflowed messages", [Delta], State),
2✔
752
            put_overflow_log_state(#{ last_log_inst => NowInst
2✔
753
                                    , count_since_last_log => NewTotalCount
754
                                    , total_count => NewTotalCount
755
                                    });
756
        false ->
757
            put_overflow_log_state(#{ last_log_inst => LastInst
4✔
758
                                    , count_since_last_log => CountLast
759
                                    , total_count => NewTotalCount
760
                                    })
761
    end.
762

763
-spec get_overflow_log_state() -> #{ last_log_inst => non_neg_integer()
764
                                   , count_since_last_log => non_neg_integer()
765
                                   , total_count => non_neg_integer()
766
                                   }.
767
get_overflow_log_state() ->
768
    case get(?buffer_overflow_discarded) of
1,827✔
769
        undefined ->
770
            #{ last_log_inst => 0
1,820✔
771
             , count_since_last_log => 0
772
             , total_count => 0
773
             };
774
        Stats = #{} ->
775
            Stats
7✔
776
    end.
777

778
-spec put_overflow_log_state(#{ last_log_inst => non_neg_integer()
779
                              , count_since_last_log => non_neg_integer()
780
                              , total_count => non_neg_integer()
781
                              }) -> ok.
782
put_overflow_log_state(#{ last_log_inst := _LastInst
783
                        , count_since_last_log := _CountLast
784
                        , total_count := _TotalCount
785
                        } = Stats) ->
786
    put(?buffer_overflow_discarded, Stats),
8✔
787
    ok.
8✔
788

789
maybe_send_to_pulsar(State) ->
790
    #{replayq := Q} = State,
2,377✔
791
    case replayq:count(Q) =:= 0 of
2,377✔
792
        true ->
793
            State;
1,195✔
794
        false ->
795
            do_send_to_pulsar(State)
1,182✔
796
    end.
797

798
do_send_to_pulsar(State0) ->
799
    #{ batch_size := BatchSize
1,182✔
800
     , inflight_calls := InflightCalls0
801
     , sequence_id := SequenceId
802
     , requests := Requests0
803
     , replayq := Q
804
     , opts := ProducerOpts
805
     } = State0,
806
    MaxBatchBytes = maps:get(max_batch_bytes, ProducerOpts, ?DEFAULT_MAX_BATCH_BYTES),
1,182✔
807
    {NewQ, QAckRef, Items} = replayq:pop(Q, #{ count_limit => BatchSize
1,182✔
808
                                             , bytes_limit => MaxBatchBytes
809
                                             }),
810
    State1 = State0#{replayq := NewQ},
1,182✔
811
    pulsar_metrics:queuing_set(State0, replayq:count(NewQ)),
1,182✔
812
    pulsar_metrics:queuing_bytes_set(State0, replayq:bytes(NewQ)),
1,182✔
813
    RetentionPeriod = maps:get(retention_period, ProducerOpts, infinity),
1,182✔
814
    Now = now_ts(),
1,182✔
815
    {Expired, FromsToMessages} =
1,182✔
816
       lists:foldr(
817
         fun(?Q_ITEM(From, Timestamp, Msgs), {Expired, Acc}) ->
818
           case is_batch_expired(Timestamp, RetentionPeriod, Now) of
1,970✔
819
             true ->
820
               {[{From, Msgs} | Expired], Acc};
4✔
821
             false ->
822
               {Expired, [{From, {Timestamp, Msgs}} | Acc]}
1,966✔
823
           end
824
         end,
825
         {[], []},
826
         Items),
827
    reply_expired_messages(Expired, State1),
1,182✔
828
    pulsar_metrics:dropped_inc(State1, length(Expired)),
1,182✔
829
    case FromsToMessages of
1,182✔
830
        [] ->
831
            %% all expired, immediately ack replayq batch and continue
832
            ok = replayq:ack(Q, QAckRef),
2✔
833
            maybe_send_to_pulsar(State1);
2✔
834
        [_ | _] ->
835
            FinalBatch = [Msg || {_From, {_Timestamp, Msgs}} <-
1,180✔
836
                                     FromsToMessages,
1,180✔
837
                                 Msg <- Msgs],
1,966✔
838
            FinalBatchSize = length(FinalBatch),
1,180✔
839
            send_batch_payload(FinalBatch, SequenceId, State0),
1,180✔
840
            Requests = Requests0#{SequenceId => ?INFLIGHT_REQ(QAckRef, FromsToMessages, FinalBatchSize)},
1,180✔
841
            InflightCalls = InflightCalls0 + FinalBatchSize,
1,180✔
842
            pulsar_metrics:inflight_set(State1, InflightCalls),
1,180✔
843
            State2 = State1#{requests := Requests, inflight_calls := InflightCalls},
1,180✔
844
            State = next_sequence_id(State2),
1,180✔
845
            maybe_send_to_pulsar(State)
1,180✔
846
    end.
847

848
-spec reply_expired_messages([{gen_statem:from() | per_request_callback_int() | undefined,
849
                               [pulsar:message()]}],
850
                             state()) -> ok.
851
reply_expired_messages(Expired, #{callback := Callback}) ->
852
    reply_with_error(Expired, Callback, {error, expired}).
1,183✔
853

854
-spec reply_with_error([{gen_statem:from() | per_request_callback_int() | undefined,
855
                         [pulsar:message()]}],
856
                       callback(), {error, expired | overflow}) -> ok.
857
reply_with_error(Items, Callback, Error) ->
858
    lists:foreach(
1,187✔
859
      fun({undefined, Msgs}) ->
860
              invoke_callback(Callback, Error, length(Msgs));
7✔
861
         ({?PER_REQ_CALLBACK(Fn, Args), _Msgs}) ->
862
              %% No need to count the messages, as we invoke
863
              %% per-request callbacks once for the whole batch.
864
              invoke_callback({Fn, Args}, Error);
2✔
865
         ({From, _Msgs}) ->
866
              gen_statem:reply(From, Error)
×
867
      end,
868
      Items).
869

870
collect_send_requests(Acc, Limit) ->
871
    Count = length(Acc),
1,168✔
872
    do_collect_send_requests(Acc, Count, Limit).
1,168✔
873

874
do_collect_send_requests(Acc, Count, Limit) when Count >= Limit ->
875
    lists:reverse(Acc);
×
876
do_collect_send_requests(Acc, Count, Limit) ->
877
    receive
1,168✔
878
        ?SEND_REQ(_, _) = Req ->
879
            do_collect_send_requests([Req | Acc], Count + 1, Limit)
×
880
    after
881
        0 ->
882
            lists:reverse(Acc)
1,168✔
883
    end.
884

885
try_close_socket(#{sock := undefined}) ->
UNCOV
886
    ok;
×
887
try_close_socket(#{sock := Sock, opts := Opts}) ->
888
    _ = pulsar_socket:close(Sock, Opts),
8✔
889
    ok.
8✔
890

891
resend_sent_requests(State) ->
892
    ?tp(pulsar_producer_resend_sent_requests_enter, #{}),
27✔
893
    #{ inflight_calls := InflightCalls0
27✔
894
     , requests := Requests0
895
     , replayq := Q
896
     , opts := ProducerOpts
897
     } = State,
898
    Now = now_ts(),
27✔
899
    RetentionPeriod = maps:get(retention_period, ProducerOpts, infinity),
27✔
900
    {Requests, Dropped} =
27✔
901
        maps:fold(
902
          fun(SequenceId, ?INFLIGHT_REQ(QAckRef, FromsToMessages, _BatchSize), {AccIn, DroppedAcc}) ->
903
               {Messages, Expired} =
402✔
904
                  lists:partition(
905
                    fun({_From, {Ts, _Msgs}}) ->
906
                      not is_batch_expired(Ts, RetentionPeriod, Now)
402✔
907
                    end,
908
                    FromsToMessages),
909
               lists:foreach(
402✔
910
                 fun({From, {_Ts, Msgs}}) ->
911
                   reply_expired_messages([{From, Msgs}], State)
1✔
912
                 end,
913
                 Expired),
914
               Dropped = length(Expired),
402✔
915
               Acc = case Messages of
402✔
916
                   [] ->
917
                       ?tp(pulsar_producer_resend_all_expired, #{}),
1✔
918
                       ok = replayq:ack(Q, QAckRef),
1✔
919
                       AccIn;
1✔
920
                   [_ | _] ->
921
                       send_batch_payload([Msg || {_From, {_Ts, Msgs}} <- Messages,
401✔
922
                                                  Msg <- Msgs],
401✔
923
                                          SequenceId, State),
924
                       AccIn#{SequenceId => ?INFLIGHT_REQ(QAckRef, Messages, length(Messages))}
401✔
925
               end,
926
               {Acc, DroppedAcc + Dropped}
402✔
927
          end,
928
          {#{}, 0},
929
          Requests0),
930
    InflightCalls = InflightCalls0 - Dropped,
27✔
931
    pulsar_metrics:dropped_inc(State, Dropped),
27✔
932
    pulsar_metrics:inflight_set(State, InflightCalls),
27✔
933
    State#{requests := Requests, inflight_calls := InflightCalls}.
27✔
934

935
is_batch_expired(_Timestamp, infinity = _RetentionPeriod, _Now) ->
936
    false;
2,363✔
937
is_batch_expired(Timestamp, RetentionPeriod, Now) ->
938
    Timestamp =< Now - RetentionPeriod.
9✔
939

940
-spec escape(string()) -> binary().
941
escape(Str) ->
942
    NormalizedStr = unicode:characters_to_nfd_list(Str),
7✔
943
    iolist_to_binary(pulsar_utils:escape_uri(NormalizedStr)).
7✔
944

945
-spec handle_lookup_topic_reply(pulsar_client:lookup_topic_response(), state()) -> handler_result().
946
handle_lookup_topic_reply({error, Error}, State) ->
UNCOV
947
    log_error("error looking up topic: ~0p", [Error], State),
×
UNCOV
948
    try_close_socket(State),
×
UNCOV
949
    ?NEXT_STATE_IDLE_RECONNECT(State);
×
950
handle_lookup_topic_reply({ok, #{ proxy_through_service_url := true
951
                                , brokerServiceUrl := NewBrokerServiceURL
952
                                }}, State0) ->
953
    #{clientid := ClientId} = State0,
27✔
954
    ?tp(debug, pulsar_producer_lookup_alive_pulsar_url, #{}),
27✔
955
    log_debug("received topic lookup reply: ~0p", [#{proxy_through_service_url => true, broker_service_url => NewBrokerServiceURL}], State0),
27✔
956
    try pulsar_client_manager:get_alive_pulsar_url(ClientId, ?GET_ALIVE_PULSAR_URL) of
27✔
957
        {ok, AlivePulsarURL} ->
958
            maybe_connect(#{ broker_service_url => NewBrokerServiceURL
27✔
959
                           , alive_pulsar_url => AlivePulsarURL
960
                           }, State0);
961
        {error, Reason} ->
962
            log_error("error getting pulsar alive URL: ~0p", [Reason], State0),
×
963
            try_close_socket(State0),
×
964
            ?NEXT_STATE_IDLE_RECONNECT(State0)
×
965
    catch
966
        exit:{noproc, _} ->
967
            log_error("client restarting; will retry to lookup topic later", [], State0),
×
968
            try_close_socket(State0),
×
969
            ?NEXT_STATE_IDLE_RECONNECT(State0);
×
970
        exit:{timeout, _} ->
971
            log_error("timeout calling client; will retry to lookup topic later", [], State0),
×
972
            try_close_socket(State0),
×
973
            ?NEXT_STATE_IDLE_RECONNECT(State0)
×
974
    end;
975
handle_lookup_topic_reply({ok, #{ proxy_through_service_url := false
976
                                , brokerServiceUrl := NewBrokerServiceURL
977
                                }},
978
                         State) ->
979
    log_debug("received topic lookup reply: ~0p",
×
980
              [#{proxy_through_service_url => false,
981
                 broker_service_url => NewBrokerServiceURL}], State),
982
    maybe_connect(#{ alive_pulsar_url => NewBrokerServiceURL
×
983
                   , broker_service_url => undefined
984
                   }, State).
985

986
-spec maybe_connect(#{ alive_pulsar_url := string()
987
                     , broker_service_url := string() | undefined
988
                     }, state()) -> handler_result().
989
maybe_connect(#{ broker_service_url := NewBrokerServiceURL
990
               , alive_pulsar_url := AlivePulsarURL
991
               }, State0) ->
992
    #{ broker_server := OldBrokerServer
27✔
993
     , proxy_to_broker_url := OldBrokerServiceURL
994
     } = State0,
995
    {_Transport, NewBrokerServer} = pulsar_utils:parse_url(AlivePulsarURL),
27✔
996
    case {OldBrokerServer, OldBrokerServiceURL} =:= {NewBrokerServer, NewBrokerServiceURL} of
27✔
997
        true ->
998
            log_debug("connecting to ~0p",
27✔
999
                      [#{broker_server => NewBrokerServer,
1000
                         service_url => NewBrokerServiceURL}], State0),
1001
            do_connect(State0);
27✔
1002
        false ->
1003
            %% broker changed; reconnect.
1004
            log_info("pulsar endpoint changed from ~0p to ~0p; reconnecting...",
×
1005
                      [ #{ broker_server => OldBrokerServer
1006
                         , proxy_url => OldBrokerServiceURL
1007
                         }
1008
                      , #{ broker_server => NewBrokerServer
1009
                         , proxy_url => NewBrokerServiceURL
1010
                         }
1011
                      ],
1012
                      State0),
1013
            try_close_socket(State0),
×
1014
            State = State0#{
×
1015
                broker_server := NewBrokerServer,
1016
                proxy_to_broker_url := NewBrokerServiceURL
1017
            },
1018
            ?NEXT_STATE_IDLE_RECONNECT(State)
×
1019
    end.
1020

1021
-spec notify_state_change(undefined | state_observer_callback(), statem()) -> ok.
1022
notify_state_change(undefined, _ProducerState) ->
1023
    ok;
×
1024
notify_state_change({Fun, Args}, ProducerState) ->
1025
    _ = apply(Fun, [ProducerState | Args]),
84✔
1026
    ok.
84✔
1027

1028
-ifdef(TEST).
1029
-include_lib("eunit/include/eunit.hrl").
1030

1031
maybe_log_discard_test_() ->
1032
    [ {"no increment, empty dictionary", fun() -> maybe_log_discard(undefined, 0) end}
2✔
1033
    , {"fake-last-old",
1034
       fun() ->
1035
         Inst0 = now_ts() - ?MIN_DISCARD_LOG_INTERVAL - 1,
1✔
1036
         ok = put_overflow_log_state(#{ last_log_inst => Inst0
1✔
1037
                                      , count_since_last_log => 2
1038
                                      , total_count => 2
1039
                                      }),
1040
         ok = maybe_log_discard(#{partitiontopic => <<"partitiontopic">>}, 1),
1✔
1041
         Stats = get_overflow_log_state(),
1✔
1042
         ?assertMatch(#{count_since_last_log := 3, total_count := 3}, Stats),
1✔
1043
         %% greater than the minimum interval because we just logged
1044
         ?assert(maps:get(last_log_inst, Stats) - Inst0 > ?MIN_DISCARD_LOG_INTERVAL),
1✔
1045
         ok
1✔
1046
       end}
1047
    , {"fake-last-fresh",
1048
       fun() ->
1049
         Inst0 = now_ts(),
1✔
1050
         ok = put_overflow_log_state(#{ last_log_inst => Inst0
1✔
1051
                                      , count_since_last_log => 2
1052
                                      , total_count => 2
1053
                                      }),
1054
         ok = maybe_log_discard(#{partitiontopic => <<"partitiontopic">>}, 2),
1✔
1055
         Stats = get_overflow_log_state(),
1✔
1056
         ?assertMatch(#{count_since_last_log := 2, total_count := 4}, Stats),
1✔
1057
         %% less than the minimum interval because we didn't log and just accumulated
1058
         ?assert(maps:get(last_log_inst, Stats) - Inst0 < ?MIN_DISCARD_LOG_INTERVAL),
1✔
1059
         ok
1✔
1060
       end}
1061
    ].
1062
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc