• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 833

28 Feb 2025 12:48PM UTC coverage: 72.601% (+0.3%) from 72.261%
833

push

github

web-flow
Merge pull request #75 from thalesmg/20250227-oom-drop

fix(producer): handle `drop_if_high_mem` option

74 of 94 new or added lines in 4 files covered. (78.72%)

4 existing lines in 2 files now uncovered.

991 of 1365 relevant lines covered (72.6%)

261.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.09
/src/pulsar_socket_writer.erl
1
%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_socket_writer).
16

17
-behaviour(gen_server).
18

19
%% API
20
-export([
21
    start_link/4,
22
    stop/1,
23

24
    send_batch_async/7,
25
    ping_async/1,
26
    pong_async/1
27
]).
28

29
%% `gen_server' API
30
-export([
31
    init/1,
32
    handle_call/3,
33
    handle_cast/2,
34
    handle_info/2
35
]).
36

37
%%------------------------------------------------------------------------------
38
%% Type declarations
39
%%------------------------------------------------------------------------------
40

41
%% Calls/Casts/Infos
42
-record(send_batch, {topic, encoded_packet, num_messages}).
43
-record(ping, {}).
44
-record(pong, {}).
45

46
%%------------------------------------------------------------------------------
47
%% API
48
%%------------------------------------------------------------------------------
49

50
start_link(PartitionTopic, Host, Port, Opts) ->
51
    case pulsar_socket:connect(Host, Port, Opts) of
27✔
52
        {ok, Sock} ->
53
            Params = #{partition_topic => PartitionTopic, sock => Sock, opts => Opts},
27✔
54
            case gen_server:start_link(?MODULE, Params, []) of
27✔
55
                {ok, SockPid} ->
56
                    {ok, {SockPid, Sock}};
27✔
57
                Error ->
NEW
58
                    pulsar_socket:close(Sock, Opts),
×
NEW
59
                    Error
×
60
            end;
61
        Error ->
NEW
62
            Error
×
63
    end.
64

65
stop(SockPid) when is_pid(SockPid) ->
66
    link(SockPid),
8✔
67
    exit(SockPid, normal),
8✔
68
    receive
8✔
69
        {'EXIT', SockPid, _} ->
70
            ok
1✔
71
    after 5_000 ->
72
            exit(SockPid, kill),
6✔
73
            receive
6✔
74
                {'EXIT', SockPid, _} ->
75
                    ok
6✔
76
            end
77
    end.
78

79
send_batch_async(SockPid, Topic, Messages, SequenceId, ProducerId, ProducerName, Opts) ->
80
    {NumMessages, EncodedPacket0} =
799✔
81
        pulsar_socket:encode_send_batch_message_packet(Messages, SequenceId,
82
                                                       ProducerId, ProducerName, Opts),
83
    EncodedPacket = iolist_to_binary(EncodedPacket0),
799✔
84
    Req = #send_batch{topic = Topic, encoded_packet = EncodedPacket, num_messages = NumMessages},
799✔
85
    safe_cast(SockPid, Req).
799✔
86

87
ping_async(SockPid) ->
88
    safe_cast(SockPid, #ping{}).
6✔
89

90
pong_async(SockPid) ->
91
    safe_cast(SockPid, #pong{}).
5✔
92

93
%%------------------------------------------------------------------------------
94
%% `gen_server' API
95
%%------------------------------------------------------------------------------
96

97
init(#{sock := Sock, partition_topic := PartitionTopic, opts := Opts}) ->
98
    logger:set_process_metadata(#{domain => [pulsar, socket]}),
27✔
99
    pulsar_utils:set_label({?MODULE, PartitionTopic}),
27✔
100
    State = #{sock => Sock, opts => Opts},
27✔
101
    {ok, State}.
27✔
102

103
handle_call(Call, _From, State) ->
NEW
104
    {reply, {error, {unknown_call, Call}}, State}.
×
105

106
handle_cast(#ping{}, State) ->
107
    #{sock := Sock, opts := Opts} = State,
6✔
108
    Res = pulsar_socket:ping(Sock, Opts),
6✔
109
    ok_or_die(Res, State);
6✔
110
handle_cast(#pong{}, State) ->
111
    #{sock := Sock, opts := Opts} = State,
5✔
112
    Res = pulsar_socket:pong(Sock, Opts),
5✔
113
    ok_or_die(Res, State);
5✔
114
handle_cast(#send_batch{} = Req, State) ->
115
    #{sock := Sock, opts := Opts} = State,
799✔
116
    #send_batch{ topic = Topic
799✔
117
               , num_messages = NumMessages
118
               , encoded_packet = EncodedPacket} = Req,
119
    Mod = pulsar_socket:tcp_module(Opts),
799✔
120
    pulsar_metrics:send(Topic, NumMessages),
799✔
121
    Res = Mod:send(Sock, EncodedPacket),
799✔
122
    ok_or_die(Res, State);
799✔
123
handle_cast(_Cast, State) ->
NEW
124
    {noreply, State}.
×
125

126
handle_info(_Cast, State) ->
NEW
127
    {noreply, State}.
×
128

129
%%------------------------------------------------------------------------------
130
%% Internal fns
131
%%------------------------------------------------------------------------------
132

133
safe_cast(SockPid, Req) ->
134
    try
810✔
135
        gen_server:cast(SockPid, Req)
810✔
136
    catch
137
        exit:{noproc, _} ->
138
            %% Owner will notice process is down later
NEW
139
            ok
×
140
    end.
141

142
%% Only for casts/infos
143
ok_or_die(ok, State) ->
144
    {noreply, State};
809✔
145
ok_or_die(Error, State) ->
146
    {stop, Error, State}.
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc