• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12187193240

05 Dec 2024 08:04PM UTC coverage: 58.933% (-0.02%) from 58.951%
12187193240

push

github

web-flow
Merge pull request #9333 from guggero/aux-traffic-shaper-refactor

[custom channels]: refactor AuxTrafficManager to be used for forwarding as well

30 of 81 new or added lines in 7 files covered. (37.04%)

57 existing lines in 14 files now uncovered.

133458 of 226459 relevant lines covered (58.93%)

19547.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.95
/watchtower/wtclient/session_queue.go
1
package wtclient
2

3
import (
4
        "container/list"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/chaincfg/chainhash"
10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/input"
12
        "github.com/lightningnetwork/lnd/keychain"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
15
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
16
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
17
)
18

19
// sessionQueueStatus enumerates how much room a sessionQueue has left for new
// backups.
type sessionQueueStatus uint8

const (
	// sessionQueueAvailable reports that the session can still take at
	// least one more backup.
	sessionQueueAvailable sessionQueueStatus = 0

	// sessionQueueExhausted reports that every slot in the session has
	// already been allocated.
	sessionQueueExhausted sessionQueueStatus = 1

	// sessionQueueShuttingDown reports that the queue is stopping and will
	// refuse any further backups.
	sessionQueueShuttingDown sessionQueueStatus = 2
)
35

36
// sessionQueueConfig bundles the resources required by the sessionQueue to
37
// perform its duties. All entries MUST be non-nil.
38
type sessionQueueConfig struct {
39
        // ClientSession provides access to the negotiated session parameters
40
        // and updating its persistent storage.
41
        ClientSession *ClientSession
42

43
        // ChainHash identifies the chain for which the session's justice
44
        // transactions are targeted.
45
        ChainHash chainhash.Hash
46

47
        // Dial allows the client to dial the tower using it's public key and
48
        // net address.
49
        Dial func(keychain.SingleKeyECDH, *lnwire.NetAddress) (wtserver.Peer,
50
                error)
51

52
        // SendMessage encodes, encrypts, and writes a message to the given
53
        // peer.
54
        SendMessage func(wtserver.Peer, wtwire.Message) error
55

56
        // ReadMessage receives, decypts, and decodes a message from the given
57
        // peer.
58
        ReadMessage func(wtserver.Peer) (wtwire.Message, error)
59

60
        // Signer facilitates signing of inputs, used to construct the witnesses
61
        // for justice transaction inputs.
62
        Signer input.Signer
63

64
        // BuildBreachRetribution is a function closure that allows the client
65
        // to fetch the breach retribution info for a certain channel at a
66
        // certain revoked commitment height.
67
        BuildBreachRetribution BreachRetributionBuilder
68

69
        // TaskPipeline is a pipeline which the sessionQueue should use to send
70
        // any unhandled tasks on shutdown of the queue.
71
        TaskPipeline *DiskOverflowQueue[*wtdb.BackupID]
72

73
        // DB provides access to the client's stable storage.
74
        DB DB
75

76
        // MinBackoff defines the initial backoff applied by the session
77
        // queue before reconnecting to the tower after a failed or partially
78
        // successful batch is sent. Subsequent backoff durations will grow
79
        // exponentially up until MaxBackoff.
80
        MinBackoff time.Duration
81

82
        // MaxBackoff defines the maximum backoff applied by the session
83
        // queue before reconnecting to the tower after a failed or partially
84
        // successful batch is sent. If the exponential backoff produces a
85
        // timeout greater than this value, the backoff duration will be clamped
86
        // to MaxBackoff.
87
        MaxBackoff time.Duration
88

89
        // Log specifies the desired log output, which should be prefixed by the
90
        // client type, e.g. anchor or legacy.
91
        Log btclog.Logger
92
}
93

94
// sessionQueue implements a reliable queue that will encrypt and send accepted
95
// backups to the watchtower specified in the config's ClientSession. Calling
96
// Stop will attempt to perform a clean shutdown replaying any un-committed
97
// pending updates to the client's main task pipeline.
98
type sessionQueue struct {
99
        started sync.Once
100
        stopped sync.Once
101
        forced  sync.Once
102

103
        cfg *sessionQueueConfig
104
        log btclog.Logger
105

106
        commitQueue  *list.List
107
        pendingQueue *list.List
108
        queueMtx     sync.Mutex
109
        queueCond    *sync.Cond
110

111
        localInit *wtwire.Init
112
        tower     *Tower
113

114
        seqNum uint16
115

116
        retryBackoff time.Duration
117

118
        quit chan struct{}
119
        wg   sync.WaitGroup
120
}
121

122
// newSessionQueue initializes a fresh sessionQueue.
123
func newSessionQueue(cfg *sessionQueueConfig,
124
        updates []wtdb.CommittedUpdate) *sessionQueue {
85✔
125

85✔
126
        localInit := wtwire.NewInitMessage(
85✔
127
                lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
85✔
128
                cfg.ChainHash,
85✔
129
        )
85✔
130

85✔
131
        sq := &sessionQueue{
85✔
132
                cfg:          cfg,
85✔
133
                log:          cfg.Log,
85✔
134
                commitQueue:  list.New(),
85✔
135
                pendingQueue: list.New(),
85✔
136
                localInit:    localInit,
85✔
137
                tower:        cfg.ClientSession.Tower,
85✔
138
                seqNum:       cfg.ClientSession.SeqNum,
85✔
139
                retryBackoff: cfg.MinBackoff,
85✔
140
                quit:         make(chan struct{}),
85✔
141
        }
85✔
142
        sq.queueCond = sync.NewCond(&sq.queueMtx)
85✔
143

85✔
144
        // The database should return them in sorted order, and session queue's
85✔
145
        // sequence number will be equal to that of the last committed update.
85✔
146
        for _, update := range updates {
88✔
147
                sq.commitQueue.PushBack(update)
3✔
148
        }
3✔
149

150
        return sq
85✔
151
}
152

153
// Start idempotently starts the sessionQueue so that it can begin accepting
154
// backups.
155
func (q *sessionQueue) Start() {
85✔
156
        q.started.Do(func() {
170✔
157
                q.wg.Add(1)
85✔
158
                go q.sessionManager()
85✔
159
        })
85✔
160
}
161

162
// Stop idempotently stops the sessionQueue by initiating a clean shutdown that
163
// will clear all pending tasks in the queue before returning to the caller.
164
// The final param should only be set to true if this is the last time that
165
// this session will be used. Otherwise, during normal shutdown, the final param
166
// should be false.
167
func (q *sessionQueue) Stop(final bool) error {
85✔
168
        var returnErr error
85✔
169
        q.stopped.Do(func() {
170✔
170
                q.log.Debugf("SessionQueue(%s) stopping ...", q.ID())
85✔
171

85✔
172
                close(q.quit)
85✔
173

85✔
174
                shutdown := make(chan struct{})
85✔
175
                go func() {
170✔
176
                        for {
244✔
177
                                select {
159✔
178
                                case <-time.After(time.Millisecond):
78✔
179
                                        q.queueCond.Signal()
78✔
180
                                case <-shutdown:
85✔
181
                                        return
85✔
182
                                }
183
                        }
184
                }()
185

186
                q.wg.Wait()
85✔
187
                close(shutdown)
85✔
188

85✔
189
                // Now, for any task in the pending queue that we have not yet
85✔
190
                // created a CommittedUpdate for, re-add the task to the main
85✔
191
                // task pipeline.
85✔
192
                updates, err := q.cfg.DB.FetchSessionCommittedUpdates(q.ID())
85✔
193
                if err != nil {
85✔
194
                        returnErr = err
×
195
                        return
×
196
                }
×
197

198
                unAckedUpdates := make(map[wtdb.BackupID]bool)
85✔
199
                for _, update := range updates {
92✔
200
                        unAckedUpdates[update.BackupID] = true
7✔
201

7✔
202
                        if !final {
10✔
203
                                continue
3✔
204
                        }
205

206
                        err := q.cfg.TaskPipeline.QueueBackupID(
4✔
207
                                &update.BackupID,
4✔
208
                        )
4✔
209
                        if err != nil {
4✔
210
                                log.Errorf("could not re-queue %s: %v",
×
211
                                        update.BackupID, err)
×
212
                                continue
×
213
                        }
214
                }
215

216
                if final {
101✔
217
                        err = q.cfg.DB.DeleteCommittedUpdates(q.ID())
16✔
218
                        if err != nil {
16✔
219
                                log.Errorf("could not delete committed "+
×
220
                                        "updates for session %s", q.ID())
×
221
                        }
×
222
                }
223

224
                // Push any task that was on the pending queue that there is
225
                // not yet a committed update for back to the main task
226
                // pipeline.
227
                q.queueCond.L.Lock()
85✔
228
                for q.pendingQueue.Len() > 0 {
96✔
229
                        next := q.pendingQueue.Front()
11✔
230
                        q.pendingQueue.Remove(next)
11✔
231

11✔
232
                        //nolint:forcetypeassert
11✔
233
                        task := next.Value.(*backupTask)
11✔
234

11✔
235
                        if unAckedUpdates[task.id] {
16✔
236
                                continue
5✔
237
                        }
238

239
                        err := q.cfg.TaskPipeline.QueueBackupID(&task.id)
6✔
240
                        if err != nil {
6✔
241
                                log.Errorf("could not re-queue backup task: "+
×
242
                                        "%v", err)
×
243
                                continue
×
244
                        }
245
                }
246
                q.queueCond.L.Unlock()
85✔
247

85✔
248
                q.log.Debugf("SessionQueue(%s) stopped", q.ID())
85✔
249
        })
250

251
        return returnErr
85✔
252
}
253

254
// ID returns the wtdb.SessionID for the queue, which can be used to uniquely
255
// identify this a particular queue.
256
func (q *sessionQueue) ID() *wtdb.SessionID {
4,274✔
257
        return &q.cfg.ClientSession.ID
4,274✔
258
}
4,274✔
259

260
// AcceptTask attempts to queue a backupTask for delivery to the sessionQueue's
261
// tower. The session will only be accepted if the queue is not already
262
// exhausted or shutting down and the task is successfully bound to the
263
// ClientSession.
264
func (q *sessionQueue) AcceptTask(task *backupTask) (sessionQueueStatus, bool) {
484✔
265
        // Exit early if the queue has started shutting down.
484✔
266
        select {
484✔
267
        case <-q.quit:
×
268
                return sessionQueueShuttingDown, false
×
269
        default:
484✔
270
        }
271

272
        q.queueCond.L.Lock()
484✔
273

484✔
274
        // There is a chance that sessionQueue started shutting down between
484✔
275
        // the last quit channel check and waiting for the lock. So check one
484✔
276
        // more time here.
484✔
277
        select {
484✔
278
        case <-q.quit:
×
279
                q.queueCond.L.Unlock()
×
280
                return sessionQueueShuttingDown, false
×
281
        default:
484✔
282
        }
283

284
        numPending := uint32(q.pendingQueue.Len())
484✔
285
        maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates
484✔
286
        q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
484✔
287
                "pending=%d max-updates=%d",
484✔
288
                q.ID(), task.id, q.seqNum, numPending, maxUpdates)
484✔
289

484✔
290
        // Examine the current reserve status of the session queue.
484✔
291
        curStatus := q.status()
484✔
292

484✔
293
        switch curStatus {
484✔
294

295
        // The session queue is exhausted, and cannot accept the task because it
296
        // is full. Reject the task such that it can be tried against a
297
        // different session.
298
        case sessionQueueExhausted:
1✔
299
                q.queueCond.L.Unlock()
1✔
300
                return curStatus, false
1✔
301

302
        // The session queue is not exhausted. Compute the sweep and reward
303
        // outputs as a function of the session parameters. If the outputs are
304
        // dusty or uneconomical to backup, the task is rejected and will not be
305
        // tried again.
306
        //
307
        // TODO(conner): queue backups and retry with different session params.
308
        case sessionQueueAvailable:
483✔
309
                err := task.bindSession(
483✔
310
                        &q.cfg.ClientSession.ClientSessionBody,
483✔
311
                        q.cfg.BuildBreachRetribution,
483✔
312
                )
483✔
313
                if err != nil {
488✔
314
                        q.queueCond.L.Unlock()
5✔
315
                        q.log.Debugf("SessionQueue(%s) rejected %v: %v ",
5✔
316
                                q.ID(), task.id, err)
5✔
317
                        return curStatus, false
5✔
318
                }
5✔
319
        }
320

321
        // The sweep and reward outputs satisfy the session's policy, queue the
322
        // task for final signing and delivery.
323
        q.pendingQueue.PushBack(task)
478✔
324

478✔
325
        // Finally, compute the session's *new* reserve status. This will be
478✔
326
        // used by the client to determine if it can continue using this session
478✔
327
        // queue, or if it should negotiate a new one.
478✔
328
        newStatus := q.status()
478✔
329
        q.queueCond.L.Unlock()
478✔
330

478✔
331
        q.queueCond.Signal()
478✔
332

478✔
333
        return newStatus, true
478✔
334
}
335

336
// sessionManager is the primary event loop for the sessionQueue, and is
337
// responsible for encrypting and sending accepted tasks to the tower.
338
func (q *sessionQueue) sessionManager() {
85✔
339
        defer q.wg.Done()
85✔
340

85✔
341
        for {
497✔
342
                q.queueCond.L.Lock()
412✔
343
                for q.commitQueue.Len() == 0 &&
412✔
344
                        q.pendingQueue.Len() == 0 {
520✔
345

108✔
346
                        q.queueCond.Wait()
108✔
347

108✔
348
                        select {
108✔
349
                        case <-q.quit:
77✔
350
                                q.queueCond.L.Unlock()
77✔
351
                                return
77✔
352
                        default:
35✔
353
                        }
354
                }
355
                q.queueCond.L.Unlock()
339✔
356

339✔
357
                // Exit immediately if the sessionQueue has been stopped.
339✔
358
                select {
339✔
359
                case <-q.quit:
8✔
360
                        return
8✔
361
                default:
331✔
362
                }
363

364
                // Initiate a new connection to the watchtower and attempt to
365
                // drain all pending tasks.
366
                q.drainBackups()
331✔
367
        }
368
}
369

370
// drainBackups attempts to send all pending updates in the queue to the tower.
371
func (q *sessionQueue) drainBackups() {
331✔
372
        var (
331✔
373
                conn      wtserver.Peer
331✔
374
                err       error
331✔
375
                towerAddr = q.tower.Addresses.Peek()
331✔
376
        )
331✔
377

331✔
378
        for {
662✔
379
                q.log.Infof("SessionQueue(%s) attempting to dial tower at %v",
331✔
380
                        q.ID(), towerAddr)
331✔
381

331✔
382
                // First, check that we are able to dial this session's tower.
331✔
383
                conn, err = q.cfg.Dial(
331✔
384
                        q.cfg.ClientSession.SessionKeyECDH, &lnwire.NetAddress{
331✔
385
                                IdentityKey: q.tower.IdentityKey,
331✔
386
                                Address:     towerAddr,
331✔
387
                        },
331✔
388
                )
331✔
389
                if err != nil {
338✔
390
                        // If there are more addrs available, immediately try
7✔
391
                        // those.
7✔
392
                        nextAddr, iteratorErr := q.tower.Addresses.Next()
7✔
393
                        if iteratorErr == nil {
7✔
394
                                towerAddr = nextAddr
×
395
                                continue
×
396
                        }
397

398
                        // Otherwise, if we have exhausted the address list,
399
                        // back off and try again later.
400
                        q.tower.Addresses.Reset()
7✔
401

7✔
402
                        q.log.Errorf("SessionQueue(%s) unable to dial tower "+
7✔
403
                                "at any available Addresses: %v", q.ID(), err)
7✔
404

7✔
405
                        q.increaseBackoff()
7✔
406
                        select {
7✔
407
                        case <-time.After(q.retryBackoff):
6✔
408
                        case <-q.quit:
1✔
409
                        }
410
                        return
7✔
411
                }
412

413
                break
324✔
414
        }
415
        defer conn.Close()
324✔
416

324✔
417
        // Begin draining the queue of pending state updates. Before the first
324✔
418
        // update is sent, we will precede it with an Init message. If the first
324✔
419
        // is successful, subsequent updates can be streamed without sending an
324✔
420
        // Init.
324✔
421
        for sendInit := true; ; sendInit = false {
1,011✔
422
                // Generate the next state update to upload to the tower. This
687✔
423
                // method will first proceed in dequeuing committed updates
687✔
424
                // before attempting to dequeue any pending updates.
687✔
425
                stateUpdate, isPending, backupID, err := q.nextStateUpdate()
687✔
426
                if err != nil {
687✔
427
                        q.log.Errorf("SessionQueue(%v) unable to get next "+
×
428
                                "state update: %v", q.ID(), err)
×
429
                        return
×
430
                }
×
431

432
                // Now, send the state update to the tower and wait for a reply.
433
                err = q.sendStateUpdate(conn, stateUpdate, sendInit, isPending)
687✔
434
                if err != nil {
906✔
435
                        q.log.Errorf("SessionQueue(%s) unable to send state "+
219✔
436
                                "update: %v", q.ID(), err)
219✔
437

219✔
438
                        q.increaseBackoff()
219✔
439
                        select {
219✔
440
                        case <-time.After(q.retryBackoff):
212✔
441
                        case <-q.quit:
7✔
442
                        }
443
                        return
219✔
444
                }
445

446
                q.log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
468✔
447
                        q.ID(), backupID, stateUpdate.SeqNum)
468✔
448

468✔
449
                // If the last task was backed up successfully, we'll exit and
468✔
450
                // continue once more tasks are added to the queue. We'll also
468✔
451
                // clear any accumulated backoff as this batch was able to be
468✔
452
                // sent reliably.
468✔
453
                if stateUpdate.IsComplete == 1 {
573✔
454
                        q.resetBackoff()
105✔
455
                        return
105✔
456
                }
105✔
457

458
                // Always apply a small delay between sends, which makes the
459
                // unit tests more reliable. If we were requested to back off,
460
                // when we will do so.
461
                select {
367✔
462
                case <-time.After(time.Millisecond):
367✔
463
                case <-q.quit:
×
464
                        return
×
465
                }
466
        }
467
}
468

469
// nextStateUpdate returns the next wtwire.StateUpdate to upload to the tower.
470
// If any committed updates are present, this method will reconstruct the state
471
// update from the committed update using the current last applied value found
472
// in the database. Otherwise, it will select the next pending update, craft the
473
// payload, and commit an update before returning the state update to send. The
474
// boolean value in the response is true if the state update is taken from the
475
// pending queue, allowing the caller to remove the update from either the
476
// commit or pending queue if the update is successfully acked.
477
func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
478
        wtdb.BackupID, error) {
687✔
479

687✔
480
        var (
687✔
481
                seqNum    uint16
687✔
482
                update    wtdb.CommittedUpdate
687✔
483
                isLast    bool
687✔
484
                isPending bool
687✔
485
        )
687✔
486

687✔
487
        q.queueCond.L.Lock()
687✔
488
        switch {
687✔
489

490
        // If the commit queue is non-empty, parse the next committed update.
491
        case q.commitQueue.Len() > 0:
3✔
492
                next := q.commitQueue.Front()
3✔
493

3✔
494
                update = next.Value.(wtdb.CommittedUpdate)
3✔
495
                seqNum = update.SeqNum
3✔
496

3✔
497
                // If this is the last item in the commit queue and no items
3✔
498
                // exist in the pending queue, we will use the IsComplete flag
3✔
499
                // in the StateUpdate to signal that the tower can release the
3✔
500
                // connection after replying to free up resources.
3✔
501
                isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0
3✔
502
                q.queueCond.L.Unlock()
3✔
503

3✔
504
                q.log.Debugf("SessionQueue(%s) reprocessing committed state "+
3✔
505
                        "update for %v seqnum=%d",
3✔
506
                        q.ID(), update.BackupID, seqNum)
3✔
507

508
        // Otherwise, craft and commit the next update from the pending queue.
509
        default:
684✔
510
                isPending = true
684✔
511

684✔
512
                // Determine the current sequence number to apply for this
684✔
513
                // pending update.
684✔
514
                seqNum = q.seqNum + 1
684✔
515

684✔
516
                // Obtain the next task from the queue.
684✔
517
                next := q.pendingQueue.Front()
684✔
518
                task := next.Value.(*backupTask)
684✔
519

684✔
520
                // If this is the last item in the pending queue, we will use
684✔
521
                // the IsComplete flag in the StateUpdate to signal that the
684✔
522
                // tower can release the connection after replying to free up
684✔
523
                // resources.
684✔
524
                isLast = q.pendingQueue.Len() == 1
684✔
525
                q.queueCond.L.Unlock()
684✔
526

684✔
527
                hint, encBlob, err := task.craftSessionPayload(q.cfg.Signer)
684✔
528
                if err != nil {
684✔
529
                        // TODO(conner): mark will not send
×
530
                        err := fmt.Errorf("unable to craft session payload: %w",
×
531
                                err)
×
532
                        return nil, false, wtdb.BackupID{}, err
×
533
                }
×
534
                // TODO(conner): special case other obscure errors
535

536
                update = wtdb.CommittedUpdate{
684✔
537
                        SeqNum: seqNum,
684✔
538
                        CommittedUpdateBody: wtdb.CommittedUpdateBody{
684✔
539
                                BackupID:      task.id,
684✔
540
                                Hint:          hint,
684✔
541
                                EncryptedBlob: encBlob,
684✔
542
                        },
684✔
543
                }
684✔
544

684✔
545
                q.log.Debugf("SessionQueue(%s) committing state update "+
684✔
546
                        "%v seqnum=%d", q.ID(), update.BackupID, seqNum)
684✔
547
        }
548

549
        // Before sending the task to the tower, commit the state update
550
        // to disk using the assigned sequence number. If this task has already
551
        // been committed, the call will succeed and only be used for the
552
        // purpose of obtaining the last applied value to send to the tower.
553
        //
554
        // This step ensures that if we crash before receiving an ack that we
555
        // will retransmit the same update. If the tower successfully received
556
        // the update from before, it will reply with an ACK regardless of what
557
        // we send the next time. This step ensures that if we reliably send the
558
        // same update for a given sequence number, to prevent us from thinking
559
        // we backed up a state when we instead backed up another.
560
        lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), &update)
687✔
561
        if err != nil {
687✔
562
                // TODO(conner): mark failed/reschedule
×
563
                err := fmt.Errorf("unable to commit state update for "+
×
564
                        "%v seqnum=%d: %v", update.BackupID, seqNum, err)
×
565
                return nil, false, wtdb.BackupID{}, err
×
566
        }
×
567

568
        stateUpdate := &wtwire.StateUpdate{
687✔
569
                SeqNum:        update.SeqNum,
687✔
570
                LastApplied:   lastApplied,
687✔
571
                Hint:          update.Hint,
687✔
572
                EncryptedBlob: update.EncryptedBlob,
687✔
573
        }
687✔
574

687✔
575
        // Set the IsComplete flag if this is the last queued item.
687✔
576
        if isLast {
816✔
577
                stateUpdate.IsComplete = 1
129✔
578
        }
129✔
579

580
        return stateUpdate, isPending, update.BackupID, nil
687✔
581
}
582

583
// sendStateUpdate sends a wtwire.StateUpdate to the watchtower and processes
584
// the ACK before returning. If sendInit is true, this method will first send
585
// the localInit message and verify that the tower supports our required feature
586
// bits. And error is returned if any part of the send fails. The boolean return
587
// variable indicates whether we should back off before attempting to send the
588
// next state update.
589
func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
590
        stateUpdate *wtwire.StateUpdate, sendInit, isPending bool) error {
687✔
591

687✔
592
        towerAddr := &lnwire.NetAddress{
687✔
593
                IdentityKey: conn.RemotePub(),
687✔
594
                Address:     conn.RemoteAddr(),
687✔
595
        }
687✔
596

687✔
597
        // If this is the first message being sent to the tower, we must send an
687✔
598
        // Init message to establish that server supports the features we
687✔
599
        // require.
687✔
600
        if sendInit {
1,011✔
601
                // Send Init to tower.
324✔
602
                err := q.cfg.SendMessage(conn, q.localInit)
324✔
603
                if err != nil {
324✔
UNCOV
604
                        return err
×
UNCOV
605
                }
×
606

607
                // Receive Init from tower.
608
                remoteMsg, err := q.cfg.ReadMessage(conn)
324✔
609
                if err != nil {
324✔
610
                        return err
×
611
                }
×
612

613
                remoteInit, ok := remoteMsg.(*wtwire.Init)
324✔
614
                if !ok {
324✔
615
                        return fmt.Errorf("watchtower %s responded with %T "+
×
616
                                "to Init", towerAddr, remoteMsg)
×
617
                }
×
618

619
                // Validate Init.
620
                err = q.localInit.CheckRemoteInit(
324✔
621
                        remoteInit, wtwire.FeatureNames,
324✔
622
                )
324✔
623
                if err != nil {
324✔
624
                        return err
×
625
                }
×
626
        }
627

628
        // Send StateUpdate to tower.
629
        err := q.cfg.SendMessage(conn, stateUpdate)
687✔
630
        if err != nil {
687✔
631
                return err
×
632
        }
×
633

634
        // Receive StateUpdate from tower.
635
        remoteMsg, err := q.cfg.ReadMessage(conn)
687✔
636
        if err != nil {
906✔
637
                return err
219✔
638
        }
219✔
639

640
        stateUpdateReply, ok := remoteMsg.(*wtwire.StateUpdateReply)
468✔
641
        if !ok {
468✔
642
                return fmt.Errorf("watchtower %s responded with %T to "+
×
643
                        "StateUpdate", towerAddr, remoteMsg)
×
644
        }
×
645

646
        // Process the reply from the tower.
647
        switch stateUpdateReply.Code {
468✔
648

649
        // The tower reported a successful update, validate the response and
650
        // record the last applied returned.
651
        case wtwire.CodeOK:
468✔
652

653
        // TODO(conner): handle other error cases properly, ban towers, etc.
654
        default:
×
655
                err := fmt.Errorf("received error code %v in "+
×
656
                        "StateUpdateReply for seqnum=%d",
×
657
                        stateUpdateReply.Code, stateUpdate.SeqNum)
×
658
                q.log.Warnf("SessionQueue(%s) unable to upload state update "+
×
659
                        "to tower=%s: %v", q.ID(), towerAddr, err)
×
660
                return err
×
661
        }
662

663
        lastApplied := stateUpdateReply.LastApplied
468✔
664
        err = q.cfg.DB.AckUpdate(q.ID(), stateUpdate.SeqNum, lastApplied)
468✔
665
        switch {
468✔
666
        case err == wtdb.ErrUnallocatedLastApplied:
×
667
                // TODO(conner): borked watchtower
×
668
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
669
                        stateUpdate.SeqNum, err)
×
670
                q.log.Errorf("SessionQueue(%v) failed to ack update: %v",
×
671
                        q.ID(), err)
×
672
                return err
×
673

674
        case err == wtdb.ErrLastAppliedReversion:
×
675
                // TODO(conner): borked watchtower
×
676
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
677
                        stateUpdate.SeqNum, err)
×
678
                q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
×
679
                        q.ID(), err)
×
680
                return err
×
681

682
        case err != nil:
×
683
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
684
                        stateUpdate.SeqNum, err)
×
685
                q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
×
686
                        q.ID(), err)
×
687
                return err
×
688
        }
689

690
        q.queueCond.L.Lock()
468✔
691
        if isPending {
935✔
692
                // If a pending update was successfully sent, increment the
467✔
693
                // sequence number and remove the item from the queue. This
467✔
694
                // ensures the total number of backups in the session remains
467✔
695
                // unchanged, which maintains the external view of the session's
467✔
696
                // reserve status.
467✔
697
                q.seqNum++
467✔
698
                q.pendingQueue.Remove(q.pendingQueue.Front())
467✔
699
        } else {
468✔
700
                // Otherwise, simply remove the update from the committed queue.
1✔
701
                // This has no effect on the queues reserve status since the
1✔
702
                // update had already been committed.
1✔
703
                q.commitQueue.Remove(q.commitQueue.Front())
1✔
704
        }
1✔
705
        q.queueCond.L.Unlock()
468✔
706

468✔
707
        return nil
468✔
708
}
709

710
// status returns a sessionQueueStatus indicating whether the sessionQueue can
711
// accept another task. sessionQueueAvailable is returned when a task can be
712
// accepted, and sessionQueueExhausted is returned if the all slots in the
713
// session have been allocated.
714
//
715
// NOTE: This method MUST be called with queueCond's exclusive lock held.
716
func (q *sessionQueue) status() sessionQueueStatus {
958✔
717
        numPending := uint32(q.pendingQueue.Len())
958✔
718
        maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates)
958✔
719

958✔
720
        if uint32(q.seqNum)+numPending < maxUpdates {
1,872✔
721
                return sessionQueueAvailable
914✔
722
        }
914✔
723

724
        return sessionQueueExhausted
48✔
725

726
}
727

728
// resetBackoff returns the connection backoff the minimum configured backoff.
729
func (q *sessionQueue) resetBackoff() {
105✔
730
        q.retryBackoff = q.cfg.MinBackoff
105✔
731
}
105✔
732

733
// increaseBackoff doubles the current connection backoff, clamping to the
734
// configured maximum backoff if it would exceed the limit.
735
func (q *sessionQueue) increaseBackoff() {
226✔
736
        q.retryBackoff *= 2
226✔
737
        if q.retryBackoff > q.cfg.MaxBackoff {
226✔
738
                q.retryBackoff = q.cfg.MaxBackoff
×
739
        }
×
740
}
741

742
// sessionQueueSet maintains a mapping of SessionIDs to their corresponding
743
// sessionQueue.
744
type sessionQueueSet struct {
745
        queues map[wtdb.SessionID]*sessionQueue
746
        mu     sync.Mutex
747
}
748

749
// newSessionQueueSet constructs a new sessionQueueSet.
750
func newSessionQueueSet() *sessionQueueSet {
40✔
751
        return &sessionQueueSet{
40✔
752
                queues: make(map[wtdb.SessionID]*sessionQueue),
40✔
753
        }
40✔
754
}
40✔
755

756
// AddAndStart inserts a sessionQueue into the sessionQueueSet and starts it.
757
func (s *sessionQueueSet) AddAndStart(sessionQueue *sessionQueue) {
85✔
758
        s.mu.Lock()
85✔
759
        defer s.mu.Unlock()
85✔
760

85✔
761
        s.queues[*sessionQueue.ID()] = sessionQueue
85✔
762

85✔
763
        sessionQueue.Start()
85✔
764
}
85✔
765

766
// StopAndRemove stops the given session queue and removes it from the
767
// sessionQueueSet.
768
func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID, final bool) error {
21✔
769
        s.mu.Lock()
21✔
770
        defer s.mu.Unlock()
21✔
771

21✔
772
        queue, ok := s.queues[id]
21✔
773
        if !ok {
27✔
774
                return nil
6✔
775
        }
6✔
776

777
        delete(s.queues, id)
19✔
778

19✔
779
        return queue.Stop(final)
19✔
780
}
781

782
// Get fetches and returns the sessionQueue with the given ID.
783
func (s *sessionQueueSet) Get(id wtdb.SessionID) (*sessionQueue, bool) {
87✔
784
        s.mu.Lock()
87✔
785
        defer s.mu.Unlock()
87✔
786

87✔
787
        q, ok := s.queues[id]
87✔
788

87✔
789
        return q, ok
87✔
790
}
87✔
791

792
// ApplyAndWait executes the nil-adic function returned from getApply for each
793
// sessionQueue in the set in parallel, then waits for all of them to finish
794
// before returning to the caller.
795
func (s *sessionQueueSet) ApplyAndWait(getApply func(*sessionQueue) func()) {
40✔
796
        s.mu.Lock()
40✔
797
        defer s.mu.Unlock()
40✔
798

40✔
799
        var wg sync.WaitGroup
40✔
800
        for _, sessionq := range s.queues {
110✔
801
                wg.Add(1)
70✔
802
                go func(sq *sessionQueue) {
140✔
803
                        defer wg.Done()
70✔
804

70✔
805
                        getApply(sq)()
70✔
806
                }(sessionq)
70✔
807
        }
808
        wg.Wait()
40✔
809
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc