• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / workflows / 1688388367

13 Sep 2024 11:01AM UTC coverage: 71.907% (-10.0%) from 81.9%
1688388367

push

gitlab-ci

web-flow
Merge pull request #335 from mzedel/chore/deprecate

Chore/deprecate

1633 of 2271 relevant lines covered (71.91%)

12.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.27
/client/nats/client.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package nats
16

17
import (
18
        "context"
19
        "errors"
20
        "fmt"
21
        "time"
22

23
        natsio "github.com/nats-io/nats.go"
24

25
        "github.com/mendersoftware/go-lib-micro/log"
26
)
27

28
const (
29
        // Set reconnect buffer size in bytes (10 MB)
30
        reconnectBufSize = 10 * 1024 * 1024
31
        // Set reconnect interval to 1 second
32
        reconnectWaitTime = 1 * time.Second
33
)
34

35
var (
36
        ErrIncompatibleConsumer = errors.New("nats: cannot subscribe to a pull consumer")
37
)
38

39
type UnsubscribeFunc func() error
40

41
// Client is the nats client
42
//
43
//go:generate ../../utils/mockgen.sh
44
type Client interface {
45
        Close()
46
        StreamName() string
47
        IsConnected() bool
48
        JetStreamCreateStream(streamName string) error
49
        GetConsumerConfig(name string) (*ConsumerConfig, error)
50
        CreateConsumer(name string, upsert bool, config ConsumerConfig) error
51
        JetStreamSubscribe(
52
                ctx context.Context,
53
                subj,
54
                durable string,
55
                q chan *natsio.Msg,
56
        ) (UnsubscribeFunc, error)
57
        JetStreamPublish(string, []byte) error
58
}
59

60
// NewClient returns a new nats client
61
func NewClient(url string, streamName string, opts ...natsio.Option) (Client, error) {
2✔
62
        natsClient, err := natsio.Connect(url, opts...)
2✔
63
        if err != nil {
2✔
64
                return nil, err
×
65
        }
×
66
        js, err := natsClient.JetStream()
2✔
67
        if err != nil {
2✔
68
                return nil, err
×
69
        }
×
70
        return &client{
2✔
71
                nats:       natsClient,
2✔
72
                streamName: streamName,
2✔
73
                js:         js,
2✔
74
        }, nil
2✔
75
}
76

77
// NewClient returns a new nats client with default options
78
func NewClientWithDefaults(url string, streamName string) (Client, error) {
2✔
79
        ctx := context.Background()
2✔
80
        l := log.FromContext(ctx)
2✔
81

2✔
82
        natsClient, err := NewClient(url,
2✔
83
                streamName,
2✔
84
                func(o *natsio.Options) error {
4✔
85
                        o.AllowReconnect = true
2✔
86
                        o.MaxReconnect = -1
2✔
87
                        o.ReconnectBufSize = reconnectBufSize
2✔
88
                        o.ReconnectWait = reconnectWaitTime
2✔
89
                        o.RetryOnFailedConnect = true
2✔
90
                        o.ClosedCB = func(_ *natsio.Conn) {
4✔
91
                                l.Info("nats client closed the connection")
2✔
92
                        }
2✔
93
                        o.DisconnectedErrCB = func(_ *natsio.Conn, e error) {
4✔
94
                                if e != nil {
2✔
95
                                        l.Warnf("nats client disconnected, err: %v", e)
×
96
                                }
×
97
                        }
98
                        o.ReconnectedCB = func(_ *natsio.Conn) {
2✔
99
                                l.Warn("nats client reconnected")
×
100
                        }
×
101
                        return nil
2✔
102
                },
103
        )
104
        if err != nil {
2✔
105
                return nil, err
×
106
        }
×
107
        return natsClient, nil
2✔
108
}
109

110
type client struct {
111
        nats       *natsio.Conn
112
        js         natsio.JetStreamContext
113
        streamName string
114
}
115

116
// IsConnected returns true if the client is connected to nats
117
func (c *client) StreamName() string {
13✔
118
        return c.streamName
13✔
119
}
13✔
120

121
// Close closes the connection to nats
122
func (c *client) Close() {
2✔
123
        c.nats.Close()
2✔
124
}
2✔
125

126
// IsConnected returns true if the client is connected to nats
127
func (c *client) IsConnected() bool {
×
128
        return c.nats.IsConnected()
×
129
}
×
130

131
// JetStreamCreateStream creates a stream
132
func (c *client) JetStreamCreateStream(streamName string) error {
1✔
133
        stream, err := c.js.StreamInfo(streamName)
1✔
134
        if err != nil && err != natsio.ErrStreamNotFound {
1✔
135
                return err
×
136
        }
×
137
        if stream == nil {
2✔
138
                _, err = c.js.AddStream(&natsio.StreamConfig{
1✔
139
                        Name:      streamName,
1✔
140
                        NoAck:     false,
1✔
141
                        MaxAge:    24 * time.Hour,
1✔
142
                        Retention: natsio.WorkQueuePolicy,
1✔
143
                        Storage:   natsio.FileStorage,
1✔
144
                        Subjects:  []string{streamName + ".>"},
1✔
145
                })
1✔
146
                if err != nil {
1✔
147
                        return err
×
148
                }
×
149
        }
150
        return nil
1✔
151
}
152

153
func noop() error {
×
154
        return nil
×
155
}
×
156

157
type ConsumerConfig struct {
158
        // Filter expression for which topics this consumer covers.
159
        Filter string
160
        // MaxPending messages in the work queue.
161
        // NOTE: This sets an upper limit on the horizontal scalability of the
162
        // service.
163
        MaxPending int
164
        // MaxDeliver sets the maximum amount of time the message will be
165
        // (re-) delivered.
166
        MaxDeliver int
167
        // AckWait sets the time to wait for message acknowledgement before
168
        // resending the message.
169
        AckWait time.Duration
170
}
171

172
func (cfg ConsumerConfig) Validate() error {
1✔
173
        if cfg.AckWait < time.Second {
1✔
174
                return fmt.Errorf(
×
175
                        "invalid consumer configuration AckWait: %s < 1s",
×
176
                        cfg.AckWait)
×
177
        }
×
178
        if cfg.MaxDeliver < 1 {
1✔
179
                return fmt.Errorf(
×
180
                        "invalid consumer configuration MaxDeliver: %d < 1",
×
181
                        cfg.MaxDeliver)
×
182
        }
×
183
        if cfg.MaxPending < 1 {
1✔
184
                return fmt.Errorf(
×
185
                        "invalid consumer configuration MaxPending: %d < 1",
×
186
                        cfg.MaxPending)
×
187
        }
×
188
        return nil
1✔
189
}
190

191
const consumerVersionString = "workflows/v1"
192

193
func (cfg ConsumerConfig) toNats(name string, deliverSubject string) *natsio.ConsumerConfig {
1✔
194
        if deliverSubject == "" {
2✔
195
                deliverSubject = natsio.NewInbox()
1✔
196
        }
1✔
197
        return &natsio.ConsumerConfig{
1✔
198
                Name:         name, // To preserve behavior of the internal library,
1✔
199
                Durable:      name, // the consumer-, durable- and delivery group name
1✔
200
                DeliverGroup: name, // are all set to the durable name.
1✔
201

1✔
202
                Description:    consumerVersionString,
1✔
203
                DeliverSubject: deliverSubject,
1✔
204

1✔
205
                FilterSubject: cfg.Filter,
1✔
206
                AckWait:       cfg.AckWait,
1✔
207
                MaxAckPending: cfg.MaxPending,
1✔
208
                MaxDeliver:    cfg.MaxDeliver,
1✔
209

1✔
210
                AckPolicy:     natsio.AckExplicitPolicy,
1✔
211
                DeliverPolicy: natsio.DeliverAllPolicy,
1✔
212
        }
1✔
213
}
214

215
func configFromNats(cfg natsio.ConsumerConfig) ConsumerConfig {
1✔
216
        return ConsumerConfig{
1✔
217
                Filter:     cfg.FilterSubject,
1✔
218
                MaxPending: cfg.MaxAckPending,
1✔
219
                MaxDeliver: cfg.MaxDeliver,
1✔
220
                AckWait:    cfg.AckWait,
1✔
221
        }
1✔
222
}
1✔
223

224
func (c *client) GetConsumerConfig(name string) (*ConsumerConfig, error) {
1✔
225
        consumerInfo, err := c.js.ConsumerInfo(c.streamName, name)
1✔
226
        if err != nil {
1✔
227
                return nil, err
×
228
        } else if consumerInfo == nil {
1✔
229
                return nil, fmt.Errorf("nats: nil consumer")
×
230
        }
×
231
        cfg := configFromNats(consumerInfo.Config)
1✔
232
        return &cfg, nil
1✔
233
}
234

235
func (c *client) CreateConsumer(name string, upsert bool, config ConsumerConfig) error {
1✔
236
        consumerInfo, err := c.js.ConsumerInfo(c.streamName, name)
1✔
237
        if errors.Is(err, natsio.ErrConsumerNotFound) {
2✔
238
                _, err = c.js.AddConsumer(c.streamName, config.toNats(name, ""))
1✔
239
                var apiErr *natsio.APIError
1✔
240
                if err == nil {
2✔
241
                        return nil
1✔
242
                } else if errors.As(err, &apiErr) &&
1✔
243
                        apiErr.ErrorCode == natsio.JSErrCodeConsumerAlreadyExists {
×
244
                        // Race: consumer was just created between ConsumerInfo and AddConsumer
×
245
                        consumerInfo, err = c.js.ConsumerInfo(c.streamName, name)
×
246
                }
×
247
        }
248
        if err != nil {
×
249
                return fmt.Errorf("nats: error getting consumer info: %w", err)
×
250
        }
×
251
        if upsert {
×
252
                if consumerInfo.Config.DeliverSubject == "" {
×
253
                        return ErrIncompatibleConsumer
×
254
                }
×
255
                _, err = c.js.UpdateConsumer(
×
256
                        c.streamName,
×
257
                        config.toNats(name, consumerInfo.Config.DeliverSubject),
×
258
                )
×
259
                if err == nil {
×
260
                        return nil
×
261
                }
×
262
        }
263
        return err
×
264
}
265

266
// JetStreamSubscribe subscribes to messages from the given subject with a durable subscriber
267
func (c *client) JetStreamSubscribe(
268
        ctx context.Context,
269
        subj, durable string,
270
        q chan *natsio.Msg,
271
) (UnsubscribeFunc, error) {
1✔
272
        sub, err := c.js.ChanQueueSubscribe(subj, durable, q,
1✔
273
                natsio.Bind(c.streamName, durable),
1✔
274
                natsio.ManualAck(),
1✔
275
                natsio.Context(ctx),
1✔
276
        )
1✔
277
        if err != nil {
1✔
278
                return noop, err
×
279
        }
×
280

281
        return sub.Unsubscribe, nil
1✔
282
}
283

284
// JetStreamPublish publishes a message to the given subject
285
func (c *client) JetStreamPublish(subj string, data []byte) error {
12✔
286
        _, err := c.js.Publish(subj, data)
12✔
287
        return err
12✔
288
}
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc