• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 833

28 Feb 2025 12:48PM UTC coverage: 72.601% (+0.3%) from 72.261%
833

push

github

web-flow
Merge pull request #75 from thalesmg/20250227-oom-drop

fix(producer): handle `drop_if_high_mem` option

74 of 94 new or added lines in 4 files covered. (78.72%)

4 existing lines in 2 files now uncovered.

991 of 1365 relevant lines covered (72.6%)

261.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.31
/src/pulsar_socket.erl
1
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_socket).
16

17
-export([ peername/2
18
        , connect/3
19
        , close/2
20
        , controlling_process/3
21
        ]).
22

23
-export([ send_connect_packet/2
24
        , send_lookup_topic_packet/5
25
        , send_topic_metadata_packet/4
26
        , send_subscribe_packet/7
27
        , send_set_flow_packet/4
28
        , send_ack_packet/5
29
        , encode_send_batch_message_packet/5
30
        , send_batch_message_packet/7
31
        , send_create_producer_packet/5
32
        , ping/2
33
        , pong/2
34
        , getstat/2
35
        , get_pulsar_uri/2
36
        ]).
37

38
%% exposed ONLY for mocking
39
-export([internal_getopts/3]).
40

41
%% Internal export, only for `pulsar_socket_writer'
42
-export([tcp_module/1]).
43

44
-define(SEND_TIMEOUT, 60000).
45
-define(CONN_TIMEOUT, 30000).
46

47
-define(INTERNAL_TCP_OPTS,
48
    [ binary
49
    , {packet, 4}
50
    , {active, true}
51
    , {reuseaddr, true}
52
    ]).
53

54
-define(DEF_TCP_OPTS,
55
    [ {nodelay, true}
56
    , {sndbuf, 1_000_000}
57
    , {recbuf, 1_000_000}
58
    , {send_timeout, ?SEND_TIMEOUT}
59
    , {send_timeout_close, true}
60
    ]).
61

62
send_connect_packet(Sock, Opts) ->
63
    Mod = tcp_module(Opts),
86✔
64
    ConnOpts = opt(conn_opts, Opts, #{}),
86✔
65
    Mod:send(Sock, pulsar_protocol_frame:connect(ConnOpts)).
86✔
66

67
send_topic_metadata_packet(Sock, Topic, RequestId, Opts) ->
68
    Mod = tcp_module(Opts),
34✔
69
    Metadata = topic_metadata_cmd(Topic, RequestId),
34✔
70
    Mod:send(Sock, pulsar_protocol_frame:topic_metadata(Metadata)).
34✔
71

72
send_lookup_topic_packet(Sock, Topic, RequestId, ReqOpts, Opts) ->
73
    Mod = tcp_module(Opts),
76✔
74
    LookupCmd = lookup_topic_cmd(Topic, RequestId, ReqOpts),
76✔
75
    Mod:send(Sock, pulsar_protocol_frame:lookup_topic(LookupCmd)).
76✔
76

77
send_subscribe_packet(Sock, Topic, RequestId, ConsumerId, Subscription, SubType, Opts) ->
78
    Mod = tcp_module(Opts),
24✔
79
    SubInfo = #{
24✔
80
        topic => Topic,
81
        subscription => Subscription,
82
        subType => SubType,
83
        consumer_id => ConsumerId,
84
        request_id => RequestId
85
    },
86
    Mod:send(Sock, pulsar_protocol_frame:create_subscribe(SubInfo)).
24✔
87

88
send_set_flow_packet(Sock, ConsumerId, FlowSize, Opts) ->
89
    Mod = tcp_module(Opts),
26✔
90
    FlowInfo = #{
26✔
91
        consumer_id => ConsumerId,
92
        messagePermits => FlowSize
93
    },
94
    Mod:send(Sock, pulsar_protocol_frame:set_flow(FlowInfo)).
26✔
95

96
send_ack_packet(Sock, ConsumerId, AckType, MsgIds, Opts) ->
97
    Mod = tcp_module(Opts),
638✔
98
    Ack = #{
638✔
99
        consumer_id => ConsumerId,
100
        ack_type => AckType,
101
        message_id => MsgIds
102
    },
103
    Mod:send(Sock, pulsar_protocol_frame:ack(Ack)).
638✔
104

105
send_batch_message_packet(Sock, Topic, Messages, SequenceId, ProducerId, ProducerName, Opts) ->
NEW
106
    {NumMessages, EncodedMsg} =
×
107
        encode_send_batch_message_packet(Messages, SequenceId, ProducerId,
108
                                         ProducerName, Opts),
UNCOV
109
    Mod = tcp_module(Opts),
×
NEW
110
    pulsar_metrics:send(Topic, NumMessages),
×
NEW
111
    Mod:send(Sock, EncodedMsg).
×
112

113
encode_send_batch_message_packet(Messages, SequenceId, ProducerId, ProducerName, Opts) ->
114
    Len = length(Messages),
799✔
115
    SendCmd = message_snd_cmd(Len, ProducerId, SequenceId),
799✔
116
    BatchMsg = batch_message_cmd(Messages, Opts),
799✔
117
    MsgMetadata = batch_message_cmd_metadata(ProducerName, SequenceId, Len),
799✔
118
    {Len, pulsar_protocol_frame:send(SendCmd, MsgMetadata, BatchMsg)}.
799✔
119

120
send_create_producer_packet(Sock, Topic, RequestId, ProducerId, Opts) ->
121
    Mod = tcp_module(Opts),
27✔
122
    Producer = #{
27✔
123
        topic => Topic,
124
        producer_id => ProducerId,
125
        request_id => RequestId
126
    },
127
    Mod:send(Sock, pulsar_protocol_frame:create_producer(Producer)).
27✔
128

129
ping(Sock, Opts) ->
130
    Mod = tcp_module(Opts),
12✔
131
    Mod:send(Sock, pulsar_protocol_frame:ping()).
12✔
132

133
pong(Sock, Opts) ->
134
    Mod = tcp_module(Opts),
15✔
135
    Mod:send(Sock, pulsar_protocol_frame:pong()).
15✔
136

137
getstat(Sock, Opts) ->
138
    InetM = inet_module(Opts),
179✔
139
    InetM:getstat(Sock).
179✔
140

141
peername(Sock, Opts) ->
142
    Mod = inet_module(Opts),
69✔
143
    Mod:peername(Sock).
69✔
144

145
connect(Host, Port, Opts) ->
146
    TcpMod = tcp_module(Opts),
1,249✔
147
    {ConnOpts, Timeout} = connect_opts(Opts),
1,249✔
148
    case TcpMod:connect(Host, Port, ConnOpts, Timeout) of
1,249✔
149
        {ok, Sock} ->
150
            case tune_buffer(inet_module(Opts), Sock) of
381✔
151
                ok ->
152
                    TcpMod:controlling_process(Sock, self()),
86✔
153
                    {ok, Sock};
86✔
154
                Error ->
155
                    Error
295✔
156
            end;
157
        {error, _} = Error ->
158
            Error
868✔
159
    end.
160

161
controlling_process(Sock, Pid, Opts) ->
162
    TcpMod = tcp_module(Opts),
31✔
163
    TcpMod:controlling_process(Sock, Pid),
31✔
164
    ok.
31✔
165

166
close(Sock, Opts) ->
167
    try
33✔
168
        TcpMod = tcp_module(Opts),
33✔
169
        TcpMod:close(Sock)
33✔
170
    catch
171
        _:_ ->
172
            ok
×
173
    end.
174

175
get_pulsar_uri(Sock, Opts) ->
176
    case peername(Sock, Opts) of
69✔
177
        {ok, {IP, Port}} ->
178
            {ok, pulsar_scheme(Opts) ++ "://" ++ inet:ntoa(IP) ++ ":" ++ integer_to_list(Port)};
69✔
179
        {error, _} = Error ->
180
            Error
×
181
    end.
182

183
connect_opts(Opts) ->
184
    TcpOpts = opt(tcp_opts, Opts, []),
1,249✔
185
    SslOpts = opt(ssl_opts, Opts, []),
1,249✔
186
    ConnTimeout = opt(connect_timeout, Opts, ?CONN_TIMEOUT),
1,249✔
187
    ConnOpts = case opt(enable_ssl, Opts, false) of
1,249✔
188
        true -> pulsar_utils:merge_opts([?DEF_TCP_OPTS, TcpOpts, SslOpts, ?INTERNAL_TCP_OPTS]);
×
189
        false -> pulsar_utils:merge_opts([?DEF_TCP_OPTS, TcpOpts, ?INTERNAL_TCP_OPTS])
1,249✔
190
    end,
191
    {ConnOpts, ConnTimeout}.
1,249✔
192

193
topic_metadata_cmd(Topic, RequestId) ->
194
    #{
34✔
195
        topic => Topic,
196
        request_id => RequestId
197
    }.
198

199
lookup_topic_cmd(Topic, RequestId, ReqOpts) ->
200
    Authoritative = maps:get(authoritative, ReqOpts, false),
76✔
201
    #{
76✔
202
        topic => Topic,
203
        request_id => RequestId,
204
        authoritative => Authoritative
205
    }.
206

207
message_snd_cmd(Len, ProducerId, SequenceId) when is_integer(Len) ->
208
    #{
799✔
209
        producer_id => ProducerId,
210
        sequence_id => SequenceId,
211
        num_messages => Len
212
    }.
213

214
batch_message_cmd(Messages, Opts) ->
215
    Compression = compression_type(opt(compression, Opts, no_compression)),
799✔
216
    lists:foldl(fun(#{key := Key, value := Msg}, Acc) ->
799✔
217
            Msg1 = maybe_compression(Msg, Compression),
2,275✔
218
            SMetadata = single_message_metadata(Key, erlang:iolist_size(Msg), Compression),
2,275✔
219
            SMetadataBin = pulsar_api:encode_msg(SMetadata, 'SingleMessageMetadata'),
2,275✔
220
            SMetadataBinSize = erlang:iolist_size(SMetadataBin),
2,275✔
221
            <<Acc/binary, SMetadataBinSize:32, SMetadataBin/binary, Msg1/binary>>
2,275✔
222
        end, <<>>, Messages).
223

224
single_message_metadata(undefined, Size, Compression) ->
225
    #{ payload_size => Size
2,275✔
226
     , compression => Compression
227
     };
228
single_message_metadata(Key, Size, Compression) ->
229
    (single_message_metadata(undefined, Size, Compression))#{partition_key => Key}.
2,275✔
230

231
batch_message_cmd_metadata(ProducerName, SequenceId, Len) ->
232
    #{
799✔
233
        producer_name => ProducerName,
234
        sequence_id => SequenceId,
235
        publish_time => erlang:system_time(millisecond),
236
        compression => 'NONE',
237
        num_messages_in_batch => Len
238
    }.
239

240
compression_type(snappy) ->'SNAPPY';
×
241
compression_type(zlib) ->'ZLIB';
×
242
compression_type(_) ->'NONE'.
799✔
243

244
maybe_compression(Bin, 'SNAPPY') ->
245
    {ok, Compressed} = snappyer:compress(Bin),
×
246
    Compressed;
×
247
maybe_compression(Bin, 'ZLIB') ->
248
    zlib:compress(Bin);
×
249
maybe_compression(Bin, _) ->
250
    Bin.
2,275✔
251

252
%%=======================================================================================
253
%% Helpers
254
%%=======================================================================================
255

256
pulsar_scheme(Opts) ->
257
    case opt(enable_ssl, Opts, false) of
69✔
258
        false -> "pulsar";
69✔
259
        true -> "pulsar+ssl"
×
260
    end.
261

262
tcp_module(Opts) ->
263
    case opt(enable_ssl, Opts, false) of
3,050✔
264
        false -> gen_tcp;
3,050✔
265
        true -> ssl
×
266
    end.
267

268
inet_module(Opts) ->
269
    case opt(enable_ssl, Opts, false) of
629✔
270
        false -> inet;
629✔
271
        true -> ssl
×
272
    end.
273

274
%% to allow mocking
275
internal_getopts(InetM, Sock, Opts) ->
276
    InetM:getopts(Sock, Opts).
86✔
277

278
tune_buffer(InetM, Sock) ->
279
    case ?MODULE:internal_getopts(InetM, Sock, [recbuf, sndbuf]) of
381✔
280
        {ok, Opts} ->
281
            RecBuf = proplists:get_value(recbuf, Opts),
86✔
282
            SndBuf = proplists:get_value(sndbuf, Opts),
86✔
283
            InetM:setopts(Sock, [{buffer, max(RecBuf, SndBuf)}]),
86✔
284
            ok;
86✔
285
        Error ->
286
            Error
295✔
287
    end.
288

289
opt(Key, Opts, Default) when is_list(Opts) ->
290
    opt(Key, maps:from_list(Opts), Default);
×
291
opt(Key, Opts, Default) ->
292
    maps:get(Key, Opts, Default).
9,629✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc