• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / deviceconnect / 1031814854

10 Oct 2023 11:09AM UTC coverage: 77.923% (+1.6%) from 76.323%
1031814854

push

gitlab-ci

web-flow
Merge pull request #316 from mendersoftware/master

Sync staging with master

32 of 33 new or added lines in 2 files covered. (96.97%)

1 existing line in 1 file now uncovered.

1493 of 1916 relevant lines covered (77.92%)

36.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.0
/api/http/device.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package http
16

17
import (
18
        "context"
19
        "encoding/binary"
20
        "encoding/json"
21
        "net/http"
22
        "time"
23

24
        "github.com/gin-gonic/gin"
25
        "github.com/gorilla/websocket"
26
        "github.com/mendersoftware/go-lib-micro/identity"
27
        "github.com/mendersoftware/go-lib-micro/log"
28
        "github.com/mendersoftware/go-lib-micro/rest.utils"
29
        "github.com/mendersoftware/go-lib-micro/ws"
30
        "github.com/mendersoftware/go-lib-micro/ws/shell"
31
        natsio "github.com/nats-io/nats.go"
32
        "github.com/pkg/errors"
33
        "github.com/vmihailenco/msgpack/v5"
34

35
        "github.com/mendersoftware/deviceconnect/app"
36
        "github.com/mendersoftware/deviceconnect/client/nats"
37
        "github.com/mendersoftware/deviceconnect/model"
38
)
39

40
// Websocket timing parameters.
var (
	// pongWait is the time allowed to read the next pong message from the
	// peer; it is used to extend the connection's read deadline.
	pongWait = 60 * time.Second

	// writeWait is the time allowed to write a control message to the peer.
	writeWait = 10 * time.Second
)
47

48
// HTTP errors

// ErrMissingAuthentication is reported when the request identity is not a
// device identity.
var ErrMissingAuthentication = errors.New(
	"missing or non-device identity in the authorization headers",
)
54

55
// DeviceController container for end-points
56
type DeviceController struct {
57
        app  app.App
58
        nats nats.Client
59
}
60

61
// NewDeviceController returns a new DeviceController
62
func NewDeviceController(
63
        app app.App,
64
        natsClient nats.Client,
65
) *DeviceController {
93✔
66
        return &DeviceController{
93✔
67
                app:  app,
93✔
68
                nats: natsClient,
93✔
69
        }
93✔
70
}
93✔
71

72
// Provision responds to POST /tenants/:tenantId/devices
73
func (h DeviceController) Provision(c *gin.Context) {
5✔
74
        tenantID := c.Param("tenantId")
5✔
75

5✔
76
        rawData, err := c.GetRawData()
5✔
77
        if err != nil {
5✔
78
                c.JSON(http.StatusBadRequest, gin.H{
×
79
                        "error": "bad request",
×
80
                })
×
81
                return
×
82
        }
×
83

84
        device := &model.Device{}
5✔
85
        if err = json.Unmarshal(rawData, device); err != nil {
7✔
86
                c.JSON(http.StatusBadRequest, gin.H{
2✔
87
                        "error": errors.Wrap(err, "invalid payload").Error(),
2✔
88
                })
2✔
89
                return
2✔
90
        } else if device.ID == "" {
6✔
91
                c.JSON(http.StatusBadRequest, gin.H{
1✔
92
                        "error": "device_id is empty",
1✔
93
                })
1✔
94
                return
1✔
95
        }
1✔
96

97
        ctx := c.Request.Context()
2✔
98
        if err = h.app.ProvisionDevice(ctx, tenantID, device); err != nil {
3✔
99
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
100
                        "error": errors.Wrap(err, "error provisioning the device").Error(),
1✔
101
                })
1✔
102
                return
1✔
103
        }
1✔
104

105
        c.Writer.WriteHeader(http.StatusCreated)
1✔
106
}
107

108
// Delete responds to DELETE /tenants/:tenantId/devices/:deviceId
109
func (h DeviceController) Delete(c *gin.Context) {
2✔
110
        tenantID := c.Param("tenantId")
2✔
111
        deviceID := c.Param("deviceId")
2✔
112

2✔
113
        ctx := c.Request.Context()
2✔
114
        if err := h.app.DeleteDevice(ctx, tenantID, deviceID); err != nil {
3✔
115
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
116
                        "error": errors.Wrap(err, "error deleting the device").Error(),
1✔
117
                })
1✔
118
                return
1✔
119
        }
1✔
120

121
        c.Writer.WriteHeader(http.StatusAccepted)
1✔
122
}
123

124
// Connect starts a websocket connection with the device
125
func (h DeviceController) Connect(c *gin.Context) {
5✔
126
        ctx := c.Request.Context()
5✔
127
        l := log.FromContext(ctx)
5✔
128

5✔
129
        idata := identity.FromContext(ctx)
5✔
130
        if !idata.IsDevice {
6✔
131
                c.JSON(http.StatusBadRequest, gin.H{
1✔
132
                        "error": ErrMissingAuthentication.Error(),
1✔
133
                })
1✔
134
                return
1✔
135
        }
1✔
136

137
        msgChan := make(chan *natsio.Msg, channelSize)
4✔
138
        sub, err := h.nats.ChanSubscribe(
4✔
139
                model.GetDeviceSubject(idata.Tenant, idata.Subject),
4✔
140
                msgChan,
4✔
141
        )
4✔
142
        if err != nil {
5✔
143
                l.Error(err)
1✔
144
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
145
                        "error": "failed to allocate internal device channel",
1✔
146
                })
1✔
147
                return
1✔
148
        }
1✔
149
        //nolint:errcheck
150
        defer sub.Unsubscribe()
3✔
151

3✔
152
        upgrader := websocket.Upgrader{
3✔
153
                Subprotocols: []string{"protomsg/msgpack"},
3✔
154
                CheckOrigin: func(r *http.Request) bool {
5✔
155
                        return true
2✔
156
                },
2✔
157
                Error: func(
158
                        w http.ResponseWriter, r *http.Request, s int, e error) {
1✔
159
                        rest.RenderError(c, s, e)
1✔
160
                },
1✔
161
        }
162

163
        errChan := make(chan error)
3✔
164
        defer close(errChan)
3✔
165

3✔
166
        // upgrade get request to websocket protocol
3✔
167
        conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
3✔
168
        if err != nil {
4✔
169
                err = errors.Wrap(err,
1✔
170
                        "failed to upgrade the request to "+
1✔
171
                                "websocket protocol",
1✔
172
                )
1✔
173
                l.Error(err)
1✔
174
                return
1✔
175
        }
1✔
176
        conn.SetReadLimit(int64(app.MessageSizeLimit))
2✔
177

2✔
178
        // register the websocket for graceful shutdown
2✔
179
        ctxWithCancel, cancel := context.WithCancel(ctx)
2✔
180
        registerID := h.app.RegisterShutdownCancel(cancel)
2✔
181
        defer h.app.UnregisterShutdownCancel(registerID)
2✔
182

2✔
183
        // websocketWriter is responsible for closing the websocket
2✔
184
        //nolint:errcheck
2✔
185
        go h.connectWSWriter(ctxWithCancel, conn, msgChan, errChan)
2✔
186
        err = h.ConnectServeWS(ctxWithCancel, conn)
2✔
187
        if err != nil {
3✔
188
                select {
1✔
189
                case errChan <- err:
1✔
190

191
                case <-time.After(time.Second):
×
192
                        l.Warn("Failed to propagate error to client")
×
193
                }
194
        }
195
}
196

197
// websocketWriter is the go-routine responsible for the writing end of the
198
// websocket. The routine forwards messages posted on the NATS session subject
199
// and periodically pings the connection. If the connection times out or a
200
// protocol violation occurs, the routine closes the connection.
201
func (h DeviceController) connectWSWriter(
202
        ctx context.Context,
203
        conn *websocket.Conn,
204
        msgChan <-chan *natsio.Msg,
205
        errChan <-chan error,
206
) (err error) {
2✔
207
        l := log.FromContext(ctx)
2✔
208
        defer func() {
4✔
209
                if err != nil {
3✔
210
                        if !websocket.IsUnexpectedCloseError(err) {
2✔
211
                                // If the peer didn't send a close message we must.
1✔
212
                                errMsg := err.Error()
1✔
213
                                errBody := make([]byte, len(errMsg)+2)
1✔
214
                                binary.BigEndian.PutUint16(errBody,
1✔
215
                                        websocket.CloseInternalServerErr,
1✔
216
                                )
1✔
217
                                copy(errBody[2:], errMsg)
1✔
218
                                errClose := conn.WriteControl(
1✔
219
                                        websocket.CloseMessage,
1✔
220
                                        errBody,
1✔
221
                                        time.Now().Add(writeWait),
1✔
222
                                )
1✔
223
                                if errClose != nil {
1✔
224
                                        err = errors.Wrapf(err,
×
225
                                                "error sending websocket close frame: %s",
×
226
                                                errClose.Error(),
×
227
                                        )
×
228
                                }
×
229
                        }
230
                        l.Errorf("websocket closed with error: %s", err.Error())
1✔
231
                }
232
                conn.Close()
2✔
233
        }()
234

235
        // handle the ping-pong connection health check
236
        err = conn.SetReadDeadline(time.Now().Add(pongWait))
2✔
237
        if err != nil {
2✔
238
                l.Error(err)
×
239
                return err
×
240
        }
×
241

242
        pingPeriod := (pongWait * 9) / 10
2✔
243
        ticker := time.NewTicker(pingPeriod)
2✔
244
        defer ticker.Stop()
2✔
245
        conn.SetPongHandler(func(string) error {
3✔
246
                ticker.Reset(pingPeriod)
1✔
247
                return conn.SetReadDeadline(time.Now().Add(pongWait))
1✔
248
        })
1✔
249
        conn.SetPingHandler(func(msg string) error {
3✔
250
                ticker.Reset(pingPeriod)
1✔
251
                err := conn.SetReadDeadline(time.Now().Add(pongWait))
1✔
252
                if err != nil {
1✔
253
                        return err
×
254
                }
×
255
                return conn.WriteControl(
1✔
256
                        websocket.PongMessage,
1✔
257
                        []byte(msg),
1✔
258
                        time.Now().Add(writeWait),
1✔
259
                )
1✔
260
        })
261
Loop:
2✔
262
        for {
6✔
263
                select {
4✔
264
                case msg := <-msgChan:
1✔
265
                        err = conn.WriteMessage(websocket.BinaryMessage, msg.Data)
1✔
266
                        if err != nil {
1✔
267
                                l.Error(err)
×
268
                                break Loop
×
269
                        }
270
                case <-ctx.Done():
×
NEW
271
                        err = errors.New("connection closed by the server")
×
UNCOV
272
                        break Loop
×
273
                case <-ticker.C:
1✔
274
                        if !websocketPing(conn) {
1✔
275
                                err = errors.New("connection timeout")
×
276
                                break Loop
×
277
                        }
278
                case err := <-errChan:
2✔
279
                        return err
2✔
280
                }
281
        }
282
        return err
×
283
}
284

285
func (h DeviceController) ConnectServeWS(
286
        ctx context.Context,
287
        conn *websocket.Conn,
288
) (err error) {
2✔
289
        l := log.FromContext(ctx)
2✔
290
        id := identity.FromContext(ctx)
2✔
291
        sessMap := make(map[string]*model.ActiveSession)
2✔
292

2✔
293
        // update the device status on websocket opening
2✔
294
        err = h.app.UpdateDeviceStatus(
2✔
295
                ctx, id.Tenant,
2✔
296
                id.Subject, model.DeviceStatusConnected,
2✔
297
        )
2✔
298
        if err != nil {
2✔
299
                l.Error(err)
×
300
                return
×
301
        }
×
302
        defer func() {
4✔
303
                for sessionID, session := range sessMap {
4✔
304
                        // TODO: notify the session NATS topic about the session
2✔
305
                        //       being released.
2✔
306
                        if session.RemoteTerminal {
3✔
307
                                msg := ws.ProtoMsg{
1✔
308
                                        Header: ws.ProtoHdr{
1✔
309
                                                Proto:     ws.ProtoTypeShell,
1✔
310
                                                MsgType:   shell.MessageTypeStopShell,
1✔
311
                                                SessionID: sessionID,
1✔
312
                                                Properties: map[string]interface{}{
1✔
313
                                                        "status": shell.ErrorMessage,
1✔
314
                                                },
1✔
315
                                        },
1✔
316
                                        Body: []byte("device disconnected"),
1✔
317
                                }
1✔
318
                                data, _ := msgpack.Marshal(msg)
1✔
319
                                err = h.nats.Publish(
1✔
320
                                        model.GetSessionSubject(id.Tenant, sessionID),
1✔
321
                                        data,
1✔
322
                                )
1✔
323
                        }
1✔
324
                }
325
                // update the device status on websocket closing
326
                eStatus := h.app.UpdateDeviceStatus(
2✔
327
                        ctx, id.Tenant,
2✔
328
                        id.Subject, model.DeviceStatusDisconnected,
2✔
329
                )
2✔
330
                if eStatus != nil {
2✔
331
                        l.Error(eStatus)
×
332
                }
×
333
        }()
334

335
        var data []byte
2✔
336
        for {
8✔
337
                _, data, err = conn.ReadMessage()
6✔
338
                if err != nil {
6✔
339
                        return err
×
340
                }
×
341
                m := &ws.ProtoMsg{}
6✔
342
                err = msgpack.Unmarshal(data, m)
6✔
343
                if err != nil {
7✔
344
                        return err
1✔
345
                }
1✔
346

347
                sessMap[m.Header.SessionID] = &model.ActiveSession{}
5✔
348
                switch m.Header.Proto {
5✔
349
                case ws.ProtoTypeShell:
5✔
350
                        if m.Header.SessionID == "" {
6✔
351
                                return errors.New("api: message missing required session ID")
1✔
352
                        } else if m.Header.MsgType == shell.MessageTypeSpawnShell {
7✔
353
                                if session, ok := sessMap[m.Header.SessionID]; ok {
4✔
354
                                        session.RemoteTerminal = true
2✔
355
                                }
2✔
356
                        } else if m.Header.MsgType == shell.MessageTypeStopShell {
3✔
357
                                delete(sessMap, m.Header.SessionID)
1✔
358
                        }
1✔
359
                default:
×
360
                        // TODO: Handle protocol violation
361
                }
362

363
                err = h.nats.Publish(
4✔
364
                        model.GetSessionSubject(id.Tenant, m.Header.SessionID),
4✔
365
                        data,
4✔
366
                )
4✔
367
                if err != nil {
4✔
368
                        return err
×
369
                }
×
370
        }
371
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc