• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 858

24 Nov 2025 04:28PM UTC coverage: 73.517%. First build
858

Pull #79

github

zmstone
ci: fix python venv
Pull Request #79: 251124 publish single message if batch size is 1

60 of 64 new or added lines in 8 files covered. (93.75%)

1041 of 1416 relevant lines covered (73.52%)

254.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.04
/src/pulsar_protocol_frame.erl
1
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_protocol_frame).
16

17
-define(CONNECT, 'CONNECT').
18
-define(CONNECTED, 'CONNECTED').
19
-define(PARTITIONED_METADATA, 'PARTITIONED_METADATA').
20
-define(PARTITIONED_METADATA_RESPONSE, 'PARTITIONED_METADATA_RESPONSE').
21
-define(LOOKUP, 'LOOKUP').
22
-define(LOOKUP_RESPONSE, 'LOOKUP_RESPONSE').
23
-define(PRODUCER, 'PRODUCER').
24
-define(SUBSCRIBE, 'SUBSCRIBE').
25
-define(SUCCESS, 'SUCCESS').
26
-define(FLOW, 'FLOW').
27
-define(MESSAGE, 'MESSAGE').
28
-define(ACK, 'ACK').
29
-define(PRODUCER_SUCCESS, 'PRODUCER_SUCCESS').
30
-define(SEND, 'SEND').
31
-define(SEND_RECEIPT, 'SEND_RECEIPT').
32
-define(PING, 'PING').
33
-define(PONG, 'PONG').
34
-define(CLOSE_PRODUCER, 'CLOSE_PRODUCER').
35
-define(CLOSE_CONSUMER, 'CLOSE_CONSUMER').
36
-define(MAGIC_NUMBER, 3585).
37

38
%% Use protocol version 10 to work with pulsar proxy
39
-define(PROTO_VSN, 10).
40
-define(CLIENT_VSN, "Pulsar-Client-Erlang-v0.7").
41

42
-export ([ connect/0
43
         , connect/1
44
         , topic_metadata/1
45
         , lookup_topic/1
46
         , create_producer/1
47
         , create_subscribe/1
48
         , set_flow/1
49
         , ack/1
50
         , ping/0
51
         , pong/0
52
         , serialized_simple_command/1
53
         , serialized_payload_command/3
54
         , parse/1
55
         , send/3
56
         ]).
57

58
%% Build a CONNECT command using only the default connect fields
%% (no caller-supplied overrides).
connect() ->
    NoOverrides = #{},
    connect(NoOverrides).
×
60

61
%% Build a serialized CONNECT command.
%% Overrides is merged on top of default_connect_fields(), so any key
%% present in Overrides replaces the default value for that key.
connect(Overrides) ->
    ConnectFields = maps:merge(default_connect_fields(), Overrides),
    Command = #{type => ?CONNECT, connect => ConnectFields},
    serialized_simple_command(Command).
66

67
%% Build a serialized PARTITIONED_METADATA command carrying the given
%% partitionMetadata fields.
topic_metadata(Fields) ->
    Command = #{type => ?PARTITIONED_METADATA, partitionMetadata => Fields},
    serialized_simple_command(Command).
72

73
%% Build a serialized LOOKUP command carrying the given lookupTopic fields.
lookup_topic(Fields) ->
    Command = #{type => ?LOOKUP, lookupTopic => Fields},
    serialized_simple_command(Command).
78

79
%% Build a serialized PRODUCER command carrying the given producer fields.
create_producer(Fields) ->
    Command = #{type => ?PRODUCER, producer => Fields},
    serialized_simple_command(Command).
84

85
%% Build a serialized SUBSCRIBE command carrying the given subscribe fields.
create_subscribe(Fields) ->
    Command = #{type => ?SUBSCRIBE, subscribe => Fields},
    serialized_simple_command(Command).
90

91
%% Build a serialized FLOW command carrying the given flow fields.
set_flow(Fields) ->
    Command = #{type => ?FLOW, flow => Fields},
    serialized_simple_command(Command).
96

97
%% Build a serialized ACK command carrying the given ack fields.
ack(Fields) ->
    Command = #{type => ?ACK, ack => Fields},
    serialized_simple_command(Command).
102

103
%% Build a serialized SEND command with an attached payload.
%% Metadata is encoded as a 'MessageMetadata' message before being handed,
%% together with BatchPayload, to serialized_payload_command/3.
send(SendFields, MessageMetadata, Body) ->
    Command = #{type => ?SEND, send => SendFields},
    MetadataBin = pulsar_api:encode_msg(MessageMetadata, 'MessageMetadata'),
    serialized_payload_command(Command, MetadataBin, Body).
108

109
%% Build a serialized PING command with an empty ping body.
ping() ->
    Command = #{type => ?PING, ping => #{}},
    serialized_simple_command(Command).
114

115
%% Build a serialized PONG command with an empty pong body.
pong() ->
    Command = #{type => ?PONG, pong => #{}},
    serialized_simple_command(Command).
120

121
%% Default fields placed in every CONNECT command: the client version
%% string and the protocol version defined at the top of this module.
default_connect_fields() ->
    #{
        client_version => ?CLIENT_VSN,
        protocol_version => ?PROTO_VSN
    }.
125

126
%% Decode one command frame.
%%
%% The input must start with a 32-bit command size followed by that many
%% bytes of encoded 'BaseCommand'; whatever follows is only inspected for
%% MESSAGE commands. The result is a tagged tuple chosen by the command
%% type, or the atom `unknown' (after logging) for any unhandled type.
parse(Frame) ->
    <<CmdLen:32, CmdBytes:CmdLen/binary, Trailer/binary>> = Frame,
    Decoded = try_decode(CmdLen, CmdBytes),
    parse_command(maps:get(type, Decoded, unknown), Decoded, Trailer).

%% Dispatch on the command type and pull the matching field out of the
%% decoded command.
parse_command(?MESSAGE, Decoded, Trailer) ->
    <<MetaLen:32, MetaBytes:MetaLen/binary, Body/binary>> = Trailer,
    MsgMeta = pulsar_api:decode_msg(<<MetaLen:32, MetaBytes/binary>>, 'MessageMetadata'),
    Payloads = message_payloads(MsgMeta, Body),
    {message, maps:get(message, Decoded), Payloads};
parse_command(?CONNECTED, Decoded, _Trailer) ->
    {connected, maps:get(connected, Decoded)};
parse_command(?PARTITIONED_METADATA_RESPONSE, Decoded, _Trailer) ->
    {partitionMetadataResponse, maps:get(partitionMetadataResponse, Decoded)};
parse_command(?LOOKUP_RESPONSE, Decoded, _Trailer) ->
    {lookupTopicResponse, maps:get(lookupTopicResponse, Decoded)};
parse_command(?PRODUCER_SUCCESS, Decoded, _Trailer) ->
    {producer_success, maps:get(producer_success, Decoded)};
parse_command(?SEND_RECEIPT, Decoded, _Trailer) ->
    {send_receipt, maps:get(send_receipt, Decoded)};
parse_command(?PING, Decoded, _Trailer) ->
    {ping, maps:get(ping, Decoded)};
parse_command(?PONG, Decoded, _Trailer) ->
    {pong, maps:get(pong, Decoded)};
parse_command(?CLOSE_PRODUCER, Decoded, _Trailer) ->
    {close_producer, maps:get(close_producer, Decoded)};
parse_command(?CLOSE_CONSUMER, Decoded, _Trailer) ->
    {close_consumer, maps:get(close_consumer, Decoded)};
parse_command(?SUCCESS, Decoded, _Trailer) ->
    {subscribe_success, maps:get(success, Decoded)};
parse_command('ERROR', Decoded, _Trailer) ->
    {error, maps:get(error, Decoded)};
parse_command(_Other, Decoded, _Trailer) ->
    logger:error("parse unknown type:~p~n", [Decoded]),
    unknown.

%% Split the body of a MESSAGE frame into a list of payloads.
%% When num_messages_in_batch is absent from the metadata the body is a
%% true single message (no SingleMessageMetadata headers) and is returned
%% as-is. When the key is present (even if its value is 1) the body is in
%% batch format and is parsed entry by entry.
message_payloads(MsgMeta, Body) ->
    case maps:find(num_messages_in_batch, MsgMeta) of
        {ok, Count} ->
            parse_batch_message(Body, Count);
        error ->
            [Body]
    end.
171

172
%% Encode a command map as a 'BaseCommand' message and return it as an
%% iolist: a 32-bit size prefix followed by the encoded command.
serialized_simple_command(BaseCommand) ->
    BaseCommandBin = pulsar_api:encode_msg(BaseCommand, 'BaseCommand'),
    %% byte_size/1 is the binary-specific BIF; size/1 also accepts tuples.
    Size = byte_size(BaseCommandBin),
    [<<Size:32>>, BaseCommandBin].
950✔
176

177
%% Encode a command map together with already-encoded metadata and a
%% payload, returning an iolist made of:
%%   1. the 32-bit size of the encoded 'BaseCommand',
%%   2. the encoded 'BaseCommand',
%%   3. the 16-bit magic number followed by a 32-bit checksum,
%%   4. the checksummed section: 32-bit metadata size, metadata, payload.
%% Metadata and BatchPayload must both be binaries.
serialized_payload_command(BaseCommand, Metadata, BatchPayload) ->
    BaseCommandBin = pulsar_api:encode_msg(BaseCommand, 'BaseCommand'),
    %% byte_size/1 is the binary-specific BIF; size/1 also accepts tuples.
    BaseCommandSize = byte_size(BaseCommandBin),
    MetadataSize = byte_size(Metadata),
    %% Build the checksummed section as one binary so that crc32cer does
    %% not need to copy it again.
    Payload = <<MetadataSize:32, Metadata/binary, BatchPayload/binary>>,
    Checksum = crc32cer:nif(Payload),
    [<<BaseCommandSize:32>>, BaseCommandBin, <<?MAGIC_NUMBER:16, Checksum:32>>, Payload].
817✔
185

186
%% Split a batch-format body into its individual payloads.
%% Count is the number of entries to read; the payloads are returned in
%% the order they appear in the body.
parse_batch_message(Body, Count) ->
    parse_batch_message(Body, Count, []).

%% Each entry is a 32-bit header size, a 'SingleMessageMetadata' header of
%% that size, and then payload_size bytes of payload (payload_size is read
%% from the decoded header). Any bytes left after the last entry are ignored.
parse_batch_message(_Unparsed, 0, Collected) ->
    lists:reverse(Collected);
parse_batch_message(Unparsed, Remaining, Collected) ->
    <<HeaderLen:32, Header:HeaderLen/binary, AfterHeader/binary>> = Unparsed,
    EntryMeta = pulsar_api:decode_msg(<<HeaderLen:32, Header/binary>>, 'SingleMessageMetadata'),
    EntryLen = maps:get(payload_size, EntryMeta),
    <<Entry:EntryLen/binary, Next/binary>> = AfterHeader,
    parse_batch_message(Next, Remaining - 1, [Entry | Collected]).
2,114✔
196

197
%% Decode a 'BaseCommand'.
%% The first attempt passes the command with its 32-bit size prefix
%% re-attached; if that raises for any reason, the bare command bytes are
%% decoded instead (an error from this second attempt propagates).
try_decode(Len, Bytes) ->
    try
        pulsar_api:decode_msg(<<Len:32, Bytes/binary>>, 'BaseCommand')
    catch
        _:_ ->
            pulsar_api:decode_msg(Bytes, 'BaseCommand')
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc