• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 773

29 Nov 2024 08:57PM UTC coverage: 71.844% (-2.0%) from 73.807%
773

push

github

web-flow
Merge pull request #68 from thalesmg/fix-lookup-redirect-mkII

fix(client): handle `Redirect` in `LookupTopicResponse`

189 of 261 new or added lines in 8 files covered. (72.41%)

29 existing lines in 3 files now uncovered.

939 of 1307 relevant lines covered (71.84%)

313.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.43
/src/pulsar_producers.erl
1
%% Copyright (c) 2013-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_producers).
16

17
-define(MAX_PRODUCER_ID, 65535).
18

19
%% APIs
20
-export([start_supervised/3, stop_supervised/1, start_link/3]).
21

22
-export ([ pick_producer/2
23
         , all_connected/1
24
         ]).
25

26
%% gen_server callbacks
27
-export([ code_change/3
28
        , handle_call/3
29
        , handle_cast/2
30
        , handle_info/2
31
        , init/1
32
        , terminate/2
33
        , format_status/1
34
        , format_status/2
35
        ]).
36

37
-export([report_status/2]).
38

39
%% gen_server state.
%%   topic         - topic given to start_link/3
%%   client_id     - client id given to start_link/3
%%   workers       - ETS table holding {Partition, ProducerPid, ProducerState} rows
%%   partitions    - number of partition topics; unset until metadata is fetched
%%   producer_opts - options map given to start_link/3, extended per producer
%%   producer_id   - most recently assigned producer id
%%   producers     - map of ProducerPid => {Partition, PartitionTopic}
-record(state, {
    topic,
    client_id,
    workers,
    partitions,
    producer_opts,
    producer_id = 0,
    producers = #{}
}).
46

47
-type clientid() :: atom().
-type topic() :: string().
%% How a partition is chosen for a batch:
%%   random             - uniformly random partition
%%   roundrobin         - per-caller rotating cursor
%%   first_key_dispatch - hash of the `key' of the first message in the batch
-type produce_strategy() :: roundrobin | random | first_key_dispatch.
%% Handle returned by start_supervised/3 and consumed by the picking APIs.
-type producers() :: #{ client := clientid()
                      , topic := topic()
                      , workers := _Workers
                      , partitions := _Partitions
                      , strategy := produce_strategy()
                      }.
56

57
-export_type([producers/0]).
58

59
-define(T_RETRY_START, 5000).
60
-define(PRODUCER_STATE_INDEX, 3).
61
-define(GET_TOPIC_METADATA_TIMEOUT, 30_000).
62
-define(LOOKUP_TOPIC_TIMEOUT, 30_000).
63
-define(GET_ALIVE_PULSAR_URL_TIMEOUT, 5_000).
64

65
%% @doc Start supervised producers.
%% Makes sure the producers server exists under the supervisor, asks it for
%% its partition count and workers table, and returns them in a `producers()'
%% map. The strategy defaults to `random' when not given in `ProducerOpts'.
-spec start_supervised(clientid(), topic(), map()) -> {ok, producers()}.
start_supervised(ClientId, Topic, ProducerOpts) ->
    {ok, ServerPid} = pulsar_producers_sup:ensure_present(ClientId, Topic, ProducerOpts),
    {Partitions, Workers} = gen_server:call(ServerPid, get_workers, infinity),
    Strategy = maps:get(strategy, ProducerOpts, random),
    Producers = #{ client => ClientId
                 , topic => Topic
                 , workers => Workers
                 , partitions => Partitions
                 , strategy => Strategy
                 },
    {ok, Producers}.
76

77
%% @doc Stop supervised producers previously returned by start_supervised/3.
-spec stop_supervised(producers()) -> ok.
stop_supervised(#{ client := ClientId
                 , workers := Workers
                 }) ->
    pulsar_producers_sup:ensure_absence(ClientId, Workers).
80

81
%% @doc Start the pulsar_producers gen_server.
%% The server is registered locally under the name resolved by get_name/1.
start_link(ClientId, Topic, ProducerOpts) ->
    ServerName = {local, get_name(ProducerOpts)},
    InitArgs = [ClientId, Topic, ProducerOpts],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
84

85
%% @doc Tell whether every producer in the workers table is `connected'.
%% Returns `false' when the table is empty, and also when the table no longer
%% exists (`ets:info/2' then yields `undefined'), instead of crashing with
%% `badarg' in `ets:foldl/3'.
-spec all_connected(producers()) -> boolean().
all_connected(#{workers := WorkersTable}) ->
    case ets:info(WorkersTable, size) of
        NumWorkers when is_integer(NumWorkers), NumWorkers > 0 ->
            ets:foldl(
              fun({_Partition, _ProducerPid, ProducerState}, Acc) ->
                  Acc andalso ProducerState =:= connected
              end,
              true,
              WorkersTable);
        _EmptyOrMissing ->
            false
    end.
95

96
%% @doc Choose a partition for `Batch' according to the configured strategy
%% and return the result of do_pick_producer/4 for it.
pick_producer(#{ workers := Workers
               , partitions := Partitions
               , strategy := Strategy
               }, Batch) ->
    Chosen = pick_partition(Partitions, Strategy, Batch),
    do_pick_producer(Strategy, Chosen, Partitions, Workers).
99

100
%% Resolve the producer for `Partition'.
%% Returns `{Partition, Pid}' when that producer is alive. Otherwise:
%%   random / roundrobin - fall back to the next alive partition, or
%%                         `{error, no_producers_available}' if there is none;
%%   any other strategy  - `{error, producer_down}' (no fallback).
do_pick_producer(Strategy, Partition, Partitions, Workers) ->
    Pid = lookup_producer(Workers, Partition),
    case is_alive(Pid) of
        true ->
            {Partition, Pid};
        false when Strategy =:= random ->
            pick_next_alive(Workers, Partition, Partitions);
        false when Strategy =:= roundrobin ->
            case pick_next_alive(Workers, Partition, Partitions) of
                {error, _} = Error ->
                    Error;
                {NextPartition, _NextPid} = Picked ->
                    %% Move the cursor past the partition actually used.
                    %% pick_partition/3 already stored `Partition + 1', so
                    %% storing it again would hand out the fallback partition
                    %% twice in a row.
                    _ = put(pulsar_roundrobin, (NextPartition + 1) rem Partitions),
                    Picked
            end;
        false ->
            {error, producer_down}
    end.
114

115
%% Look for an alive producer starting at the partition after `Partition',
%% wrapping around. The failed partition counts as the first one tried.
pick_next_alive(Workers, Partition, Partitions) ->
    NextPartition = (Partition + 1) rem Partitions,
    pick_next_alive(Workers, NextPartition, Partitions, 1).
117

118
%% Walk the partitions in order until an alive producer is found.
%% Gives up with `{error, no_producers_available}' once the number of
%% partitions tried equals the partition count.
pick_next_alive(_Workers, _Partition, Partitions, Tried) when Tried =:= Partitions ->
    {error, no_producers_available};
pick_next_alive(Workers, Partition, Partitions, Tried) ->
    Candidate = lookup_producer(Workers, Partition),
    case is_alive(Candidate) of
        false ->
            Following = (Partition + 1) rem Partitions,
            pick_next_alive(Workers, Following, Partitions, Tried + 1);
        true ->
            {Partition, Candidate}
    end.
126

127
%% True only for a pid whose process is still alive; any non-pid is `false'.
is_alive(Pid) when is_pid(Pid) -> is_process_alive(Pid);
is_alive(_NotAPid) -> false.
128

129
%% Fetch the producer pid stored for `Partition'.
%% Accepts either a map carrying the table under `workers' or the table
%% itself. Returns `undefined' when there is no single matching row.
lookup_producer(#{workers := Table}, Partition) ->
    lookup_producer(Table, Partition);
lookup_producer(Table, Partition) ->
    Rows = ets:lookup(Table, Partition),
    case Rows of
        [{Partition, Pid, _State}] -> Pid;
        _NoSingleRow -> undefined
    end.
136

137
%% Choose a partition index in 0..Partitions-1.
%%   random             - uniformly random index
%%   roundrobin         - cursor kept in the caller's process dictionary
%%                        under `pulsar_roundrobin', starting at 0
%%   first_key_dispatch - hash of the first message's `key'
pick_partition(Partitions, random, _Batch) ->
    rand:uniform(Partitions) - 1;
pick_partition(Partitions, roundrobin, _Batch) ->
    Current =
        case get(pulsar_roundrobin) of
            undefined -> 0;
            Stored -> Stored
        end,
    Next = (Current + 1) rem Partitions,
    _ = put(pulsar_roundrobin, Next),
    Current;
pick_partition(Partitions, first_key_dispatch, [#{key := Key} | _]) ->
    murmerl3:hash_32(Key) rem Partitions.
148

149
%% gen_server init: trap exits, create the named workers table and return a
%% zero timeout so that the rest of the setup runs in handle_info(timeout, _).
init([ClientId, Topic, ProducerOpts]) ->
    erlang:process_flag(trap_exit, true),
    TableOpts = [protected, named_table, {read_concurrency, true}],
    Workers = ets:new(get_name(ProducerOpts), TableOpts),
    State = #state{ topic = Topic
                  , client_id = ClientId
                  , producer_opts = ProducerOpts
                  , workers = Workers
                  },
    {ok, State, 0}.
155

156
%% `get_workers' replies with `{Partitions, Workers}' taken from the state;
%% every other request gets `{error, unknown_call}'.
handle_call(get_workers, _From, #state{workers = Workers, partitions = Partitions} = State) ->
    Reply = {Partitions, Workers},
    {reply, Reply, State};
handle_call(_Request, _From, State) ->
    {reply, {error, unknown_call}, State}.
160

161
%% No casts are defined; every cast is ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.
163

164
%% Handles out-of-band messages:
%%   * `timeout' - the zero timeout returned by init/1; fetches the topic
%%     metadata and starts one producer per partition topic. Stops the server
%%     with `{failed_to_get_metadata, Reason}' if the metadata call fails.
%%   * `{'EXIT', Pid, Error}' - a linked process exited (exits are trapped).
%%     For a known producer, its row is removed from the workers table and a
%%     restart is scheduled.
%%   * `{restart_producer, Partition, PartitionTopic}' - scheduled by
%%     restart_producer_later/2; tries to start that producer again.
%%   * `{producer_state_change, ProducerPid, ProducerState}' - sent by
%%     report_status/2; stores the new state in the workers table. Only
%%     accepted from pids present in the producers map.
%%   * anything else is logged and dropped.
%% TODO: should be a `continue'.
handle_info(timeout, State = #state{client_id = ClientId, topic = Topic}) ->
    case pulsar_client_manager:get_topic_metadata(ClientId, Topic, ?GET_TOPIC_METADATA_TIMEOUT) of
        {ok, {_, Partitions}} ->
            PartitionTopics = create_partition_topic(Topic, Partitions),
            NewState = lists:foldl(
               fun({PartitionTopic, Partition}, CurrentState) ->
                 start_producer(ClientId, Partition, PartitionTopic, CurrentState)
               end,
               State,
               PartitionTopics),
            {noreply, NewState#state{partitions = length(PartitionTopics)}};
        {error, Reason} ->
            log_error("get topic metadata failed: ~p", [Reason]),
            {stop, {failed_to_get_metadata, Reason}, State}
    end;
handle_info({'EXIT', Pid, Error}, State = #state{workers = Workers, producers = Producers}) ->
    log_error("Received EXIT from ~p, error: ~p", [Pid, Error]),
    case maps:get(Pid, Producers, undefined) of
        undefined ->
            log_error("Cannot find ~p from producers", [Pid]),
            {noreply, State};
        {Partition, PartitionTopic} ->
            ets:delete(Workers, Partition),
            log_error("Producer ~p down, restart it later", [Pid]),
            restart_producer_later(Partition, PartitionTopic),
            {noreply, State#state{producers = maps:remove(Pid, Producers)}}
    end;
handle_info({restart_producer, Partition, PartitionTopic}, State = #state{client_id = ClientId}) ->
    {noreply, start_producer(ClientId, Partition, PartitionTopic, State)};
handle_info({producer_state_change, ProducerPid, ProducerState},
            State = #state{producers = Producers, workers = WorkersTable})
  when is_map_key(ProducerPid, Producers) ->
    #{ProducerPid := {Partition, _PartitionTopic}} = Producers,
    true = ets:update_element(WorkersTable, Partition, {?PRODUCER_STATE_INDEX, ProducerState}),
    {noreply, State};
handle_info(Info, State) ->
    log_error("Received unknown message: ~p", [Info]),
    {noreply, State}.
203

204
%% No state migration is needed on code upgrade; the state is kept as is.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
206

207
%% Nothing to clean up here on shutdown.
terminate(_Reason, _State) ->
    ok.
208

209
%% Status formatter taking a status map: the value under the `state' key is
%% passed through censor_secrets/1; all other entries are returned unchanged.
format_status(Status) ->
    Censor =
        fun(state, State) -> censor_secrets(State);
           (_Key, Value) -> Value
        end,
    maps:map(Censor, Status).
217

218
%% `format_status/2' is deprecated as of OTP 25.0.
%% Same censoring as format_status/1, for the older callback shape.
format_status(_Opt, [_PDict, State]) ->
    Censored = censor_secrets(State),
    [{data, [{"State", Censored}]}].
222

223
%% Replace `auth_data' inside `producer_opts.conn_opts' with "******".
%% A state without that nested key is returned untouched.
censor_secrets(#state{producer_opts = #{conn_opts := #{auth_data := _} = ConnOpts} = Opts} = State) ->
    CensoredConnOpts = ConnOpts#{auth_data := "******"},
    CensoredOpts = Opts#{conn_opts := CensoredConnOpts},
    State#state{producer_opts = CensoredOpts};
censor_secrets(State) ->
    State.
227

228
%% Schedule a `restart_producer' message to this process after the retry
%% delay. Returns the timer reference from erlang:send_after/3.
restart_producer_later(Partition, PartitionTopic) ->
    RestartMsg = {restart_producer, Partition, PartitionTopic},
    erlang:send_after(?T_RETRY_START, self(), RestartMsg).
230

231
%% Build the list of `{PartitionTopic, Partition}' pairs for a topic.
%% A partition count of 0 yields the topic itself as partition 0; otherwise
%% each partition I is named Topic ++ "-partition-" ++ I.
create_partition_topic(Topic, 0) ->
    [{Topic, 0}];
create_partition_topic(Topic, Partitions) ->
    [{lists:concat([Topic, "-partition-", Index]), Index}
     || Index <- lists:seq(0, Partitions - 1)].
237

238
%% Name used for both the registered server and the workers table:
%% the `name' option if present, otherwise this module's name.
get_name(ProducerOpts) ->
    DefaultName = ?MODULE,
    maps:get(name, ProducerOpts, DefaultName).
239

240
%% Log at `error' level via do_log/3.
log_error(Format, Args) ->
    do_log(error, Format, Args).
242

243
%% Log through `logger' with a module prefix on the format string and the
%% `[pulsar, producers]' domain in the metadata.
do_log(Level, Fmt, Args) ->
    PrefixedFmt = "[pulsar_producers] " ++ Fmt,
    Metadata = #{domain => [pulsar, producers]},
    logger:log(Level, PrefixedFmt, Args, Metadata).
245

246
%% Look up the broker for `PartitionTopic' and start a producer for it.
%% Returns the updated state on success. On a lookup error, or on any
%% exception raised along the way, the failure is logged, a retry is
%% scheduled, and the state is returned unchanged.
start_producer(ClientId, Partition, PartitionTopic, State) ->
    try
        LookupResult = pulsar_client_manager:lookup_topic(
                         ClientId, PartitionTopic, ?LOOKUP_TOPIC_TIMEOUT),
        case LookupResult of
            {ok, #{ brokerServiceUrl := BrokerServiceURL
                  , proxy_through_service_url := IsProxy
                  }} ->
                do_start_producer(
                  State, ClientId, Partition, PartitionTopic, BrokerServiceURL, IsProxy);
            {error, LookupError} ->
                log_error("Lookup topic failed: ~p", [LookupError]),
                restart_producer_later(Partition, PartitionTopic),
                State
        end
    catch
        Class:Reason:Stacktrace ->
            log_error("Start producer error: ~p, ~p", [Class, {Reason, Stacktrace}]),
            restart_producer_later(Partition, PartitionTopic),
            State
    end.
265

266
%% Start one producer process for `Partition' and register it.
%% When the lookup said to proxy through the service URL, the producer
%% connects to an alive service URL and is told the broker URL to proxy to;
%% otherwise it connects to the broker URL directly. The new producer is
%% inserted into the workers table in state `idle' and recorded in the
%% producers map. Crashes (badmatch) if the URL lookup or start_link fails.
do_start_producer(#state{ client_id = ClientId
                        , producers = Producers
                        , workers = Workers
                        , producer_opts = ProducerOpts0
                        , producer_id = ProducerID
                        } = State,
                  LookupClientId, Partition, PartitionTopic, BrokerServiceURL, IsProxy) ->
    NextID = next_producer_id(ProducerID),
    {AlivePulsarURL, ProxyToBrokerURL} =
        case IsProxy of
            true ->
                {ok, URL} = pulsar_client_manager:get_alive_pulsar_url(
                              LookupClientId, ?GET_ALIVE_PULSAR_URL_TIMEOUT),
                {URL, BrokerServiceURL};
            false ->
                {BrokerServiceURL, undefined}
        end,
    Parent = self(),
    %% The producer calls this back on state changes; see report_status/2.
    Observer = {fun ?MODULE:report_status/2, [Parent]},
    ProducerOpts = ProducerOpts0#{ producer_id => NextID
                                 , clientid => ClientId
                                 , state_observer_callback => Observer
                                 , parent_id => Parent
                                 },
    {ok, ProducerPid} = pulsar_producer:start_link(
                          PartitionTopic, AlivePulsarURL, ProxyToBrokerURL, ProducerOpts),
    ets:insert(Workers, {Partition, ProducerPid, idle}),
    UpdatedProducers = maps:put(ProducerPid, {Partition, PartitionTopic}, Producers),
    State#state{producers = UpdatedProducers, producer_id = NextID}.
295

296
%% Next producer id: increments by one and wraps to 0 after the maximum.
next_producer_id(ProducerID) when ProducerID =:= ?MAX_PRODUCER_ID ->
    0;
next_producer_id(ProducerID) ->
    ProducerID + 1.
299

300
%% Called by `pulsar_producer' when there's a state change.
%% Runs in the calling process, so `self()' is the pid reported to the parent.
report_status(ProducerState, ParentPid) ->
    ParentPid ! {producer_state_change, self(), ProducerState},
    ok.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc