• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-connect / 1592022205

16 Dec 2024 02:11PM UTC coverage: 77.911% (+0.5%) from 77.427%
1592022205

push

gitlab-ci

web-flow
Merge pull request #145 from alfrunes/QA-764

refac: Migrate from gorilla/websocket to coder/websocket

19 of 28 new or added lines in 2 files covered. (67.86%)

4 existing lines in 1 file now uncovered.

2476 of 3178 relevant lines covered (77.91%)

6.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.97
/connection/connection.go
1
// Copyright 2022 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package connection
16

17
import (
18
        "context"
19
        "fmt"
20
        "net/http"
21
        "net/url"
22
        "sync"
23
        "time"
24

25
        "github.com/coder/websocket"
26
        log "github.com/sirupsen/logrus"
27
        "github.com/vmihailenco/msgpack/v5"
28

29
        "github.com/mendersoftware/go-lib-micro/ws"
30
)
31

32
type Connection struct {
33
        writeMutex sync.Mutex
34
        // the connection handler
35
        connection *websocket.Conn
36
        // Time allowed to write a message to the peer.
37
        writeWait time.Duration
38
        // Maximum message size allowed from peer.
39
        maxMessageSize int64
40
        // Time allowed to read the next pong message from the peer.
41
        defaultPingWait time.Duration
42
        // Channel to stop the go routines
43
        done chan bool
44
}
45

46
// Websocket connection routine. setup the ping-pong and connection settings
47
func NewConnection(
48
        ctx context.Context,
49
        u url.URL,
50
        token string,
51
        writeWait time.Duration,
52
        maxMessageSize int64,
53
        defaultPingWait time.Duration) (*Connection, error) {
4✔
54

4✔
55
        var ws *websocket.Conn
4✔
56

4✔
57
        headers := http.Header{}
4✔
58
        headers.Set("Authorization", "Bearer "+token)
4✔
59
        // nolint:bodyclose
4✔
60
        ws, _, err := websocket.Dial(ctx, u.String(), &websocket.DialOptions{
4✔
61
                HTTPHeader: headers,
4✔
62
        })
4✔
63
        if err != nil {
4✔
NEW
64
                return nil, fmt.Errorf("failed to establish websocket connection: %s", err.Error())
×
65
        }
×
66

67
        c := &Connection{
4✔
68
                connection:      ws,
4✔
69
                writeWait:       writeWait,
4✔
70
                maxMessageSize:  maxMessageSize,
4✔
71
                defaultPingWait: defaultPingWait,
4✔
72
                done:            make(chan bool),
4✔
73
        }
4✔
74
        ws.SetReadLimit(maxMessageSize)
4✔
75

4✔
76
        go c.pingPongHandler()
4✔
77

4✔
78
        return c, nil
4✔
79
}
80

81
func (c *Connection) pingPongHandler() {
4✔
82
        const pingPeriod = time.Hour
4✔
83
        rootCtx := context.Background()
4✔
84
        ticker := time.NewTicker(pingPeriod)
4✔
85
        defer ticker.Stop()
4✔
86

4✔
87
Loop:
4✔
88
        for {
8✔
89
                select {
4✔
90
                case <-c.done:
1✔
91
                        break Loop
1✔
92
                case <-ticker.C:
×
93
                        log.Debug("ping message")
×
NEW
94
                        ctx, cancel := context.WithTimeout(rootCtx, time.Second*30)
×
NEW
95
                        err := c.connection.Ping(ctx)
×
NEW
96
                        cancel()
×
NEW
97
                        if err != nil {
×
NEW
98
                                log.Errorf("failed to ping server: %s", err.Error())
×
NEW
99
                                log.Warn("terminating connection")
×
NEW
100
                                _ = c.Close()
×
NEW
101
                                break Loop
×
102
                        }
103
                }
104
        }
105
}
106

107
func (c *Connection) GetWriteTimeout() time.Duration {
1✔
108
        return c.writeWait
1✔
109
}
1✔
110

111
func (c *Connection) WriteMessage(ctx context.Context, m *ws.ProtoMsg) (err error) {
17✔
112
        data, err := msgpack.Marshal(m)
17✔
113
        if err != nil {
17✔
114
                return err
×
115
        }
×
116
        c.writeMutex.Lock()
17✔
117
        defer c.writeMutex.Unlock()
17✔
118
        return c.connection.Write(ctx, websocket.MessageBinary, data)
17✔
119
}
120

121
func (c *Connection) ReadMessage(ctx context.Context) (*ws.ProtoMsg, error) {
2✔
122
        _, data, err := c.connection.Read(ctx)
2✔
123
        if err != nil {
2✔
124
                return nil, err
×
125
        }
×
126

127
        m := &ws.ProtoMsg{}
2✔
128
        err = msgpack.Unmarshal(data, m)
2✔
129
        if err != nil {
2✔
130
                return nil, err
×
131
        }
×
132
        return m, nil
2✔
133
}
134

135
func (c *Connection) Close() error {
1✔
136
        select {
1✔
137
        case c.done <- true:
1✔
138
        default:
×
139
        }
140
        return c.connection.Close(websocket.StatusNormalClosure, "disconnecting")
1✔
141
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc