• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1703021269

06 Mar 2025 01:06PM UTC coverage: 65.49% (+0.009%) from 65.481%
1703021269

Pull #506

gitlab-ci

merlin-northern
fix(iot-manager): do not store events if there are no integrations

Changelog: Title
Ticket: MEN-7868
Signed-off-by: Peter Grzybowski <peter@northern.tech>
Pull Request #506: fix(iot-manager): do not store events if there are no integrations.

9 of 9 new or added lines in 1 file covered. (100.0%)

130 existing lines in 4 files now uncovered.

31676 of 48368 relevant lines covered (65.49%)

1.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.69
/backend/services/deviceconnect/api/http/device.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package http
16

17
import (
18
        "context"
19
        "encoding/binary"
20
        "encoding/json"
21
        "net/http"
22
        "time"
23

24
        "github.com/gin-gonic/gin"
25
        "github.com/gorilla/websocket"
26
        natsio "github.com/nats-io/nats.go"
27
        "github.com/pkg/errors"
28
        "github.com/vmihailenco/msgpack/v5"
29

30
        "github.com/mendersoftware/mender-server/pkg/identity"
31
        "github.com/mendersoftware/mender-server/pkg/log"
32
        "github.com/mendersoftware/mender-server/pkg/rest.utils"
33
        "github.com/mendersoftware/mender-server/pkg/ws"
34
        "github.com/mendersoftware/mender-server/pkg/ws/shell"
35

36
        "github.com/mendersoftware/mender-server/services/deviceconnect/app"
37
        "github.com/mendersoftware/mender-server/services/deviceconnect/client/nats"
38
        "github.com/mendersoftware/mender-server/services/deviceconnect/model"
39
)
40

41
var (
42
        // Time allowed to read the next pong message from the peer.
43
        pongWait = time.Minute
44

45
        // Seconds allowed to write a message to the peer.
46
        writeWait = time.Second * 10
47
)
48

49
// HTTP errors
50
var (
51
        ErrMissingAuthentication = errors.New(
52
                "missing or non-device identity in the authorization headers",
53
        )
54
)
55

56
// DeviceController container for end-points
57
type DeviceController struct {
58
        app  app.App
59
        nats nats.Client
60
}
61

62
// NewDeviceController returns a new DeviceController
63
func NewDeviceController(
64
        app app.App,
65
        natsClient nats.Client,
66
) *DeviceController {
3✔
67
        return &DeviceController{
3✔
68
                app:  app,
3✔
69
                nats: natsClient,
3✔
70
        }
3✔
71
}
3✔
72

73
// Provision responds to POST /tenants/:tenantId/devices
74
func (h DeviceController) Provision(c *gin.Context) {
3✔
75
        tenantID := c.Param("tenantId")
3✔
76

3✔
77
        rawData, err := c.GetRawData()
3✔
78
        if err != nil {
3✔
79
                c.JSON(http.StatusBadRequest, gin.H{
×
80
                        "error": "bad request",
×
UNCOV
81
                })
×
UNCOV
82
                return
×
UNCOV
83
        }
×
84

85
        device := &model.Device{}
3✔
86
        if err = json.Unmarshal(rawData, device); err != nil {
5✔
87
                c.JSON(http.StatusBadRequest, gin.H{
2✔
88
                        "error": errors.Wrap(err, "invalid payload").Error(),
2✔
89
                })
2✔
90
                return
2✔
91
        } else if device.ID == "" {
5✔
92
                c.JSON(http.StatusBadRequest, gin.H{
1✔
93
                        "error": "device_id is empty",
1✔
94
                })
1✔
95
                return
1✔
96
        }
1✔
97

98
        ctx := c.Request.Context()
2✔
99
        if err = h.app.ProvisionDevice(ctx, tenantID, device); err != nil {
3✔
100
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
101
                        "error": errors.Wrap(err, "error provisioning the device").Error(),
1✔
102
                })
1✔
103
                return
1✔
104
        }
1✔
105

106
        c.Writer.WriteHeader(http.StatusCreated)
2✔
107
}
108

109
// Delete responds to DELETE /tenants/:tenantId/devices/:deviceId
110
func (h DeviceController) Delete(c *gin.Context) {
2✔
111
        tenantID := c.Param("tenantId")
2✔
112
        deviceID := c.Param("deviceId")
2✔
113

2✔
114
        ctx := c.Request.Context()
2✔
115
        if err := h.app.DeleteDevice(ctx, tenantID, deviceID); err != nil {
3✔
116
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
117
                        "error": errors.Wrap(err, "error deleting the device").Error(),
1✔
118
                })
1✔
119
                return
1✔
120
        }
1✔
121

122
        c.Writer.WriteHeader(http.StatusAccepted)
2✔
123
}
124

125
// Connect starts a websocket connection with the device
126
func (h DeviceController) Connect(c *gin.Context) {
2✔
127
        ctx := c.Request.Context()
2✔
128
        l := log.FromContext(ctx)
2✔
129

2✔
130
        idata := identity.FromContext(ctx)
2✔
131
        if !idata.IsDevice {
3✔
132
                c.JSON(http.StatusBadRequest, gin.H{
1✔
133
                        "error": ErrMissingAuthentication.Error(),
1✔
134
                })
1✔
135
                return
1✔
136
        }
1✔
137

138
        msgChan := make(chan *natsio.Msg, channelSize)
2✔
139
        sub, err := h.nats.ChanSubscribe(
2✔
140
                model.GetDeviceSubject(idata.Tenant, idata.Subject),
2✔
141
                msgChan,
2✔
142
        )
2✔
143
        if err != nil {
3✔
144
                l.Error(err)
1✔
145
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
146
                        "error": "failed to allocate internal device channel",
1✔
147
                })
1✔
148
                return
1✔
149
        }
1✔
150
        //nolint:errcheck
151
        defer sub.Unsubscribe()
2✔
152

2✔
153
        upgrader := websocket.Upgrader{
2✔
154
                Subprotocols: []string{"protomsg/msgpack"},
2✔
155
                CheckOrigin: func(r *http.Request) bool {
4✔
156
                        return true
2✔
157
                },
2✔
158
                Error: func(
159
                        w http.ResponseWriter, r *http.Request, s int, e error) {
1✔
160
                        rest.RenderError(c, s, e)
1✔
161
                },
1✔
162
        }
163

164
        errChan := make(chan error)
2✔
165
        defer close(errChan)
2✔
166

2✔
167
        // upgrade get request to websocket protocol
2✔
168
        conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
2✔
169
        if err != nil {
3✔
170
                err = errors.Wrap(err,
1✔
171
                        "failed to upgrade the request to "+
1✔
172
                                "websocket protocol",
1✔
173
                )
1✔
174
                l.Error(err)
1✔
175
                return
1✔
176
        }
1✔
177
        conn.SetReadLimit(int64(app.MessageSizeLimit))
2✔
178

2✔
179
        // register the websocket for graceful shutdown
2✔
180
        ctxWithCancel, cancel := context.WithCancel(ctx)
2✔
181
        registerID := h.app.RegisterShutdownCancel(cancel)
2✔
182
        defer h.app.UnregisterShutdownCancel(registerID)
2✔
183

2✔
184
        // websocketWriter is responsible for closing the websocket
2✔
185
        //nolint:errcheck
2✔
186
        go h.connectWSWriter(ctxWithCancel, conn, msgChan, errChan)
2✔
187
        err = h.ConnectServeWS(ctxWithCancel, conn)
2✔
188
        if err != nil {
4✔
189
                select {
2✔
190
                case errChan <- err:
2✔
191

UNCOV
192
                case <-time.After(time.Second):
×
UNCOV
193
                        l.Warn("Failed to propagate error to client")
×
194
                }
195
        }
196
}
197

198
// websocketWriter is the go-routine responsible for the writing end of the
199
// websocket. The routine forwards messages posted on the NATS session subject
200
// and periodically pings the connection. If the connection times out or a
201
// protocol violation occurs, the routine closes the connection.
202
func (h DeviceController) connectWSWriter(
203
        ctx context.Context,
204
        conn *websocket.Conn,
205
        msgChan <-chan *natsio.Msg,
206
        errChan <-chan error,
207
) (err error) {
2✔
208
        l := log.FromContext(ctx)
2✔
209
        defer func() {
4✔
210
                if err != nil {
4✔
211
                        if !websocket.IsUnexpectedCloseError(err) {
3✔
212
                                // If the peer didn't send a close message we must.
1✔
213
                                errMsg := err.Error()
1✔
214
                                errBody := make([]byte, len(errMsg)+2)
1✔
215
                                binary.BigEndian.PutUint16(errBody,
1✔
216
                                        websocket.CloseInternalServerErr,
1✔
217
                                )
1✔
218
                                copy(errBody[2:], errMsg)
1✔
219
                                errClose := conn.WriteControl(
1✔
220
                                        websocket.CloseMessage,
1✔
221
                                        errBody,
1✔
222
                                        time.Now().Add(writeWait),
1✔
223
                                )
1✔
224
                                if errClose != nil {
1✔
225
                                        err = errors.Wrapf(err,
×
226
                                                "error sending websocket close frame: %s",
×
UNCOV
227
                                                errClose.Error(),
×
UNCOV
228
                                        )
×
UNCOV
229
                                }
×
230
                        }
231
                        l.Errorf("websocket closed with error: %s", err.Error())
2✔
232
                }
233
                conn.Close()
2✔
234
        }()
235

236
        // handle the ping-pong connection health check
237
        err = conn.SetReadDeadline(time.Now().Add(pongWait))
2✔
238
        if err != nil {
2✔
UNCOV
239
                l.Error(err)
×
UNCOV
240
                return err
×
241
        }
×
242

243
        pingPeriod := (pongWait * 9) / 10
2✔
244
        ticker := time.NewTicker(pingPeriod)
2✔
245
        defer ticker.Stop()
2✔
246
        conn.SetPongHandler(func(string) error {
3✔
247
                ticker.Reset(pingPeriod)
1✔
248
                return conn.SetReadDeadline(time.Now().Add(pongWait))
1✔
249
        })
1✔
250
        conn.SetPingHandler(func(msg string) error {
3✔
251
                ticker.Reset(pingPeriod)
1✔
252
                err := conn.SetReadDeadline(time.Now().Add(pongWait))
1✔
253
                if err != nil {
1✔
UNCOV
254
                        return err
×
255
                }
×
256
                return conn.WriteControl(
1✔
257
                        websocket.PongMessage,
1✔
258
                        []byte(msg),
1✔
259
                        time.Now().Add(writeWait),
1✔
260
                )
1✔
261
        })
262
Loop:
2✔
263
        for {
4✔
264
                select {
2✔
265
                case msg := <-msgChan:
2✔
266
                        err = conn.WriteMessage(websocket.BinaryMessage, msg.Data)
2✔
267
                        if err != nil {
2✔
UNCOV
268
                                l.Error(err)
×
UNCOV
269
                                break Loop
×
270
                        }
UNCOV
271
                case <-ctx.Done():
×
UNCOV
272
                        err = errors.New("connection closed by the server")
×
UNCOV
273
                        break Loop
×
274
                case <-ticker.C:
1✔
275
                        if !websocketPing(conn) {
1✔
UNCOV
276
                                err = errors.New("connection timeout")
×
UNCOV
277
                                break Loop
×
278
                        }
279
                case err := <-errChan:
2✔
280
                        return err
2✔
281
                }
282
        }
UNCOV
283
        return err
×
284
}
285

286
func (h DeviceController) ConnectServeWS(
287
        ctx context.Context,
288
        conn *websocket.Conn,
289
) (err error) {
2✔
290
        l := log.FromContext(ctx)
2✔
291
        id := identity.FromContext(ctx)
2✔
292
        sessMap := make(map[string]*model.ActiveSession)
2✔
293

2✔
294
        // update the device status on websocket opening
2✔
295
        var version int64
2✔
296
        version, err = h.app.SetDeviceConnected(ctx, id.Tenant, id.Subject)
2✔
297
        if err != nil {
2✔
UNCOV
298
                l.Error(err)
×
UNCOV
299
                return
×
UNCOV
300
        }
×
301
        defer func() {
4✔
302
                for sessionID, session := range sessMap {
4✔
303
                        // TODO: notify the session NATS topic about the session
2✔
304
                        //       being released.
2✔
305
                        if session.RemoteTerminal {
3✔
306
                                msg := ws.ProtoMsg{
1✔
307
                                        Header: ws.ProtoHdr{
1✔
308
                                                Proto:     ws.ProtoTypeShell,
1✔
309
                                                MsgType:   shell.MessageTypeStopShell,
1✔
310
                                                SessionID: sessionID,
1✔
311
                                                Properties: map[string]interface{}{
1✔
312
                                                        "status": shell.ErrorMessage,
1✔
313
                                                },
1✔
314
                                        },
1✔
315
                                        Body: []byte("device disconnected"),
1✔
316
                                }
1✔
317
                                data, _ := msgpack.Marshal(msg)
1✔
318
                                err = h.nats.Publish(
1✔
319
                                        model.GetSessionSubject(id.Tenant, sessionID),
1✔
320
                                        data,
1✔
321
                                )
1✔
322
                        }
1✔
323
                }
324
                // update the device status on websocket closing
325
                eStatus := h.app.SetDeviceDisconnected(
2✔
326
                        ctx, id.Tenant,
2✔
327
                        id.Subject, version,
2✔
328
                )
2✔
329
                if eStatus != nil {
2✔
UNCOV
330
                        l.Error(eStatus)
×
UNCOV
331
                }
×
332
        }()
333

334
        var data []byte
2✔
335
        for {
4✔
336
                _, data, err = conn.ReadMessage()
2✔
337
                if err != nil {
3✔
338
                        return err
1✔
339
                }
1✔
340
                m := &ws.ProtoMsg{}
2✔
341
                err = msgpack.Unmarshal(data, m)
2✔
342
                if err != nil {
3✔
343
                        return err
1✔
344
                }
1✔
345

346
                sessMap[m.Header.SessionID] = &model.ActiveSession{}
2✔
347
                switch m.Header.Proto {
2✔
348
                case ws.ProtoTypeShell:
2✔
349
                        if m.Header.SessionID == "" {
3✔
350
                                return errors.New("api: message missing required session ID")
1✔
351
                        } else if m.Header.MsgType == shell.MessageTypeSpawnShell {
4✔
352
                                if session, ok := sessMap[m.Header.SessionID]; ok {
2✔
353
                                        session.RemoteTerminal = true
1✔
354
                                }
1✔
355
                        } else if m.Header.MsgType == shell.MessageTypeStopShell {
3✔
356
                                delete(sessMap, m.Header.SessionID)
1✔
357
                        }
1✔
UNCOV
358
                default:
×
359
                        // TODO: Handle protocol violation
360
                }
361

362
                err = h.nats.Publish(
2✔
363
                        model.GetSessionSubject(id.Tenant, m.Header.SessionID),
2✔
364
                        data,
2✔
365
                )
2✔
366
                if err != nil {
2✔
UNCOV
367
                        return err
×
UNCOV
368
                }
×
369
        }
370
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc