• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 815512710

pending completion
815512710

Pull #121

gitlab-ci

Alf-Rune Siqveland
chore: Gracefully shut down indexer on SIGINT/SIGTERM
Pull Request #121: fix: Prevent overflowing the per_page parameter on device deployment requests

63 of 73 new or added lines in 2 files covered. (86.3%)

7 existing lines in 2 files now uncovered.

2808 of 3294 relevant lines covered (85.25%)

17.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.86
/client/nats/client.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package nats
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "errors"
21
        "time"
22

23
        "github.com/mendersoftware/go-lib-micro/log"
24
        "github.com/nats-io/nats.go"
25

26
        "github.com/mendersoftware/reporting/model"
27
)
28

29
const (
30
        // Set reconnect buffer size in bytes (10 MB)
31
        reconnectBufSize = 10 * 1024 * 1024
32
        // Set reconnect interval to 1 second
33
        reconnectWaitTimeSeconds = 1 * time.Second
34
        // Set the number of redeliveries for a message
35
        maxRedeliverCount = 3
36
        // Set the number of inflight messages; setting it to 1 we explicitly
37
        // tell the NATS server that we want to process jobs serially, one by one
38
        maxAckPending = 10
39
        // Set the ACK wait
40
        ackWaitSeconds = 30 * time.Second
41

42
        replicas = 2
43
)
44

45
var (
46
        ErrInconsistentConsumerConfig = errors.New(
47
                "consumer configuration is inconsistent: requires migration",
48
        )
49
)
50

51
type UnsubscribeFunc func() error
52

53
// Client is the nats client
54
//
55
//go:generate ../../x/mockgen.sh
56
type Client interface {
57
        Close()
58
        IsConnected() bool
59
        JetStreamSubscribe(ctx context.Context, sub, dur string, q chan model.Job) error
60
        JetStreamPublish(string, []byte) error
61
        Migrate(ctx context.Context, sub, dur string, recreate bool) error
62
}
63

64
// NewClient returns a new nats client with default options
65
func NewClient(url string) (Client, error) {
3✔
66
        natsClient, err := nats.Connect(url,
3✔
67
                nats.ReconnectBufSize(reconnectBufSize),
3✔
68
                nats.ReconnectWait(reconnectWaitTimeSeconds),
3✔
69
        )
3✔
70
        if err != nil {
3✔
71
                return nil, err
×
72
        }
×
73
        js, err := natsClient.JetStream()
3✔
74
        if err != nil {
3✔
75
                return nil, err
×
76
        }
×
77
        return &client{
3✔
78
                nats: natsClient,
3✔
79
                js:   js,
3✔
80
        }, nil
3✔
81
}
82

83
type client struct {
84
        nats *nats.Conn
85
        js   nats.JetStreamContext
86
}
87

88
// Close closes the connection to nats
89
func (c *client) Close() {
2✔
90
        c.nats.Close()
2✔
91
}
2✔
92

93
// IsConnected returns true if the client is connected to nats
94
func (c *client) IsConnected() bool {
×
95
        return c.nats.IsConnected()
×
96
}
×
97

98
func (c *client) Migrate(ctx context.Context, sub, dur string, recreate bool) error {
3✔
99
        cfg := &nats.ConsumerConfig{
3✔
100
                Name:          dur,
3✔
101
                Durable:       dur,
3✔
102
                Description:   "reporting/v2", // pull mode
3✔
103
                FilterSubject: sub,
3✔
104
                AckPolicy:     nats.AckExplicitPolicy,
3✔
105
                AckWait:       ackWaitSeconds,
3✔
106
                MaxAckPending: maxAckPending,
3✔
107
                MaxDeliver:    maxRedeliverCount,
3✔
108
                Replicas:      replicas,
3✔
109
        }
3✔
110
        stream, err := c.js.StreamNameBySubject(sub)
3✔
111
        if err != nil {
3✔
112
                return err
×
113
        }
×
114
        info, err := c.js.ConsumerInfo(stream, dur)
3✔
115
        if err == nats.ErrConsumerNotFound {
4✔
116
                _, err = c.js.AddConsumer(stream, cfg)
1✔
117
                return err
1✔
118
        } else if err != nil {
3✔
119
                return err
×
120
        }
×
121

122
        if info.Config.Description != cfg.Description {
2✔
123
                if !recreate {
×
124
                        return ErrInconsistentConsumerConfig
×
125
                }
×
126
                l := log.FromContext(ctx)
×
127
                l.Info("removing conflicting consumer configuration")
×
128
                err = c.js.DeleteConsumer(stream, dur)
×
129
                if err != nil {
×
130
                        return err
×
131
                }
×
132
                l.Info("recreating consumer configuration")
×
133
                _, err = c.js.AddConsumer(stream, cfg)
×
134
        }
135
        return err
2✔
136
}
137

138
// JetStreamSubscribe subscribes to messages from the given subject with a durable subscriber
139
func (c *client) JetStreamSubscribe(
140
        ctx context.Context,
141
        subj, durable string,
142
        q chan model.Job,
143
) error {
1✔
144
        if q == nil {
1✔
145
                return errors.New("nats: nil subscription channel")
×
146
        }
×
147
        err := c.Migrate(ctx, subj, durable, false)
1✔
148
        if err != nil {
1✔
149
                return err
×
150
        }
×
151

152
        sub, err := c.js.PullSubscribe(subj, durable, nats.Context(ctx))
1✔
153
        if err != nil {
1✔
154
                if err == nats.ErrPullSubscribeToPushConsumer {
×
155
                        return ErrInconsistentConsumerConfig
×
156
                }
×
157
                return err
×
158
        }
159
        go func() (err error) {
2✔
160
                l := log.FromContext(ctx)
1✔
161
                defer func() {
1✔
UNCOV
162
                        _ = sub.Unsubscribe()
×
UNCOV
163
                        if err != nil {
×
UNCOV
164
                                l.Error(err)
×
UNCOV
165
                        }
×
166
                }()
167
                opt := nats.Context(ctx)
1✔
168
                done := ctx.Done()
1✔
169
                var msgs []*nats.Msg
1✔
170
                for {
26✔
171
                        msgs, err = sub.Fetch(1, opt)
25✔
172
                        if err != nil {
30✔
173
                                if err == context.DeadlineExceeded {
10✔
174
                                        continue
5✔
175
                                }
UNCOV
176
                                close(q)
×
UNCOV
177
                                return err
×
178
                        }
179
                        for _, msg := range msgs {
38✔
180
                                var job model.Job
19✔
181
                                err = msg.Ack(opt)
19✔
182
                                if err != nil {
19✔
183
                                        close(q)
×
184
                                        return err
×
185
                                }
×
186
                                err = json.Unmarshal(msg.Data, &job)
19✔
187
                                if err != nil {
19✔
188
                                        close(q)
×
189
                                        return err
×
190
                                }
×
191
                                select {
19✔
192
                                case q <- job:
19✔
193

194
                                case <-done:
×
195
                                        close(q)
×
196
                                        return nil
×
197
                                }
198
                        }
199
                }
200
        }() //nolint: errcheck
201

202
        return nil
1✔
203
}
204

205
// JetStreamPublish publishes a message to the given subject
206
func (c *client) JetStreamPublish(subj string, data []byte) error {
×
207
        _, err := c.js.Publish(subj, data)
×
208
        return err
×
209
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc