• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 714256730

pending completion
714256730

Pull #79

gitlab-ci

Fabio Tranchitella
refac: optimize the mapping cache taking into account the max size
Pull Request #79: MEN-5598: map inventory attributes to sequential fields

343 of 410 new or added lines in 12 files covered. (83.66%)

4 existing lines in 2 files now uncovered.

1689 of 2125 relevant lines covered (79.48%)

12.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.11
/client/nats/client.go
1
// Copyright 2021 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package nats
16

17
import (
18
        "context"
19
        "time"
20

21
        "github.com/nats-io/nats.go"
22
)
23

24
const (
25
        // Set reconnect buffer size in bytes (10 MB)
26
        reconnectBufSize = 10 * 1024 * 1024
27
        // Set reconnect interval to 1 second
28
        reconnectWaitTimeSeconds = 1 * time.Second
29
        // Set the maximum number of delivery attempts for a message
30
        maxRedeliverCount = 3
31
        // Set the number of in-flight messages; by setting it to 1, we explicitly
32
        // tell the NATS server that we want to process jobs serially, one by one
33
        maxAckPending = 1
34
        // Set the ACK wait to 30 seconds
35
        ackWaitSeconds = 30 * time.Second
36
)
37

38
type UnsubscribeFunc func() error
39

40
// Client is the nats client
41
//go:generate ../../x/mockgen.sh
42
type Client interface {
43
        Close()
44
        WithStreamName(streamName string) Client
45
        StreamName() string
46
        IsConnected() bool
47
        JetStreamCreateStream(streamName string) error
48
        JetStreamSubscribe(ctx context.Context, subj, durable string,
49
                q chan *nats.Msg) (UnsubscribeFunc, error)
50
        JetStreamPublish(string, []byte) error
51
}
52

53
// NewClient returns a new connected NATS client
54
func NewClient(url string, opts ...nats.Option) (Client, error) {
1✔
55
        natsClient, err := nats.Connect(url, opts...)
1✔
56
        if err != nil {
1✔
57
                return nil, err
×
58
        }
×
59
        js, err := natsClient.JetStream()
1✔
60
        if err != nil {
1✔
61
                return nil, err
×
62
        }
×
63
        return &client{
1✔
64
                nats: natsClient,
1✔
65
                js:   js,
1✔
66
        }, nil
1✔
67
}
68

69
// NewClientWithDefaults returns a new connected NATS client using the default reconnect buffer size and reconnect wait
70
func NewClientWithDefaults(url string) (Client, error) {
1✔
71
        natsClient, err := NewClient(url,
1✔
72
                nats.ReconnectBufSize(reconnectBufSize),
1✔
73
                nats.ReconnectWait(reconnectWaitTimeSeconds),
1✔
74
        )
1✔
75
        if err != nil {
1✔
76
                return nil, err
×
77
        }
×
78
        return natsClient, nil
1✔
79
}
80

81
type client struct {
82
        nats       *nats.Conn
83
        js         nats.JetStreamContext
84
        streamName string
85
}
86

87
// WithStreamName sets the stream name on the client and returns the same client
88
func (c *client) WithStreamName(streamName string) Client {
1✔
89
        c.streamName = streamName
1✔
90
        return c
1✔
91
}
1✔
92

93
// StreamName returns the stream name set on the client
94
func (c *client) StreamName() string {
×
95
        return c.streamName
×
96
}
×
97

98
// Close closes the connection to nats
UNCOV
99
func (c *client) Close() {
×
UNCOV
100
        c.nats.Close()
×
UNCOV
101
}
×
102

103
// IsConnected returns true if the client is connected to nats
104
func (c *client) IsConnected() bool {
×
105
        return c.nats.IsConnected()
×
106
}
×
107

108
// JetStreamCreateStream creates the stream with the given name if it does not already exist
109
func (c *client) JetStreamCreateStream(streamName string) error {
1✔
110
        stream, err := c.js.StreamInfo(streamName)
1✔
111
        if err != nil && err != nats.ErrStreamNotFound {
1✔
112
                return err
×
113
        }
×
114
        if stream == nil {
2✔
115
                _, err = c.js.AddStream(&nats.StreamConfig{
1✔
116
                        Name:      streamName,
1✔
117
                        NoAck:     false,
1✔
118
                        MaxAge:    24 * time.Hour,
1✔
119
                        Retention: nats.WorkQueuePolicy,
1✔
120
                        Storage:   nats.FileStorage,
1✔
121
                        Subjects:  []string{streamName + ".>"},
1✔
122
                })
1✔
123
                if err != nil {
1✔
124
                        return err
×
125
                }
×
126
        }
127
        return nil
1✔
128
}
129

130
func noop() error {
×
131
        return nil
×
132
}
×
133

134
// JetStreamSubscribe subscribes to messages from the given subject with a durable queue subscriber, delivering them on q, and returns a function that unsubscribes
135
func (c *client) JetStreamSubscribe(
136
        ctx context.Context,
137
        subj, durable string,
138
        q chan *nats.Msg,
139
) (UnsubscribeFunc, error) {
1✔
140
        sub, err := c.js.ChanQueueSubscribe(subj, durable, q,
1✔
141
                nats.AckExplicit(),
1✔
142
                nats.AckWait(ackWaitSeconds),
1✔
143
                nats.ManualAck(),
1✔
144
                nats.MaxAckPending(maxAckPending),
1✔
145
                nats.MaxDeliver(maxRedeliverCount),
1✔
146
        )
1✔
147
        if err != nil {
1✔
148
                return noop, err
×
149
        }
×
150

151
        return sub.Unsubscribe, nil
1✔
152
}
153

154
// JetStreamPublish publishes a message to the given subject
155
func (c *client) JetStreamPublish(subj string, data []byte) error {
×
156
        _, err := c.js.Publish(subj, data)
×
157
        return err
×
158
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc