• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / pulsar-client-erl / 836

28 Feb 2025 01:14PM UTC coverage: 72.222% (-0.4%) from 72.601%
836

push

github

web-flow
Merge pull request #76 from thalesmg/20250228-fix-down-exit

fix: don't link with socket writer when stopping it

3 of 4 new or added lines in 1 file covered. (75.0%)

6 existing lines in 2 files now uncovered.

988 of 1368 relevant lines covered (72.22%)

261.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.98
/src/pulsar_socket_writer.erl
1
%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(pulsar_socket_writer).
16

17
-behaviour(gen_server).
18

19
%% API
20
-export([
21
    start_link/4,
22
    stop/1,
23

24
    send_batch_async/7,
25
    ping_async/1,
26
    pong_async/1
27
]).
28

29
%% `gen_server' API
30
-export([
31
    init/1,
32
    handle_call/3,
33
    handle_cast/2,
34
    handle_info/2
35
]).
36

37
%%------------------------------------------------------------------------------
38
%% Type declarations
39
%%------------------------------------------------------------------------------
40

41
%% Calls/Casts/Infos
42
%% Cast request asking the writer to send one already-encoded batch packet:
%% `topic' and `num_messages' are passed to `pulsar_metrics:send/2', and
%% `encoded_packet' is the payload written to the socket.
-record(send_batch, {topic, encoded_packet, num_messages}).
43
%% Cast request asking the writer to call `pulsar_socket:ping/2' on its socket.
-record(ping, {}).
44
%% Cast request asking the writer to call `pulsar_socket:pong/2' on its socket.
-record(pong, {}).
45

46
%%------------------------------------------------------------------------------
47
%% API
48
%%------------------------------------------------------------------------------
49

50
%% @doc Connects to `Host':`Port' and starts a writer process linked to the
%% caller that owns the resulting socket.
%%
%% Returns `{ok, {SockPid, Sock}}' on success.  If the connection attempt
%% does not return `{ok, Sock}', its result is returned unchanged.  If the
%% gen_server fails to start, the freshly opened socket is closed and the
%% start result is returned unchanged.
start_link(PartitionTopic, Host, Port, Opts) ->
    case pulsar_socket:connect(Host, Port, Opts) of
        {ok, Sock} ->
            start_writer(PartitionTopic, Sock, Opts);
        ConnectError ->
            ConnectError
    end.

%% Starts the gen_server around an already connected socket, closing the
%% socket again when the server cannot be started.
start_writer(PartitionTopic, Sock, Opts) ->
    InitArgs = #{partition_topic => PartitionTopic, sock => Sock, opts => Opts},
    case gen_server:start_link(?MODULE, InitArgs, []) of
        {ok, SockPid} ->
            {ok, {SockPid, Sock}};
        StartError ->
            pulsar_socket:close(Sock, Opts),
            StartError
    end.
64

65
%% @doc Stops the socket writer process `SockPid' and waits until it is down.
%%
%% The caller only monitors the writer (no link is created here), asks it to
%% terminate, and blocks until the matching `DOWN' message arrives.  If the
%% writer is still alive after 5 seconds it is killed.  Finally, a pending
%% `EXIT' message from the writer is removed from the caller's mailbox, which
%% is only present when the caller is linked to the writer and traps exits.
%% Always returns `ok'.
stop(SockPid) when is_pid(SockPid) ->
    MRef = monitor(process, SockPid),
    %% An exit signal with reason `normal' is ignored by a process that does
    %% not trap exits, and this module's `init/1' does not enable
    %% `trap_exit'.  Sending `normal' therefore never stopped the writer and
    %% every call ran into the 5 second timeout below; `shutdown' terminates
    %% it right away.
    exit(SockPid, shutdown),
    receive
        {'DOWN', MRef, process, SockPid, _} ->
            ok
    after 5_000 ->
            %% Last resort: `kill' cannot be trapped or ignored.
            exit(SockPid, kill),
            receive
                {'DOWN', MRef, process, SockPid, _} ->
                    ok
            end
    end,
    %% clean exit signal, if linked to it
    receive
        {'EXIT', SockPid, _} ->
            ok
    after 1 ->
            ok
    end.
85

86
%% @doc Encodes `Messages' into a single binary packet in the calling process
%% and casts it to the writer `SockPid' for sending.
%%
%% The number of messages reported by the encoder travels with the request.
%% Returns the result of the cast.
send_batch_async(SockPid, Topic, Messages, SequenceId, ProducerId, ProducerName, Opts) ->
    {Count, PacketIoData} =
        pulsar_socket:encode_send_batch_message_packet(
            Messages, SequenceId, ProducerId, ProducerName, Opts
        ),
    Packet = iolist_to_binary(PacketIoData),
    safe_cast(SockPid, #send_batch{
        topic = Topic,
        encoded_packet = Packet,
        num_messages = Count
    }).
799✔
93

94
%% @doc Asynchronously asks the writer `SockPid' to send a ping on its socket.
ping_async(SockPid) ->
    PingReq = #ping{},
    safe_cast(SockPid, PingReq).
6✔
96

97
%% @doc Asynchronously asks the writer `SockPid' to send a pong on its socket.
pong_async(SockPid) ->
    PongReq = #pong{},
    safe_cast(SockPid, PongReq).
5✔
99

100
%%------------------------------------------------------------------------------
101
%% `gen_server' API
102
%%------------------------------------------------------------------------------
103

104
%% Sets the logger domain and process label for this writer, then keeps only
%% the `sock' and `opts' entries of the start parameters as server state.
init(#{sock := _, partition_topic := PartitionTopic, opts := _} = Params) ->
    logger:set_process_metadata(#{domain => [pulsar, socket]}),
    pulsar_utils:set_label({?MODULE, PartitionTopic}),
    {ok, maps:with([sock, opts], Params)}.
27✔
109

110
%% No synchronous requests are supported: every call is answered with
%% `{error, {unknown_call, Request}}' and the state is left untouched.
handle_call(Request, _From, State) ->
    Reply = {error, {unknown_call, Request}},
    {reply, Reply, State}.
×
112

113
%% Executes the asynchronous requests issued through the API functions.
%% Each socket operation's result goes through `ok_or_die/2', so any result
%% other than `ok' stops the server.  Unknown casts are ignored.
handle_cast(#ping{}, State) ->
    #{opts := Opts, sock := Sock} = State,
    ok_or_die(pulsar_socket:ping(Sock, Opts), State);
handle_cast(#pong{}, State) ->
    #{opts := Opts, sock := Sock} = State,
    ok_or_die(pulsar_socket:pong(Sock, Opts), State);
handle_cast(
    #send_batch{
        topic = Topic,
        num_messages = Count,
        encoded_packet = Packet
    },
    State
) ->
    #{opts := Opts, sock := Sock} = State,
    Transport = pulsar_socket:tcp_module(Opts),
    %% The metric is recorded before the packet is written to the socket.
    pulsar_metrics:send(Topic, Count),
    ok_or_die(Transport:send(Sock, Packet), State);
handle_cast(_Unknown, State) ->
    {noreply, State}.
×
132

133
%% Plain messages carry no meaning for the writer; they are dropped and the
%% state is kept as is.
handle_info(_Info, State) ->
    Unchanged = State,
    {noreply, Unchanged}.
×
135

136
%%------------------------------------------------------------------------------
137
%% Internal fns
138
%%------------------------------------------------------------------------------
139

140
%% Casts `Req' to the writer, treating a `noproc' exit as success.
safe_cast(SockPid, Req) ->
    try gen_server:cast(SockPid, Req) of
        CastResult ->
            CastResult
    catch
        exit:{noproc, _} ->
            %% Owner will notice process is down later
            ok
    end.
148

149
%% Only for casts/infos
%% Maps the result of a socket operation to a gen_server return value:
%% `ok' keeps the server running, anything else stops it with that term as
%% the stop reason.
ok_or_die(Result, State) ->
    case Result of
        ok ->
            {noreply, State};
        Reason ->
            {stop, Reason, State}
    end.
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc