• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-connect / 1095727126

05 Dec 2023 10:22AM UTC coverage: 100.0% (+35.9%) from 64.099%
1095727126

Pull #118

gitlab-ci

danielskinstad
test: added tests for exponential backoff and modified tests to make them work with backoff implementation

Changelog: None

Signed-off-by: Daniel Skinstad Drabitzius <daniel.drabitzius@northern.tech>
Pull Request #118: feat: added exponential back-off to handle failed attempts to establish connection to the server

29 of 49 new or added lines in 3 files covered. (59.18%)

1 existing line in 1 file now uncovered.

0 of 0 relevant lines covered (NaN%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.91
/connectionmanager/connectionmanager.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package connectionmanager
16

17
import (
18
        "context"
19
        "errors"
20
        "net/url"
21
        "sync"
22
        "time"
23

24
        "github.com/mendersoftware/go-lib-micro/ws"
25
        log "github.com/sirupsen/logrus"
26

27
        "github.com/mendersoftware/mender-connect/connection"
28
)
29

30
// Limits applied to the connections opened by this package.
const (
	// writeWait is the time allowed to write a message to the peer.
	writeWait = 4 * time.Second
	// maxMessageSize is the maximum message size allowed from the peer.
	maxMessageSize = 8192
)

// URL scheme names used by getWebSocketScheme to map an HTTP(S) server URL
// onto the matching websocket scheme.
const (
	httpsProtocol = "https"
	httpProtocol  = "http"
	wssProtocol   = "wss"
	wsProtocol    = "ws"
)
41

42
// Sentinel errors exposed by the connection manager.
var (
	// ErrHandlerNotRegistered is returned by Read, Write and Close when no
	// handler exists for the requested protocol type.
	ErrHandlerNotRegistered = errors.New("protocol handler not registered")

	// ErrHandlerAlreadyRegistered is returned by Connect when a handler for
	// the requested protocol type is already present.
	ErrHandlerAlreadyRegistered = errors.New("protocol handler already registered")

	// ErrConnectionRetriesExhausted signals that connecting failed after the
	// maximum number of retries.
	ErrConnectionRetriesExhausted = errors.New("failed to connect after max number of retries")
)
47

48
var (
49
        reconnectingMutex   = sync.Mutex{}
50
        reconnecting        = map[ws.ProtoType]bool{}
51
        cancelReconnectChan = map[ws.ProtoType]chan bool{}
52
)
53

54
type ProtocolHandler struct {
55
        proto      ws.ProtoType
56
        connection *connection.Connection
57
        mutex      *sync.Mutex
58
}
59

60
var handlersByTypeMutex = &sync.Mutex{}
61
var handlersByType = map[ws.ProtoType]*ProtocolHandler{}
62
var DefaultPingWait = time.Minute
63

64
func GetWriteTimeout() time.Duration {
1✔
65
        return writeWait
1✔
66
}
1✔
67

68
var a *expBackoff = &expBackoff{
69
        maxInterval:  60 * time.Minute,
70
        attempts:     0,
71
        exceededMax:  false,
72
        maxBackoff:   120 * time.Minute,
73
        smallestUnit: time.Minute,
74
}
75

76
func connect(
77
        proto ws.ProtoType,
78
        serverUrl, connectUrl, token string,
79
        ctx context.Context,
80
) error {
2✔
81
        parsedUrl, err := url.Parse(serverUrl)
2✔
82
        if err != nil {
2✔
83
                return err
×
84
        }
×
85
        scheme := getWebSocketScheme(parsedUrl.Scheme)
2✔
86
        u := url.URL{Scheme: scheme, Host: parsedUrl.Host, Path: connectUrl}
2✔
87

2✔
88
        cancelReconnectChan[proto] = make(chan bool)
2✔
89
        var c *connection.Connection
2✔
90
        defer func() {
4✔
91
                setReconnecting(proto, false)
2✔
92
        }()
2✔
93

94
        setReconnecting(proto, true)
2✔
95
        c, err = connection.NewConnection(u, token, writeWait, maxMessageSize, DefaultPingWait)
2✔
96
        if err != nil || c == nil {
2✔
NEW
97
                if err == nil {
×
NEW
98
                        err = errors.New(
×
NEW
99
                                "unknown error: connection was nil but no error provided by" +
×
NEW
100
                                        " connection.NewConnection",
×
NEW
101
                        )
×
NEW
102
                }
×
103
                // Calculates backoff time before next connection attempt
NEW
104
                intvl := a.GetExponentialBackoffTime()
×
NEW
105
                a.attempts++
×
NEW
106
                log.Errorf("connection manager failed to connect to %s%s: %s; "+
×
NEW
107
                        "reconnecting in %s; len(token)=%d", serverUrl, connectUrl,
×
NEW
108
                        err.Error(), intvl, len(token))
×
NEW
109
                select {
×
NEW
110
                case cancelFlag := <-cancelReconnectChan[proto]:
×
NEW
111
                        log.Tracef("connectionmanager connect got cancelFlag=%+v", cancelFlag)
×
NEW
112
                        if cancelFlag {
×
NEW
113
                                return nil
×
114
                        }
×
NEW
115
                case <-ctx.Done():
×
NEW
116
                        return nil
×
117
                // Waits for the interval calculated by GetExponentialBackoffTime
NEW
118
                case <-time.After(intvl):
×
UNCOV
119
                        return err
×
120
                }
121
        }
122
        a.resetBackoff()
2✔
123

2✔
124
        handlersByType[proto] = &ProtocolHandler{
2✔
125
                proto:      proto,
2✔
126
                connection: c,
2✔
127
                mutex:      &sync.Mutex{},
2✔
128
        }
2✔
129

2✔
130
        // There could be a pending cancelReconnectChan request from CancelReconnection unprocessed
2✔
131
        select {
2✔
132
        case <-cancelReconnectChan[proto]:
×
133
                log.Trace("connectionmanager drained cancelReconnectChan")
×
134
        default:
2✔
135
        }
136

137
        return nil
2✔
138
}
139

140
func Connect(
141
        proto ws.ProtoType,
142
        serverUrl, connectUrl, token string,
143
        ctx context.Context,
144
) error {
2✔
145
        handlersByTypeMutex.Lock()
2✔
146
        defer handlersByTypeMutex.Unlock()
2✔
147

2✔
148
        if _, exists := handlersByType[proto]; exists {
3✔
149
                return ErrHandlerAlreadyRegistered
1✔
150
        }
1✔
151

152
        return connect(proto, serverUrl, connectUrl, token, ctx)
1✔
153
}
154

155
func Reconnect(
156
        proto ws.ProtoType,
157
        serverUrl, connectUrl, token string,
158
        ctx context.Context,
159
) error {
1✔
160
        handlersByTypeMutex.Lock()
1✔
161
        defer handlersByTypeMutex.Unlock()
1✔
162

1✔
163
        if h, exists := handlersByType[proto]; exists {
2✔
164
                if h != nil && h.connection != nil {
2✔
165
                        h.connection.Close()
1✔
166
                }
1✔
167
        }
168

169
        delete(handlersByType, proto)
1✔
170
        return connect(proto, serverUrl, connectUrl, token, ctx)
1✔
171
}
172

173
func Read(proto ws.ProtoType) (*ws.ProtoMsg, error) {
1✔
174
        handlersByTypeMutex.Lock()
1✔
175
        h := handlersByType[proto]
1✔
176
        if h == nil {
1✔
177
                handlersByTypeMutex.Unlock()
×
178
                return nil, ErrHandlerNotRegistered
×
179
        }
×
180

181
        handlersByTypeMutex.Unlock()
1✔
182
        return h.connection.ReadMessage()
1✔
183
}
184

185
func Write(proto ws.ProtoType, m *ws.ProtoMsg) error {
2✔
186
        handlersByTypeMutex.Lock()
2✔
187
        h := handlersByType[proto]
2✔
188
        if h == nil {
3✔
189
                handlersByTypeMutex.Unlock()
1✔
190
                return ErrHandlerNotRegistered
1✔
191
        }
1✔
192

193
        handlersByTypeMutex.Unlock()
1✔
194
        h.mutex.Lock()
1✔
195
        defer h.mutex.Unlock()
1✔
196

1✔
197
        return h.connection.WriteMessage(m)
1✔
198
}
199

200
func IsReconnecting(proto ws.ProtoType) bool {
6✔
201
        reconnectingMutex.Lock()
6✔
202
        defer reconnectingMutex.Unlock()
6✔
203
        return reconnecting[proto]
6✔
204
}
6✔
205

206
func setReconnecting(proto ws.ProtoType, v bool) bool {
4✔
207
        reconnectingMutex.Lock()
4✔
208
        defer reconnectingMutex.Unlock()
4✔
209
        reconnecting[proto] = v
4✔
210
        return v
4✔
211
}
4✔
212

213
func CancelReconnection(proto ws.ProtoType) bool {
×
214
        maxWaitSeconds := 8
×
215
        go func() {
×
216
                cancelReconnectChan[proto] <- true
×
217
        }()
×
218
        for maxWaitSeconds > 0 {
×
219
                time.Sleep(time.Second)
×
220
                if !IsReconnecting(proto) {
×
221
                        break
×
222
                }
223
                maxWaitSeconds--
×
224
        }
225
        if IsReconnecting(proto) {
×
226
                log.Error("failed to cancel reconnection")
×
227
                return false
×
228
        }
×
229
        return true
×
230
}
231

232
func Close(proto ws.ProtoType) error {
6✔
233
        if IsReconnecting(proto) {
6✔
234
                if !CancelReconnection(proto) {
×
235
                        return errors.New("failed to cancel ongoing reconnection")
×
236
                }
×
237
        }
238

239
        handlersByTypeMutex.Lock()
6✔
240
        defer handlersByTypeMutex.Unlock()
6✔
241

6✔
242
        h := handlersByType[proto]
6✔
243
        if h == nil {
8✔
244
                return ErrHandlerNotRegistered
2✔
245
        }
2✔
246

247
        return h.connection.Close()
4✔
248
}
249

250
func getWebSocketScheme(scheme string) string {
6✔
251
        if scheme == httpsProtocol {
7✔
252
                scheme = wssProtocol
1✔
253
        } else if scheme == httpProtocol {
7✔
254
                scheme = wsProtocol
1✔
255
        }
1✔
256
        return scheme
6✔
257
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc