• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 861

24 Nov 2025 09:12PM UTC coverage: 73.517% (+1.0%) from 72.54%
861

push

github

web-flow
Merge pull request #79 from emqx/251124-publish-single-message-if-batch-size-is-1

251124 publish single message if batch size is 1

60 of 64 new or added lines in 8 files covered. (93.75%)

2 existing lines in 1 file now uncovered.

1041 of 1416 relevant lines covered (73.52%)

256.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.96
/src/pulsar_socket.erl
1
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_socket).
16

17
-export([ peername/2
18
        , connect/3
19
        , close/2
20
        , controlling_process/3
21
        ]).
22

23
-export([ send_connect_packet/2
24
        , send_lookup_topic_packet/5
25
        , send_topic_metadata_packet/4
26
        , send_subscribe_packet/7
27
        , send_set_flow_packet/4
28
        , send_ack_packet/5
29
        , encode_send_batch_message_packet/5
30
        , encode_send_single_message_packet/5
31
        , send_batch_message_packet/7
32
        , send_create_producer_packet/5
33
        , ping/2
34
        , pong/2
35
        , getstat/2
36
        , get_pulsar_uri/2
37
        ]).
38

39
%% exposed ONLY for mocking
40
-export([internal_getopts/3]).
41

42
%% Internal export, only for `pulsar_socket_writer'
43
-export([tcp_module/1]).
44

45
%% Value plugged into the `send_timeout' socket option in ?DEF_TCP_OPTS below
%% (presumably milliseconds, as for gen_tcp/inet socket options -- confirm).
-define(SEND_TIMEOUT, 60000).
46
%% Fallback for the `connect_timeout' option, used by connect_opts/1 when the
%% caller's options do not contain that key.
-define(CONN_TIMEOUT, 30000).
47

48
%% Socket options this module passes as the last element to
%% pulsar_utils:merge_opts/1 in connect_opts/1.
%% NOTE(review): being last presumably means they override user-supplied
%% `tcp_opts'/`ssl_opts' -- confirm against pulsar_utils:merge_opts/1.
-define(INTERNAL_TCP_OPTS,
49
    [ binary
50
    , {packet, 4}
51
    , {active, true}
52
    , {reuseaddr, true}
53
    ]).
54

55
%% Default socket options, passed as the first element to
%% pulsar_utils:merge_opts/1 in connect_opts/1, ahead of the caller's options.
-define(DEF_TCP_OPTS,
56
    [ {nodelay, true}
57
    , {sndbuf, 1_000_000}
58
    , {recbuf, 1_000_000}
59
    , {send_timeout, ?SEND_TIMEOUT}
60
    , {send_timeout_close, true}
61
    ]).
62

63
%% @doc Send the frame produced by `pulsar_protocol_frame:connect/1' over `Sock'.
%% The frame is built from the `conn_opts' entry of `Opts' (an empty map when
%% the key is absent). Returns whatever the transport module's `send/2' returns.
send_connect_packet(Sock, Opts) ->
    Transport = tcp_module(Opts),
    Frame = pulsar_protocol_frame:connect(opt(conn_opts, Opts, #{})),
    Transport:send(Sock, Frame).
67

68
%% @doc Send a topic-metadata request for `Topic', tagged with `RequestId'.
%% The command map comes from topic_metadata_cmd/2 and is framed by
%% `pulsar_protocol_frame:topic_metadata/1'. Returns the transport's `send/2' result.
send_topic_metadata_packet(Sock, Topic, RequestId, Opts) ->
    Transport = tcp_module(Opts),
    Command = topic_metadata_cmd(Topic, RequestId),
    Frame = pulsar_protocol_frame:topic_metadata(Command),
    Transport:send(Sock, Frame).
72

73
%% @doc Send a topic-lookup request for `Topic', tagged with `RequestId'.
%% `ReqOpts' is a map; its optional `authoritative' key is read by
%% lookup_topic_cmd/3. Returns the transport's `send/2' result.
send_lookup_topic_packet(Sock, Topic, RequestId, ReqOpts, Opts) ->
    Transport = tcp_module(Opts),
    Command = lookup_topic_cmd(Topic, RequestId, ReqOpts),
    Frame = pulsar_protocol_frame:lookup_topic(Command),
    Transport:send(Sock, Frame).
77

78
%% @doc Send a subscribe request built by `pulsar_protocol_frame:create_subscribe/1'.
%% The command map carries the topic, subscription name, subscription type,
%% consumer id and request id. Returns the transport's `send/2' result.
send_subscribe_packet(Sock, Topic, RequestId, ConsumerId, Subscription, SubType, Opts) ->
    Transport = tcp_module(Opts),
    Frame = pulsar_protocol_frame:create_subscribe(#{
        request_id => RequestId,
        consumer_id => ConsumerId,
        topic => Topic,
        subscription => Subscription,
        subType => SubType
    }),
    Transport:send(Sock, Frame).
88

89
%% @doc Send a flow command for `ConsumerId' with `FlowSize' as `messagePermits'.
%% The frame is built by `pulsar_protocol_frame:set_flow/1'. Returns the
%% transport's `send/2' result.
send_set_flow_packet(Sock, ConsumerId, FlowSize, Opts) ->
    Transport = tcp_module(Opts),
    Frame = pulsar_protocol_frame:set_flow(#{
        messagePermits => FlowSize,
        consumer_id => ConsumerId
    }),
    Transport:send(Sock, Frame).
96

97
%% @doc Send an acknowledgement for `MsgIds' on behalf of `ConsumerId'.
%% `AckType' and `MsgIds' are passed through unchanged as `ack_type' and
%% `message_id' to `pulsar_protocol_frame:ack/1'. Returns the transport's
%% `send/2' result.
send_ack_packet(Sock, ConsumerId, AckType, MsgIds, Opts) ->
    Transport = tcp_module(Opts),
    Frame = pulsar_protocol_frame:ack(#{
        message_id => MsgIds,
        ack_type => AckType,
        consumer_id => ConsumerId
    }),
    Transport:send(Sock, Frame).
105

106
%% @doc Encode `Messages' as one batch frame and send it over `Sock'.
%% The message count is reported through `pulsar_metrics:send/2' for `Topic'
%% before the frame is written, so the metric is recorded whether or not the
%% send succeeds. Returns the transport's `send/2' result.
send_batch_message_packet(Sock, Topic, Messages, SequenceId, ProducerId, ProducerName, Opts) ->
    {Count, Frame} = encode_send_batch_message_packet(
        Messages, SequenceId, ProducerId, ProducerName, Opts
    ),
    Transport = tcp_module(Opts),
    pulsar_metrics:send(Topic, Count),
    Transport:send(Sock, Frame).
113

114
%% @doc Encode a list of messages into a single batch send frame.
%% Returns `{Count, Frame}' where `Count' is `length(Messages)' and `Frame' is
%% the result of `pulsar_protocol_frame:send/3' applied to the send command,
%% the batch metadata and the concatenated batch payload.
encode_send_batch_message_packet(Messages, SequenceId, ProducerId, ProducerName, Opts) ->
    Count = length(Messages),
    Command = message_snd_cmd(Count, ProducerId, SequenceId),
    Payload = batch_message_cmd(Messages, Opts),
    Metadata = batch_message_cmd_metadata(ProducerName, SequenceId, Count),
    Frame = pulsar_protocol_frame:send(Command, Metadata, Payload),
    {Count, Frame}.
120

121
%% @doc Encode one message (a map with `key' and `value') as a non-batch send frame.
%% The value is passed through maybe_compression/2 according to the
%% `compression' option; the metadata receives the size of the uncompressed
%% value. Returns `{1, Frame}'.
encode_send_single_message_packet(Message, SequenceId, ProducerId, ProducerName, Opts) ->
    Command = message_snd_cmd(1, ProducerId, SequenceId),
    Codec = compression_type(opt(compression, Opts, no_compression)),
    #{key := Key, value := Value} = Message,
    Payload = maybe_compression(Value, Codec),
    Metadata = single_message_cmd_metadata(
        ProducerName, SequenceId, Key, erlang:iolist_size(Value), Codec
    ),
    {1, pulsar_protocol_frame:send(Command, Metadata, Payload)}.
128

129
%% @doc Send a create-producer request for `Topic'.
%% The command map carries the topic, producer id and request id and is framed
%% by `pulsar_protocol_frame:create_producer/1'. Returns the transport's
%% `send/2' result.
send_create_producer_packet(Sock, Topic, RequestId, ProducerId, Opts) ->
    Transport = tcp_module(Opts),
    Frame = pulsar_protocol_frame:create_producer(#{
        request_id => RequestId,
        producer_id => ProducerId,
        topic => Topic
    }),
    Transport:send(Sock, Frame).
137

138
%% @doc Send the frame returned by `pulsar_protocol_frame:ping/0' over `Sock'.
%% Returns the transport's `send/2' result.
ping(Sock, Opts) ->
    Transport = tcp_module(Opts),
    Frame = pulsar_protocol_frame:ping(),
    Transport:send(Sock, Frame).
141

142
%% @doc Send the frame returned by `pulsar_protocol_frame:pong/0' over `Sock'.
%% Returns the transport's `send/2' result.
pong(Sock, Opts) ->
    Transport = tcp_module(Opts),
    Frame = pulsar_protocol_frame:pong(),
    Transport:send(Sock, Frame).
145

146
%% @doc Fetch socket statistics for `Sock'.
%% Delegates to `getstat/1' of the module chosen by inet_module/1
%% (`inet' for plain TCP, `ssl' when `enable_ssl' is true).
getstat(Sock, Opts) ->
    (inet_module(Opts)):getstat(Sock).
149

150
%% @doc Return the remote address of `Sock'.
%% Delegates to `peername/1' of the module chosen by inet_module/1
%% (`inet' for plain TCP, `ssl' when `enable_ssl' is true).
peername(Sock, Opts) ->
    (inet_module(Opts)):peername(Sock).
153

154
%% @doc Open a connection to `Host':`Port' and tune its user-level buffer.
%%
%% The transport (`gen_tcp' or `ssl') and the socket options / connect timeout
%% are derived from `Opts' via tcp_module/1 and connect_opts/1.
%%
%% Returns `{ok, Sock}' on success. Returns the `{error, _}' tuple from the
%% transport's `connect/4' if the connection cannot be established, or the
%% non-`ok' result of tune_buffer/2 if reading the buffer sizes fails. In the
%% latter case the freshly opened socket is closed before returning, so a
%% failed call never leaves an open socket behind.
connect(Host, Port, Opts) ->
    TcpMod = tcp_module(Opts),
    {ConnOpts, Timeout} = connect_opts(Opts),
    case TcpMod:connect(Host, Port, ConnOpts, Timeout) of
        {ok, Sock} ->
            case tune_buffer(inet_module(Opts), Sock) of
                ok ->
                    TcpMod:controlling_process(Sock, self()),
                    {ok, Sock};
                Error ->
                    %% The socket is not handed to the caller on this path;
                    %% close it here instead of leaking it until the owning
                    %% process terminates.
                    _ = close(Sock, Opts),
                    Error
            end;
        {error, _} = Error ->
            Error
    end.
169

170
%% @doc Hand ownership of `Sock' to `Pid' using the transport selected by `Opts'.
%% Always returns `ok'; the result of the transport's `controlling_process/2'
%% is discarded.
%% NOTE(review): a failed transfer is therefore invisible to the caller --
%% confirm this is intended.
controlling_process(Sock, Pid, Opts) ->
    Transport = tcp_module(Opts),
    _ = Transport:controlling_process(Sock, Pid),
    ok.
174

175
%% @doc Best-effort close of `Sock'.
%% Returns the result of the transport's `close/1'. Any exception raised while
%% selecting the transport or closing the socket is swallowed and reported as `ok'.
close(Sock, Opts) ->
    try (tcp_module(Opts)):close(Sock) of
        Result -> Result
    catch
        _Class:_Reason -> ok
    end.
183

184
%% @doc Build the URI of the peer `Sock' is connected to.
%%
%% Returns `{ok, Uri}' where `Uri' is the string `Scheme://Host:Port'. The
%% scheme comes from pulsar_scheme/1, and host and port from peername/2.
%% Returns the `{error, _}' tuple from peername/2 unchanged when the peer
%% address cannot be obtained.
%%
%% IPv6 peer addresses are wrapped in square brackets (`pulsar://[::1]:6650'),
%% because an unbracketed IPv6 literal cannot be told apart from the port
%% separator in a URI authority (RFC 3986, section 3.2.2).
get_pulsar_uri(Sock, Opts) ->
    case peername(Sock, Opts) of
        {ok, {IP, Port}} ->
            {ok, pulsar_scheme(Opts) ++ "://" ++ pulsar_uri_host(IP) ++ ":" ++ integer_to_list(Port)};
        {error, _} = Error ->
            Error
    end.

%% Render an IP address tuple as the host part of a URI; IPv6 addresses
%% (8-tuples) are bracketed, IPv4 addresses are rendered as-is.
pulsar_uri_host(IP) when tuple_size(IP) =:= 8 ->
    "[" ++ inet:ntoa(IP) ++ "]";
pulsar_uri_host(IP) ->
    inet:ntoa(IP).
191

192
%% @doc Compute the socket options and connect timeout for connect/3.
%% Returns `{ConnOpts, Timeout}'. `ConnOpts' is the result of
%% `pulsar_utils:merge_opts/1' over, in order: ?DEF_TCP_OPTS, the caller's
%% `tcp_opts', the caller's `ssl_opts' (only when `enable_ssl' is true) and
%% ?INTERNAL_TCP_OPTS. `Timeout' is `connect_timeout' or ?CONN_TIMEOUT.
connect_opts(Opts) ->
    UserTcpOpts = opt(tcp_opts, Opts, []),
    UserSslOpts = opt(ssl_opts, Opts, []),
    Timeout = opt(connect_timeout, Opts, ?CONN_TIMEOUT),
    Merged =
        case opt(enable_ssl, Opts, false) of
            true ->
                pulsar_utils:merge_opts(
                    [?DEF_TCP_OPTS, UserTcpOpts, UserSslOpts, ?INTERNAL_TCP_OPTS]
                );
            false ->
                pulsar_utils:merge_opts(
                    [?DEF_TCP_OPTS, UserTcpOpts, ?INTERNAL_TCP_OPTS]
                )
        end,
    {Merged, Timeout}.
201

202
%% @doc Build the command map for a topic-metadata request:
%% `#{topic => Topic, request_id => RequestId}'.
topic_metadata_cmd(Topic, RequestId) ->
    Command = #{request_id => RequestId, topic => Topic},
    Command.
207

208
%% @doc Build the command map for a topic-lookup request.
%% `authoritative' is taken from the `ReqOpts' map and defaults to `false'.
lookup_topic_cmd(Topic, RequestId, ReqOpts) ->
    #{
        authoritative => maps:get(authoritative, ReqOpts, false),
        request_id => RequestId,
        topic => Topic
    }.
215

216
%% @doc Build the send command map for `Len' messages.
%% `Len' must be an integer; any other value raises `function_clause'.
message_snd_cmd(Len, ProducerId, SequenceId) when is_integer(Len) ->
    #{
        num_messages => Len,
        sequence_id => SequenceId,
        producer_id => ProducerId
    }.
222

223
%% @doc Concatenate `Messages' into one batch payload binary.
%% For every message map (`key', `value') the output gains, in order: the
%% 32-bit size of the encoded `SingleMessageMetadata', the encoded metadata
%% itself, and the value after maybe_compression/2.
%% NOTE(review): `payload_size' in the metadata is the size of the
%% uncompressed value, while the appended payload is the output of
%% maybe_compression/2 -- confirm consumers expect this when compression is on.
batch_message_cmd(Messages, Opts) ->
    Codec = compression_type(opt(compression, Opts, no_compression)),
    append_batch_entries(Messages, Codec, <<>>).

%% Walk the message list left to right, appending each entry to `Buffer'.
append_batch_entries([], _Codec, Buffer) ->
    Buffer;
append_batch_entries([#{key := Key, value := Value} | Rest], Codec, Buffer) ->
    Payload = maybe_compression(Value, Codec),
    Meta = pulsar_api:encode_msg(
        single_message_metadata(Key, erlang:iolist_size(Value), Codec),
        'SingleMessageMetadata'
    ),
    MetaSize = erlang:iolist_size(Meta),
    append_batch_entries(
        Rest,
        Codec,
        <<Buffer/binary, MetaSize:32, Meta/binary, Payload/binary>>
    ).
232

233
%% @doc Build the per-message metadata map used inside a batch.
%% Always contains `payload_size' and `compression'; `partition_key' is added
%% only when `Key' is not `undefined'.
single_message_metadata(Key, Size, Compression) ->
    Base = #{compression => Compression, payload_size => Size},
    case Key of
        undefined -> Base;
        _ -> Base#{partition_key => Key}
    end.
239

240
%% @doc Build the message metadata map for a batch of `Len' messages.
%% `publish_time' is the current system time in milliseconds and `compression'
%% is always `NONE' here, whatever compression option was used for the payload.
batch_message_cmd_metadata(ProducerName, SequenceId, Len) ->
    Now = erlang:system_time(millisecond),
    #{
        num_messages_in_batch => Len,
        compression => 'NONE',
        publish_time => Now,
        sequence_id => SequenceId,
        producer_name => ProducerName
    }.
248

249
%% @doc Build the message metadata map for a single (non-batch) message.
%% Always contains `producer_name', `sequence_id', `publish_time' (current
%% system time in milliseconds) and `compression'. `uncompressed_size' is added
%% unless `Compression' is `NONE'; `partition_key' is added unless `Key' is
%% `undefined'.
single_message_cmd_metadata(ProducerName, SequenceId, Key, Size, Compression) ->
    Required = #{
        compression => Compression,
        publish_time => erlang:system_time(millisecond),
        sequence_id => SequenceId,
        producer_name => ProducerName
    },
    Sized =
        case Compression of
            'NONE' -> Required;
            _ -> Required#{uncompressed_size => Size}
        end,
    case Key of
        undefined -> Sized;
        _ -> Sized#{partition_key => Key}
    end.
264

265
%% @doc Translate the `compression' option into the atom used in message metadata.
%% `snappy' and `zlib' map to `SNAPPY' and `ZLIB'; every other value maps to `NONE'.
compression_type(Name) ->
    case Name of
        snappy -> 'SNAPPY';
        zlib -> 'ZLIB';
        _ -> 'NONE'
    end.
268

269
%% @doc Compress `Bin' with the given codec.
%% `SNAPPY' uses `snappyer:compress/1' (crashing with `badmatch' if it does not
%% return `{ok, _}'), `ZLIB' uses `zlib:compress/1', and any other codec
%% returns `Bin' untouched.
maybe_compression(Bin, Codec) ->
    case Codec of
        'SNAPPY' ->
            {ok, Compressed} = snappyer:compress(Bin),
            Compressed;
        'ZLIB' ->
            zlib:compress(Bin);
        _ ->
            Bin
    end.
276

277
%%=======================================================================================
278
%% Helpers
279
%%=======================================================================================
280

281
%% @doc URI scheme for the connection: `"pulsar+ssl"' when `enable_ssl' is
%% true, `"pulsar"' when it is false (the default).
pulsar_scheme(Opts) ->
    SslEnabled = opt(enable_ssl, Opts, false),
    case SslEnabled of
        true -> "pulsar+ssl";
        false -> "pulsar"
    end.
286

287
%% @doc Module used for connect/send/close on the socket: `ssl' when
%% `enable_ssl' is true, `gen_tcp' when it is false (the default).
tcp_module(Opts) ->
    SslEnabled = opt(enable_ssl, Opts, false),
    case SslEnabled of
        true -> ssl;
        false -> gen_tcp
    end.
292

293
%% @doc Module used for socket introspection (getopts/setopts/peername/getstat):
%% `ssl' when `enable_ssl' is true, `inet' when it is false (the default).
inet_module(Opts) ->
    SslEnabled = opt(enable_ssl, Opts, false),
    case SslEnabled of
        true -> ssl;
        false -> inet
    end.
298

299
%% @doc Thin wrapper around `InetM:getopts/2'.
%% Exported so that tests can mock it; tune_buffer/2 calls it through `?MODULE'.
internal_getopts(InetM, Sock, Opts) ->
    Result = InetM:getopts(Sock, Opts),
    Result.
302

303
%% @doc Set the socket's `buffer' option to the larger of its `recbuf' and `sndbuf'.
%% Returns `ok' once the sizes have been read (the result of `setopts/2' is
%% discarded), or the non-`{ok, _}' result of internal_getopts/3 unchanged.
tune_buffer(InetM, Sock) ->
    case ?MODULE:internal_getopts(InetM, Sock, [recbuf, sndbuf]) of
        {ok, BufOpts} ->
            Largest = max(
                proplists:get_value(recbuf, BufOpts),
                proplists:get_value(sndbuf, BufOpts)
            ),
            _ = InetM:setopts(Sock, [{buffer, Largest}]),
            ok;
        Failure ->
            Failure
    end.
313

314
%% @doc Look up `Key' in `Opts', returning `Default' when it is absent.
%% `Opts' may be a map or a list of `{Key, Value}' pairs; a list is converted
%% with `maps:from_list/1' first, so the last occurrence of a duplicated key wins.
opt(Key, Opts, Default) ->
    Lookup =
        case is_list(Opts) of
            true -> maps:from_list(Opts);
            false -> Opts
        end,
    maps:get(Key, Lookup, Default).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc