• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 773

29 Nov 2024 08:57PM UTC coverage: 71.844% (-2.0%) from 73.807%
773

push

github

web-flow
Merge pull request #68 from thalesmg/fix-lookup-redirect-mkII

fix(client): handle `Redirect` in `LookupTopicResponse`

189 of 261 new or added lines in 8 files covered. (72.41%)

29 existing lines in 3 files now uncovered.

939 of 1307 relevant lines covered (71.84%)

313.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.97
/src/pulsar_consumers.erl
1
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14
-module(pulsar_consumers).
15
-define(MAX_CONSUMER_ID, 65535).
16

17

18
%% APIs
19
-export([start_supervised/3, stop_supervised/1, start_link/3]).
20
-export([all_connected/1]).
21

22
%% gen_server callbacks
23
-export([ code_change/3
24
        , handle_call/3
25
        , handle_cast/2
26
        , handle_info/2
27
        , init/1
28
        , terminate/2
29
        , format_status/1
30
        , format_status/2
31
        ]).
32

33
-record(state, {topic,
34
                client_id,
35
                partitions,
36
                consumer_opts,
37
                consumer_id = 0,
38
                consumers = #{}}).
39

40
-type consumers() :: #{ client := atom()
41
                      , topic := binary()
42
                      , name := atom()
43
                      }.
44

45
-define(T_RETRY_START, 5000).
46
-define(GET_TOPIC_METADATA_TIMEOUT, 30_000).
47
-define(LOOKUP_TOPIC_TIMEOUT, 30_000).
48
-define(GET_ALIVE_PULSAR_URL_TIMEOUT, 5_000).
49

50
%% @doc Start supervised consumer.
51
start_supervised(ClientId, Topic, ConsumerOpts) ->
52
    {ok, _Pid} = pulsar_consumers_sup:ensure_present(ClientId, Topic, ConsumerOpts),
16✔
53
    {ok, #{client => ClientId, topic => Topic, name => get_name(ConsumerOpts)}}.
16✔
54

55
stop_supervised(#{client := ClientId, name := Name}) ->
56
    pulsar_consumers_sup:ensure_absence(ClientId, Name).
2✔
57

58
-spec all_connected(consumers()) -> boolean().
59
all_connected(#{name := Name}) ->
60
    try
2✔
61
      ConsumerToPartitionTopicMap = gen_server:call(Name, get_consumers, 5_000),
2✔
62
      NumConsumers = map_size(ConsumerToPartitionTopicMap),
2✔
63
      (NumConsumers =/= 0) andalso
2✔
64
          lists:all(
1✔
65
            fun(Pid) ->
66
                    connected =:= pulsar_consumer:get_state(Pid)
1✔
67
            end,
68
            maps:keys(ConsumerToPartitionTopicMap))
69
    catch
70
        _:_ ->
71
            false
×
72
    end.
73

74
%% @doc start pulsar_consumers gen_server
75
start_link(ClientId, Topic, ConsumerOpts) ->
76
    gen_server:start_link({local, get_name(ConsumerOpts)}, ?MODULE, [ClientId, Topic, ConsumerOpts], []).
14✔
77

78
init([ClientId, Topic, ConsumerOpts]) ->
79
    erlang:process_flag(trap_exit, true),
14✔
80
    {ok, #state{topic = Topic,
14✔
81
                client_id = ClientId,
82
                consumer_opts = ConsumerOpts}, 0}.
83

84
handle_call(get_consumers, _From, State = #state{consumers = Consumers}) ->
85
    {reply, Consumers, State};
2✔
86
handle_call(_Call, _From, State) ->
87
    {reply, {error, unknown_call}, State}.
×
88

89
handle_cast(_Cast, State) ->
90
    {noreply, State}.
×
91

92
handle_info(timeout, State = #state{client_id = ClientId, topic = Topic}) ->
93
    case pulsar_client_manager:get_topic_metadata(ClientId, Topic, ?GET_TOPIC_METADATA_TIMEOUT) of
14✔
94
        {ok, {_, Partitions}} ->
95
            PartitionTopics = create_partition_topic(Topic, Partitions),
14✔
96
            NewState = lists:foldl(
14✔
97
                fun(PartitionTopic, CurrentState) ->
98
                    start_consumer(ClientId, PartitionTopic, CurrentState)
14✔
99
                end,
100
                State, PartitionTopics),
101
            {noreply, NewState#state{partitions = length(PartitionTopics)}};
14✔
102
        {error, Reason} ->
NEW
103
            log_error("get topic metadata failed: ~p", [Reason]),
×
UNCOV
104
            {stop, {shutdown, Reason}, State}
×
105
    end;
106
handle_info({'EXIT', Pid, _Error}, State = #state{consumers = Consumers}) ->
107
    case maps:get(Pid, Consumers, undefined) of
6✔
108
        undefined ->
109
            log_error("Not find Pid:~p consumer", [Pid]),
×
110
            {noreply, State};
×
111
        PartitionTopic ->
112
            restart_consumer_later(PartitionTopic),
6✔
113
            {noreply, State#state{consumers = maps:remove(Pid, Consumers)}}
6✔
114
    end;
115
handle_info({restart_consumer, PartitionTopic}, State = #state{client_id = ClientId}) ->
116
    {noreply, start_consumer(ClientId, PartitionTopic, State)};
6✔
117
handle_info(_Info, State) ->
118
    log_error("Receive unknown message:~p~n", [_Info]),
×
119
    {noreply, State}.
×
120

121
code_change(_OldVsn, State, _Extra) ->
122
    {ok, State}.
×
123

124
terminate(_, _St) -> ok.
14✔
125

126
format_status(Status) ->
127
    maps:map(
×
128
      fun(state, State) ->
129
              censor_secrets(State);
×
130
         (_Key, Value)->
131
              Value
×
132
      end,
133
      Status).
134

135
%% `format_status/2' is deprecated as of OTP 25.0
136
format_status(_Opt, [_PDict, State0]) ->
137
    State = censor_secrets(State0),
×
138
    [{data, [{"State", State}]}].
×
139

140
censor_secrets(State0 = #state{consumer_opts = Opts0}) ->
141
    Opts1 = censor_conn_opts(Opts0),
×
142
    Opts =
×
143
        maps:map(
144
          fun(cb_init_args, CBInitArgs) ->
145
                  censor_conn_opts(CBInitArgs);
×
146
             (_Key, Val) ->
147
                  Val
×
148
          end,
149
          Opts1),
150
    State0#state{consumer_opts = Opts}.
×
151

152
censor_conn_opts(Opts0 = #{conn_opts := ConnOpts0 = #{auth_data := _}}) ->
153
    Opts0#{conn_opts := ConnOpts0#{auth_data := "******"}};
×
154
censor_conn_opts(Opts) ->
155
    Opts.
×
156

157
create_partition_topic(Topic, 0) ->
158
    [Topic];
14✔
159
create_partition_topic(Topic, Partitions) ->
160
    lists:map(fun(Partition) ->
×
161
        lists:concat([Topic, "-partition-", Partition])
×
162
    end,lists:seq(0, Partitions-1)).
163

164
get_name(ConsumerOpts) -> maps:get(name, ConsumerOpts, ?MODULE).
30✔
165

166
log_error(Fmt, Args) ->
167
    do_log(error, Fmt, Args).
1✔
168

169
do_log(Level, Fmt, Args) ->
170
    logger:log(Level, Fmt, Args, #{domain => [pulsar, consumers]}).
1✔
171

172
start_consumer(ClientId, PartitionTopic, #state{consumer_opts = ConsumerOpts} = State) ->
173
    try
20✔
174
        {ok, #{ brokerServiceUrl := BrokerServiceURL
20✔
175
              , proxy_through_service_url := IsProxy
176
              }} =
177
            pulsar_client_manager:lookup_topic(ClientId, PartitionTopic, ?LOOKUP_TOPIC_TIMEOUT),
178
        {MaxConsumerMum, ConsumerOpts1} = case maps:take(max_consumer_num, ConsumerOpts) of
19✔
179
            error -> {1, ConsumerOpts};
×
180
            Res -> Res
19✔
181
        end,
182
        lists:foldl(
19✔
183
            fun(_, #state{consumer_id = CurrentID, consumers = Consumers} = CurrentState) ->
184
                ConsumerOptsWithConsumerID = maps:put(consumer_id, CurrentID, ConsumerOpts1),
19✔
185
                {AlivePulsarURL, ProxyToBrokerURL} = case IsProxy of
19✔
186
                    false ->
187
                        {BrokerServiceURL, undefined};
×
188
                    true ->
189
                        {ok, URL} = pulsar_client_manager:get_alive_pulsar_url(
19✔
190
                                      ClientId, ?GET_ALIVE_PULSAR_URL_TIMEOUT),
191
                        {URL, BrokerServiceURL}
19✔
192
                end,
193
                {ok, Consumer} =
19✔
194
                    pulsar_consumer:start_link(PartitionTopic, AlivePulsarURL,
195
                        ProxyToBrokerURL, ConsumerOptsWithConsumerID),
196
                NewState = next_consumer_id(CurrentState),
19✔
197
                NewState#state{consumers = maps:put(Consumer, PartitionTopic, Consumers)}
19✔
198
            end,
199
            State, lists:seq(1, MaxConsumerMum))
200
    catch
201
        Error : Reason : Stacktrace ->
202
            log_error("Start consumer: ~p, ~p", [Error, {Reason, Stacktrace}]),
1✔
203
            restart_consumer_later(PartitionTopic),
1✔
204
            State
1✔
205
    end.
206

207
restart_consumer_later(PartitionTopic) ->
208
    erlang:send_after(?T_RETRY_START, self(), {restart_consumer, PartitionTopic}).
7✔
209

210
next_consumer_id(#state{consumer_id = ?MAX_CONSUMER_ID} = Stat) ->
211
    Stat#state{consumer_id = 0};
×
212
next_consumer_id(#state{consumer_id = ID} = Stat) ->
213
    Stat#state{consumer_id = ID + 1}.
19✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc