• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1495380963

14 Oct 2024 03:35PM UTC coverage: 70.373% (-2.5%) from 72.904%
1495380963

Pull #101

gitlab-ci

mineralsfree
feat: tenant list added

Ticket: MEN-7568
Changelog: None

Signed-off-by: Mikita Pilinka <mikita.pilinka@northern.tech>
Pull Request #101: feat: tenant list added

4406 of 6391 branches covered (68.94%)

Branch coverage included in aggregate %.

88 of 183 new or added lines in 10 files covered. (48.09%)

2623 existing lines in 65 files now uncovered.

36673 of 51982 relevant lines covered (70.55%)

31.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.89
/backend/services/deviceconnect/api/http/device.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package http
16

17
import (
18
        "context"
19
        "encoding/binary"
20
        "encoding/json"
21
        "net/http"
22
        "time"
23

24
        "github.com/gin-gonic/gin"
25
        "github.com/gorilla/websocket"
26
        natsio "github.com/nats-io/nats.go"
27
        "github.com/pkg/errors"
28
        "github.com/vmihailenco/msgpack/v5"
29

30
        "github.com/mendersoftware/mender-server/pkg/identity"
31
        "github.com/mendersoftware/mender-server/pkg/log"
32
        "github.com/mendersoftware/mender-server/pkg/rest.utils"
33
        "github.com/mendersoftware/mender-server/pkg/ws"
34
        "github.com/mendersoftware/mender-server/pkg/ws/shell"
35

36
        "github.com/mendersoftware/mender-server/services/deviceconnect/app"
37
        "github.com/mendersoftware/mender-server/services/deviceconnect/client/nats"
38
        "github.com/mendersoftware/mender-server/services/deviceconnect/model"
39
)
40

41
var (
42
        // Time allowed to read the next pong message from the peer.
43
        pongWait = time.Minute
44

45
        // Seconds allowed to write a message to the peer.
46
        writeWait = time.Second * 10
47
)
48

49
// HTTP errors
50
var (
51
        ErrMissingAuthentication = errors.New(
52
                "missing or non-device identity in the authorization headers",
53
        )
54
)
55

56
// DeviceController container for end-points
57
type DeviceController struct {
58
        app  app.App
59
        nats nats.Client
60
}
61

62
// NewDeviceController returns a new DeviceController
63
func NewDeviceController(
64
        app app.App,
65
        natsClient nats.Client,
66
) *DeviceController {
1✔
67
        return &DeviceController{
1✔
68
                app:  app,
1✔
69
                nats: natsClient,
1✔
70
        }
1✔
71
}
1✔
72

73
// Provision responds to POST /tenants/:tenantId/devices
74
func (h DeviceController) Provision(c *gin.Context) {
1✔
75
        tenantID := c.Param("tenantId")
1✔
76

1✔
77
        rawData, err := c.GetRawData()
1✔
78
        if err != nil {
1✔
79
                c.JSON(http.StatusBadRequest, gin.H{
×
80
                        "error": "bad request",
×
81
                })
×
82
                return
×
83
        }
×
84

85
        device := &model.Device{}
1✔
86
        if err = json.Unmarshal(rawData, device); err != nil {
2✔
87
                c.JSON(http.StatusBadRequest, gin.H{
1✔
88
                        "error": errors.Wrap(err, "invalid payload").Error(),
1✔
89
                })
1✔
90
                return
1✔
91
        } else if device.ID == "" {
3✔
92
                c.JSON(http.StatusBadRequest, gin.H{
1✔
93
                        "error": "device_id is empty",
1✔
94
                })
1✔
95
                return
1✔
96
        }
1✔
97

98
        ctx := c.Request.Context()
1✔
99
        if err = h.app.ProvisionDevice(ctx, tenantID, device); err != nil {
2✔
100
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
101
                        "error": errors.Wrap(err, "error provisioning the device").Error(),
1✔
102
                })
1✔
103
                return
1✔
104
        }
1✔
105

106
        c.Writer.WriteHeader(http.StatusCreated)
1✔
107
}
108

109
// Delete responds to DELETE /tenants/:tenantId/devices/:deviceId
110
func (h DeviceController) Delete(c *gin.Context) {
1✔
111
        tenantID := c.Param("tenantId")
1✔
112
        deviceID := c.Param("deviceId")
1✔
113

1✔
114
        ctx := c.Request.Context()
1✔
115
        if err := h.app.DeleteDevice(ctx, tenantID, deviceID); err != nil {
2✔
116
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
117
                        "error": errors.Wrap(err, "error deleting the device").Error(),
1✔
118
                })
1✔
119
                return
1✔
120
        }
1✔
121

122
        c.Writer.WriteHeader(http.StatusAccepted)
1✔
123
}
124

125
// Connect starts a websocket connection with the device
126
func (h DeviceController) Connect(c *gin.Context) {
1✔
127
        ctx := c.Request.Context()
1✔
128
        l := log.FromContext(ctx)
1✔
129

1✔
130
        idata := identity.FromContext(ctx)
1✔
131
        if !idata.IsDevice {
2✔
132
                c.JSON(http.StatusBadRequest, gin.H{
1✔
133
                        "error": ErrMissingAuthentication.Error(),
1✔
134
                })
1✔
135
                return
1✔
136
        }
1✔
137

138
        msgChan := make(chan *natsio.Msg, channelSize)
1✔
139
        sub, err := h.nats.ChanSubscribe(
1✔
140
                model.GetDeviceSubject(idata.Tenant, idata.Subject),
1✔
141
                msgChan,
1✔
142
        )
1✔
143
        if err != nil {
2✔
144
                l.Error(err)
1✔
145
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
146
                        "error": "failed to allocate internal device channel",
1✔
147
                })
1✔
148
                return
1✔
149
        }
1✔
150
        //nolint:errcheck
151
        defer sub.Unsubscribe()
1✔
152

1✔
153
        upgrader := websocket.Upgrader{
1✔
154
                Subprotocols: []string{"protomsg/msgpack"},
1✔
155
                CheckOrigin: func(r *http.Request) bool {
2✔
156
                        return true
1✔
157
                },
1✔
158
                Error: func(
159
                        w http.ResponseWriter, r *http.Request, s int, e error) {
1✔
160
                        rest.RenderError(c, s, e)
1✔
161
                },
1✔
162
        }
163

164
        errChan := make(chan error)
1✔
165
        defer close(errChan)
1✔
166

1✔
167
        // upgrade get request to websocket protocol
1✔
168
        conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
1✔
169
        if err != nil {
2✔
170
                err = errors.Wrap(err,
1✔
171
                        "failed to upgrade the request to "+
1✔
172
                                "websocket protocol",
1✔
173
                )
1✔
174
                l.Error(err)
1✔
175
                return
1✔
176
        }
1✔
177
        conn.SetReadLimit(int64(app.MessageSizeLimit))
1✔
178

1✔
179
        // register the websocket for graceful shutdown
1✔
180
        ctxWithCancel, cancel := context.WithCancel(ctx)
1✔
181
        registerID := h.app.RegisterShutdownCancel(cancel)
1✔
182
        defer h.app.UnregisterShutdownCancel(registerID)
1✔
183

1✔
184
        // websocketWriter is responsible for closing the websocket
1✔
185
        //nolint:errcheck
1✔
186
        go h.connectWSWriter(ctxWithCancel, conn, msgChan, errChan)
1✔
187
        err = h.ConnectServeWS(ctxWithCancel, conn)
1✔
188
        if err != nil {
2✔
189
                select {
1✔
190
                case errChan <- err:
1✔
191

192
                case <-time.After(time.Second):
×
193
                        l.Warn("Failed to propagate error to client")
×
194
                }
195
        }
196
}
197

198
// websocketWriter is the go-routine responsible for the writing end of the
199
// websocket. The routine forwards messages posted on the NATS session subject
200
// and periodically pings the connection. If the connection times out or a
201
// protocol violation occurs, the routine closes the connection.
202
func (h DeviceController) connectWSWriter(
203
        ctx context.Context,
204
        conn *websocket.Conn,
205
        msgChan <-chan *natsio.Msg,
206
        errChan <-chan error,
207
) (err error) {
1✔
208
        l := log.FromContext(ctx)
1✔
209
        defer func() {
2✔
210
                if err != nil {
2✔
211
                        if !websocket.IsUnexpectedCloseError(err) {
2✔
212
                                // If the peer didn't send a close message we must.
1✔
213
                                errMsg := err.Error()
1✔
214
                                errBody := make([]byte, len(errMsg)+2)
1✔
215
                                binary.BigEndian.PutUint16(errBody,
1✔
216
                                        websocket.CloseInternalServerErr,
1✔
217
                                )
1✔
218
                                copy(errBody[2:], errMsg)
1✔
219
                                errClose := conn.WriteControl(
1✔
220
                                        websocket.CloseMessage,
1✔
221
                                        errBody,
1✔
222
                                        time.Now().Add(writeWait),
1✔
223
                                )
1✔
224
                                if errClose != nil {
1✔
225
                                        err = errors.Wrapf(err,
×
226
                                                "error sending websocket close frame: %s",
×
227
                                                errClose.Error(),
×
228
                                        )
×
229
                                }
×
230
                        }
231
                        l.Errorf("websocket closed with error: %s", err.Error())
1✔
232
                }
233
                conn.Close()
1✔
234
        }()
235

236
        // handle the ping-pong connection health check
237
        err = conn.SetReadDeadline(time.Now().Add(pongWait))
1✔
238
        if err != nil {
1✔
239
                l.Error(err)
×
240
                return err
×
241
        }
×
242

243
        pingPeriod := (pongWait * 9) / 10
1✔
244
        ticker := time.NewTicker(pingPeriod)
1✔
245
        defer ticker.Stop()
1✔
246
        conn.SetPongHandler(func(string) error {
2✔
247
                ticker.Reset(pingPeriod)
1✔
248
                return conn.SetReadDeadline(time.Now().Add(pongWait))
1✔
249
        })
1✔
250
        conn.SetPingHandler(func(msg string) error {
2✔
251
                ticker.Reset(pingPeriod)
1✔
252
                err := conn.SetReadDeadline(time.Now().Add(pongWait))
1✔
253
                if err != nil {
1✔
254
                        return err
×
255
                }
×
256
                return conn.WriteControl(
1✔
257
                        websocket.PongMessage,
1✔
258
                        []byte(msg),
1✔
259
                        time.Now().Add(writeWait),
1✔
260
                )
1✔
261
        })
262
Loop:
1✔
263
        for {
2✔
264
                select {
1✔
265
                case msg := <-msgChan:
1✔
266
                        err = conn.WriteMessage(websocket.BinaryMessage, msg.Data)
1✔
267
                        if err != nil {
1✔
268
                                l.Error(err)
×
269
                                break Loop
×
270
                        }
271
                case <-ctx.Done():
×
272
                        err = errors.New("connection closed by the server")
×
273
                        break Loop
×
274
                case <-ticker.C:
1✔
275
                        if !websocketPing(conn) {
1✔
276
                                err = errors.New("connection timeout")
×
277
                                break Loop
×
278
                        }
279
                case err := <-errChan:
1✔
280
                        return err
1✔
281
                }
282
        }
283
        return err
×
284
}
285

286
func (h DeviceController) ConnectServeWS(
287
        ctx context.Context,
288
        conn *websocket.Conn,
289
) (err error) {
1✔
290
        l := log.FromContext(ctx)
1✔
291
        id := identity.FromContext(ctx)
1✔
292
        sessMap := make(map[string]*model.ActiveSession)
1✔
293

1✔
294
        // update the device status on websocket opening
1✔
295
        var version int64
1✔
296
        version, err = h.app.SetDeviceConnected(ctx, id.Tenant, id.Subject)
1✔
297
        if err != nil {
1✔
298
                l.Error(err)
×
299
                return
×
300
        }
×
301
        defer func() {
2✔
302
                for sessionID, session := range sessMap {
2✔
303
                        // TODO: notify the session NATS topic about the session
1✔
304
                        //       being released.
1✔
305
                        if session.RemoteTerminal {
2✔
306
                                msg := ws.ProtoMsg{
1✔
307
                                        Header: ws.ProtoHdr{
1✔
308
                                                Proto:     ws.ProtoTypeShell,
1✔
309
                                                MsgType:   shell.MessageTypeStopShell,
1✔
310
                                                SessionID: sessionID,
1✔
311
                                                Properties: map[string]interface{}{
1✔
312
                                                        "status": shell.ErrorMessage,
1✔
313
                                                },
1✔
314
                                        },
1✔
315
                                        Body: []byte("device disconnected"),
1✔
316
                                }
1✔
317
                                data, _ := msgpack.Marshal(msg)
1✔
318
                                err = h.nats.Publish(
1✔
319
                                        model.GetSessionSubject(id.Tenant, sessionID),
1✔
320
                                        data,
1✔
321
                                )
1✔
322
                        }
1✔
323
                }
324
                // update the device status on websocket closing
325
                eStatus := h.app.SetDeviceDisconnected(
1✔
326
                        ctx, id.Tenant,
1✔
327
                        id.Subject, version,
1✔
328
                )
1✔
329
                if eStatus != nil {
1✔
330
                        l.Error(eStatus)
×
331
                }
×
332
        }()
333

334
        var data []byte
1✔
335
        for {
2✔
336
                _, data, err = conn.ReadMessage()
1✔
337
                if err != nil {
1✔
UNCOV
338
                        return err
×
UNCOV
339
                }
×
340
                m := &ws.ProtoMsg{}
1✔
341
                err = msgpack.Unmarshal(data, m)
1✔
342
                if err != nil {
2✔
343
                        return err
1✔
344
                }
1✔
345

346
                sessMap[m.Header.SessionID] = &model.ActiveSession{}
1✔
347
                switch m.Header.Proto {
1✔
348
                case ws.ProtoTypeShell:
1✔
349
                        if m.Header.SessionID == "" {
2✔
350
                                return errors.New("api: message missing required session ID")
1✔
351
                        } else if m.Header.MsgType == shell.MessageTypeSpawnShell {
3✔
352
                                if session, ok := sessMap[m.Header.SessionID]; ok {
2✔
353
                                        session.RemoteTerminal = true
1✔
354
                                }
1✔
355
                        } else if m.Header.MsgType == shell.MessageTypeStopShell {
2✔
356
                                delete(sessMap, m.Header.SessionID)
1✔
357
                        }
1✔
358
                default:
×
359
                        // TODO: Handle protocol violation
360
                }
361

362
                err = h.nats.Publish(
1✔
363
                        model.GetSessionSubject(id.Tenant, m.Header.SessionID),
1✔
364
                        data,
1✔
365
                )
1✔
366
                if err != nil {
1✔
367
                        return err
×
368
                }
×
369
        }
370
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc