• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-connect / 1210990983

29 Feb 2024 10:56AM UTC coverage: 64.671% (-9.7%) from 74.341%
1210990983

push

gitlab-ci

web-flow
Merge pull request #118 from danielskinstad/backoff

feat: added back-off in connectionmanager

38 of 60 new or added lines in 3 files covered. (63.33%)

5 existing lines in 2 files now uncovered.

551 of 852 relevant lines covered (64.67%)

25.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.21
/connectionmanager/connectionmanager.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package connectionmanager
16

17
import (
18
        "context"
19
        "errors"
20
        "net/url"
21
        "sync"
22
        "time"
23

24
        "github.com/mendersoftware/go-lib-micro/ws"
25
        log "github.com/sirupsen/logrus"
26

27
        "github.com/mendersoftware/mender-connect/connection"
28
)
29

30
const (
	// Time allowed to write a message to the peer.
	writeWait = 4 * time.Second
	// Maximum message size allowed from peer.
	maxMessageSize = 8192

	// URL schemes recognised in the server URL (httpsProtocol, httpProtocol)
	// and the WebSocket schemes getWebSocketScheme maps them to (wssProtocol,
	// wsProtocol).
	httpsProtocol = "https"
	httpProtocol  = "http"
	wssProtocol   = "wss"
	wsProtocol    = "ws"
)
41

42
var (
	// ErrHandlerNotRegistered is returned by Read, Write and Close when no
	// handler is registered for the requested protocol type.
	ErrHandlerNotRegistered = errors.New("protocol handler not registered")
	// ErrHandlerAlreadyRegistered is returned by Connect when a handler is
	// already registered for the requested protocol type.
	ErrHandlerAlreadyRegistered = errors.New("protocol handler already registered")
	// ErrConnectionRetriesExhausted reports that connecting failed after the
	// maximum number of retries.
	// NOTE(review): no function in this file returns it; presumably used by
	// callers or the back-off code — confirm.
	ErrConnectionRetriesExhausted = errors.New("failed to connect after max number of retries")
)
47

48
var (
	// reconnectingMutex guards the reconnecting map; IsReconnecting and
	// setReconnecting take it around every access.
	reconnectingMutex = sync.Mutex{}
	// reconnecting records, per protocol type, whether a connect attempt is
	// currently in progress.
	reconnecting = map[ws.ProtoType]bool{}
	// cancelReconnectChan holds, per protocol type, the channel on which
	// CancelReconnection requests cancellation. connect installs a fresh
	// channel at the start of every attempt.
	// NOTE(review): connect writes this map and CancelReconnection reads it
	// without taking reconnectingMutex — confirm whether that is intended.
	cancelReconnectChan = map[ws.ProtoType]chan bool{}
)
53

54
// ProtocolHandler ties a protocol type to the connection that serves it.
// Instances are created by connect and stored in handlersByType.
type ProtocolHandler struct {
	// proto is the protocol type this handler was registered for.
	proto ws.ProtoType
	// connection is the underlying connection used by Read, Write and Close.
	connection *connection.Connection
	// mutex is held by Write for the duration of each WriteMessage call.
	mutex *sync.Mutex
}
59

60
// handlersByTypeMutex guards handlersByType.
var handlersByTypeMutex = &sync.Mutex{}

// handlersByType maps each protocol type to its registered handler.
var handlersByType = map[ws.ProtoType]*ProtocolHandler{}

// DefaultPingWait is the value connect passes as the last argument of
// connection.NewConnection.
// NOTE(review): presumably the keep-alive ping wait — confirm in package
// connection.
var DefaultPingWait = time.Minute
63

64
// GetWriteTimeout returns writeWait, the time allowed to write a message to
// the peer.
func GetWriteTimeout() time.Duration {
	return writeWait
}
1✔
67

68
// a is the package-wide back-off state shared by all protocol types: connect
// calls a.WaitForBackoff before every attempt and a.resetBackoff once a
// connection has been established.
// NOTE(review): expBackoff is declared outside this view; the meaning of the
// fields below is presumed from their names only — confirm against its
// definition.
var a *expBackoff = &expBackoff{
	maxInterval:  60 * time.Minute,
	attempts:     0,
	exceededMax:  false,
	maxBackoff:   120 * time.Minute,
	smallestUnit: time.Minute,
}
75

76
func connect(
77
        proto ws.ProtoType,
78
        serverUrl, connectUrl, token string,
79
        ctx context.Context,
80
) error {
2✔
81
        parsedUrl, err := url.Parse(serverUrl)
2✔
82
        if err != nil {
2✔
83
                return err
×
84
        }
×
85
        scheme := getWebSocketScheme(parsedUrl.Scheme)
2✔
86
        u := url.URL{Scheme: scheme, Host: parsedUrl.Host, Path: connectUrl}
2✔
87

2✔
88
        cancelReconnectChan[proto] = make(chan bool)
2✔
89
        var c *connection.Connection
2✔
90
        defer func() {
4✔
91
                setReconnecting(proto, false)
2✔
92
        }()
2✔
93

94
        setReconnecting(proto, true)
2✔
95
        if err := a.WaitForBackoff(ctx, proto); err != nil {
2✔
NEW
96
                log.Errorf("error in WaitForBackoff: %s", err)
×
NEW
97
                return err
×
NEW
98
        }
×
99
        c, err = connection.NewConnection(u, token, writeWait, maxMessageSize, DefaultPingWait)
2✔
100
        if err != nil || c == nil {
2✔
NEW
101
                if err == nil {
×
NEW
102
                        err = errors.New(
×
NEW
103
                                "unknown error: connection was nil but no error provided by" +
×
NEW
104
                                        " connection.NewConnection",
×
NEW
105
                        )
×
UNCOV
106
                }
×
NEW
107
                log.Errorf("connection manager failed to connect to %s%s: %s; "+
×
NEW
108
                        "len(token)=%d", serverUrl, connectUrl,
×
NEW
109
                        err.Error(), len(token))
×
NEW
110
                return err
×
111
        }
112

113
        a.resetBackoff()
2✔
114

2✔
115
        handlersByType[proto] = &ProtocolHandler{
2✔
116
                proto:      proto,
2✔
117
                connection: c,
2✔
118
                mutex:      &sync.Mutex{},
2✔
119
        }
2✔
120

2✔
121
        // There could be a pending cancelReconnectChan request from CancelReconnection unprocessed
2✔
122
        select {
2✔
123
        case <-cancelReconnectChan[proto]:
×
124
                log.Trace("connectionmanager drained cancelReconnectChan")
×
125
        default:
2✔
126
        }
127

128
        return nil
2✔
129
}
130

131
func Connect(
132
        proto ws.ProtoType,
133
        serverUrl, connectUrl, token string,
134
        ctx context.Context,
135
) error {
2✔
136
        handlersByTypeMutex.Lock()
2✔
137
        defer handlersByTypeMutex.Unlock()
2✔
138

2✔
139
        if _, exists := handlersByType[proto]; exists {
3✔
140
                return ErrHandlerAlreadyRegistered
1✔
141
        }
1✔
142

143
        return connect(proto, serverUrl, connectUrl, token, ctx)
1✔
144
}
145

146
func Reconnect(
147
        proto ws.ProtoType,
148
        serverUrl, connectUrl, token string,
149
        ctx context.Context,
150
) error {
1✔
151
        handlersByTypeMutex.Lock()
1✔
152
        defer handlersByTypeMutex.Unlock()
1✔
153

1✔
154
        if h, exists := handlersByType[proto]; exists {
2✔
155
                if h != nil && h.connection != nil {
2✔
156
                        h.connection.Close()
1✔
157
                }
1✔
158
        }
159

160
        delete(handlersByType, proto)
1✔
161
        return connect(proto, serverUrl, connectUrl, token, ctx)
1✔
162
}
163

164
func Read(proto ws.ProtoType) (*ws.ProtoMsg, error) {
1✔
165
        handlersByTypeMutex.Lock()
1✔
166
        h := handlersByType[proto]
1✔
167
        if h == nil {
1✔
168
                handlersByTypeMutex.Unlock()
×
169
                return nil, ErrHandlerNotRegistered
×
170
        }
×
171

172
        handlersByTypeMutex.Unlock()
1✔
173
        return h.connection.ReadMessage()
1✔
174
}
175

176
func Write(proto ws.ProtoType, m *ws.ProtoMsg) error {
2✔
177
        handlersByTypeMutex.Lock()
2✔
178
        h := handlersByType[proto]
2✔
179
        if h == nil {
3✔
180
                handlersByTypeMutex.Unlock()
1✔
181
                return ErrHandlerNotRegistered
1✔
182
        }
1✔
183

184
        handlersByTypeMutex.Unlock()
1✔
185
        h.mutex.Lock()
1✔
186
        defer h.mutex.Unlock()
1✔
187

1✔
188
        return h.connection.WriteMessage(m)
1✔
189
}
190

191
func IsReconnecting(proto ws.ProtoType) bool {
6✔
192
        reconnectingMutex.Lock()
6✔
193
        defer reconnectingMutex.Unlock()
6✔
194
        return reconnecting[proto]
6✔
195
}
6✔
196

197
func setReconnecting(proto ws.ProtoType, v bool) bool {
4✔
198
        reconnectingMutex.Lock()
4✔
199
        defer reconnectingMutex.Unlock()
4✔
200
        reconnecting[proto] = v
4✔
201
        return v
4✔
202
}
4✔
203

204
func CancelReconnection(proto ws.ProtoType) bool {
×
205
        maxWaitSeconds := 8
×
206
        go func() {
×
207
                cancelReconnectChan[proto] <- true
×
208
        }()
×
209
        for maxWaitSeconds > 0 {
×
210
                time.Sleep(time.Second)
×
211
                if !IsReconnecting(proto) {
×
212
                        break
×
213
                }
214
                maxWaitSeconds--
×
215
        }
216
        if IsReconnecting(proto) {
×
217
                log.Error("failed to cancel reconnection")
×
218
                return false
×
219
        }
×
220
        return true
×
221
}
222

223
func Close(proto ws.ProtoType) error {
6✔
224
        if IsReconnecting(proto) {
6✔
225
                if !CancelReconnection(proto) {
×
226
                        return errors.New("failed to cancel ongoing reconnection")
×
227
                }
×
228
        }
229

230
        handlersByTypeMutex.Lock()
6✔
231
        defer handlersByTypeMutex.Unlock()
6✔
232

6✔
233
        h := handlersByType[proto]
6✔
234
        if h == nil {
8✔
235
                return ErrHandlerNotRegistered
2✔
236
        }
2✔
237

238
        return h.connection.Close()
4✔
239
}
240

241
func getWebSocketScheme(scheme string) string {
6✔
242
        if scheme == httpsProtocol {
7✔
243
                scheme = wssProtocol
1✔
244
        } else if scheme == httpProtocol {
7✔
245
                scheme = wsProtocol
1✔
246
        }
1✔
247
        return scheme
6✔
248
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc