• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12430728843

20 Dec 2024 11:36AM UTC coverage: 61.336% (+2.6%) from 58.716%
12430728843

Pull #8777

github

ziggie1984
channeldb: fix typo.
Pull Request #8777: multi: make reassignment of alias channel edge atomic

161 of 213 new or added lines in 7 files covered. (75.59%)

70 existing lines in 17 files now uncovered.

23369 of 38100 relevant lines covered (61.34%)

115813.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.92
/watchtower/wtclient/session_queue.go
1
package wtclient
2

3
import (
4
        "container/list"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/chaincfg/chainhash"
10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/input"
12
        "github.com/lightningnetwork/lnd/keychain"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
15
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
16
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
17
)
18

19
// sessionQueueStatus reports how much capacity a sessionQueue has left for
// new backups.
type sessionQueueStatus uint8

const (
	// sessionQueueAvailable means at least one more backup can still be
	// assigned to the session.
	sessionQueueAvailable sessionQueueStatus = iota

	// sessionQueueExhausted means every slot in the session has already
	// been handed out.
	sessionQueueExhausted

	// sessionQueueShuttingDown means the queue is stopping and will not
	// take any further backups.
	sessionQueueShuttingDown
)
35

36
// sessionQueueConfig bundles the resources required by the sessionQueue to
37
// perform its duties. All entries MUST be non-nil.
38
type sessionQueueConfig struct {
39
        // ClientSession provides access to the negotiated session parameters
40
        // and updating its persistent storage.
41
        ClientSession *ClientSession
42

43
        // ChainHash identifies the chain for which the session's justice
44
        // transactions are targeted.
45
        ChainHash chainhash.Hash
46

47
        // Dial allows the client to dial the tower using it's public key and
48
        // net address.
49
        Dial func(keychain.SingleKeyECDH, *lnwire.NetAddress) (wtserver.Peer,
50
                error)
51

52
        // SendMessage encodes, encrypts, and writes a message to the given
53
        // peer.
54
        SendMessage func(wtserver.Peer, wtwire.Message) error
55

56
        // ReadMessage receives, decypts, and decodes a message from the given
57
        // peer.
58
        ReadMessage func(wtserver.Peer) (wtwire.Message, error)
59

60
        // Signer facilitates signing of inputs, used to construct the witnesses
61
        // for justice transaction inputs.
62
        Signer input.Signer
63

64
        // BuildBreachRetribution is a function closure that allows the client
65
        // to fetch the breach retribution info for a certain channel at a
66
        // certain revoked commitment height.
67
        BuildBreachRetribution BreachRetributionBuilder
68

69
        // TaskPipeline is a pipeline which the sessionQueue should use to send
70
        // any unhandled tasks on shutdown of the queue.
71
        TaskPipeline *DiskOverflowQueue[*wtdb.BackupID]
72

73
        // DB provides access to the client's stable storage.
74
        DB DB
75

76
        // MinBackoff defines the initial backoff applied by the session
77
        // queue before reconnecting to the tower after a failed or partially
78
        // successful batch is sent. Subsequent backoff durations will grow
79
        // exponentially up until MaxBackoff.
80
        MinBackoff time.Duration
81

82
        // MaxBackoff defines the maximum backoff applied by the session
83
        // queue before reconnecting to the tower after a failed or partially
84
        // successful batch is sent. If the exponential backoff produces a
85
        // timeout greater than this value, the backoff duration will be clamped
86
        // to MaxBackoff.
87
        MaxBackoff time.Duration
88

89
        // Log specifies the desired log output, which should be prefixed by the
90
        // client type, e.g. anchor or legacy.
91
        Log btclog.Logger
92
}
93

94
// sessionQueue implements a reliable queue that will encrypt and send accepted
95
// backups to the watchtower specified in the config's ClientSession. Calling
96
// Stop will attempt to perform a clean shutdown replaying any un-committed
97
// pending updates to the client's main task pipeline.
98
type sessionQueue struct {
99
        started sync.Once
100
        stopped sync.Once
101
        forced  sync.Once
102

103
        cfg *sessionQueueConfig
104
        log btclog.Logger
105

106
        commitQueue  *list.List
107
        pendingQueue *list.List
108
        queueMtx     sync.Mutex
109
        queueCond    *sync.Cond
110

111
        localInit *wtwire.Init
112
        tower     *Tower
113

114
        seqNum uint16
115

116
        retryBackoff time.Duration
117

118
        quit chan struct{}
119
        wg   sync.WaitGroup
120
}
121

122
// newSessionQueue initializes a fresh sessionQueue.
123
func newSessionQueue(cfg *sessionQueueConfig,
124
        updates []wtdb.CommittedUpdate) *sessionQueue {
84✔
125

84✔
126
        localInit := wtwire.NewInitMessage(
84✔
127
                lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
84✔
128
                cfg.ChainHash,
84✔
129
        )
84✔
130

84✔
131
        sq := &sessionQueue{
84✔
132
                cfg:          cfg,
84✔
133
                log:          cfg.Log,
84✔
134
                commitQueue:  list.New(),
84✔
135
                pendingQueue: list.New(),
84✔
136
                localInit:    localInit,
84✔
137
                tower:        cfg.ClientSession.Tower,
84✔
138
                seqNum:       cfg.ClientSession.SeqNum,
84✔
139
                retryBackoff: cfg.MinBackoff,
84✔
140
                quit:         make(chan struct{}),
84✔
141
        }
84✔
142
        sq.queueCond = sync.NewCond(&sq.queueMtx)
84✔
143

84✔
144
        // The database should return them in sorted order, and session queue's
84✔
145
        // sequence number will be equal to that of the last committed update.
84✔
146
        for _, update := range updates {
86✔
147
                sq.commitQueue.PushBack(update)
2✔
148
        }
2✔
149

150
        return sq
84✔
151
}
152

153
// Start idempotently starts the sessionQueue so that it can begin accepting
154
// backups.
155
func (q *sessionQueue) Start() {
84✔
156
        q.started.Do(func() {
168✔
157
                q.wg.Add(1)
84✔
158
                go q.sessionManager()
84✔
159
        })
84✔
160
}
161

162
// Stop idempotently stops the sessionQueue by initiating a clean shutdown that
163
// will clear all pending tasks in the queue before returning to the caller.
164
// The final param should only be set to true if this is the last time that
165
// this session will be used. Otherwise, during normal shutdown, the final param
166
// should be false.
167
func (q *sessionQueue) Stop(final bool) error {
84✔
168
        var returnErr error
84✔
169
        q.stopped.Do(func() {
168✔
170
                q.log.Debugf("SessionQueue(%s) stopping ...", q.ID())
84✔
171

84✔
172
                close(q.quit)
84✔
173

84✔
174
                shutdown := make(chan struct{})
84✔
175
                go func() {
168✔
176
                        for {
241✔
177
                                select {
157✔
178
                                case <-time.After(time.Millisecond):
77✔
179
                                        q.queueCond.Signal()
77✔
180
                                case <-shutdown:
84✔
181
                                        return
84✔
182
                                }
183
                        }
184
                }()
185

186
                q.wg.Wait()
84✔
187
                close(shutdown)
84✔
188

84✔
189
                // Now, for any task in the pending queue that we have not yet
84✔
190
                // created a CommittedUpdate for, re-add the task to the main
84✔
191
                // task pipeline.
84✔
192
                updates, err := q.cfg.DB.FetchSessionCommittedUpdates(q.ID())
84✔
193
                if err != nil {
84✔
194
                        returnErr = err
×
195
                        return
×
196
                }
×
197

198
                unAckedUpdates := make(map[wtdb.BackupID]bool)
84✔
199
                for _, update := range updates {
90✔
200
                        unAckedUpdates[update.BackupID] = true
6✔
201

6✔
202
                        if !final {
8✔
203
                                continue
2✔
204
                        }
205

206
                        err := q.cfg.TaskPipeline.QueueBackupID(
4✔
207
                                &update.BackupID,
4✔
208
                        )
4✔
209
                        if err != nil {
4✔
210
                                log.Errorf("could not re-queue %s: %v",
×
211
                                        update.BackupID, err)
×
212
                                continue
×
213
                        }
214
                }
215

216
                if final {
99✔
217
                        err = q.cfg.DB.DeleteCommittedUpdates(q.ID())
15✔
218
                        if err != nil {
15✔
219
                                log.Errorf("could not delete committed "+
×
220
                                        "updates for session %s", q.ID())
×
221
                        }
×
222
                }
223

224
                // Push any task that was on the pending queue that there is
225
                // not yet a committed update for back to the main task
226
                // pipeline.
227
                q.queueCond.L.Lock()
84✔
228
                for q.pendingQueue.Len() > 0 {
94✔
229
                        next := q.pendingQueue.Front()
10✔
230
                        q.pendingQueue.Remove(next)
10✔
231

10✔
232
                        //nolint:forcetypeassert
10✔
233
                        task := next.Value.(*backupTask)
10✔
234

10✔
235
                        if unAckedUpdates[task.id] {
14✔
236
                                continue
4✔
237
                        }
238

239
                        err := q.cfg.TaskPipeline.QueueBackupID(&task.id)
6✔
240
                        if err != nil {
6✔
241
                                log.Errorf("could not re-queue backup task: "+
×
242
                                        "%v", err)
×
243
                                continue
×
244
                        }
245
                }
246
                q.queueCond.L.Unlock()
84✔
247

84✔
248
                q.log.Debugf("SessionQueue(%s) stopped", q.ID())
84✔
249
        })
250

251
        return returnErr
84✔
252
}
253

254
// ID returns the wtdb.SessionID for the queue, which can be used to uniquely
255
// identify this a particular queue.
256
func (q *sessionQueue) ID() *wtdb.SessionID {
4,264✔
257
        return &q.cfg.ClientSession.ID
4,264✔
258
}
4,264✔
259

260
// AcceptTask attempts to queue a backupTask for delivery to the sessionQueue's
261
// tower. The session will only be accepted if the queue is not already
262
// exhausted or shutting down and the task is successfully bound to the
263
// ClientSession.
264
func (q *sessionQueue) AcceptTask(task *backupTask) (sessionQueueStatus, bool) {
484✔
265
        // Exit early if the queue has started shutting down.
484✔
266
        select {
484✔
267
        case <-q.quit:
×
268
                return sessionQueueShuttingDown, false
×
269
        default:
484✔
270
        }
271

272
        q.queueCond.L.Lock()
484✔
273

484✔
274
        // There is a chance that sessionQueue started shutting down between
484✔
275
        // the last quit channel check and waiting for the lock. So check one
484✔
276
        // more time here.
484✔
277
        select {
484✔
278
        case <-q.quit:
×
279
                q.queueCond.L.Unlock()
×
280
                return sessionQueueShuttingDown, false
×
281
        default:
484✔
282
        }
283

284
        numPending := uint32(q.pendingQueue.Len())
484✔
285
        maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates
484✔
286
        q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
484✔
287
                "pending=%d max-updates=%d",
484✔
288
                q.ID(), task.id, q.seqNum, numPending, maxUpdates)
484✔
289

484✔
290
        // Examine the current reserve status of the session queue.
484✔
291
        curStatus := q.status()
484✔
292

484✔
293
        switch curStatus {
484✔
294

295
        // The session queue is exhausted, and cannot accept the task because it
296
        // is full. Reject the task such that it can be tried against a
297
        // different session.
298
        case sessionQueueExhausted:
1✔
299
                q.queueCond.L.Unlock()
1✔
300
                return curStatus, false
1✔
301

302
        // The session queue is not exhausted. Compute the sweep and reward
303
        // outputs as a function of the session parameters. If the outputs are
304
        // dusty or uneconomical to backup, the task is rejected and will not be
305
        // tried again.
306
        //
307
        // TODO(conner): queue backups and retry with different session params.
308
        case sessionQueueAvailable:
483✔
309
                err := task.bindSession(
483✔
310
                        &q.cfg.ClientSession.ClientSessionBody,
483✔
311
                        q.cfg.BuildBreachRetribution,
483✔
312
                )
483✔
313
                if err != nil {
488✔
314
                        q.queueCond.L.Unlock()
5✔
315
                        q.log.Debugf("SessionQueue(%s) rejected %v: %v ",
5✔
316
                                q.ID(), task.id, err)
5✔
317
                        return curStatus, false
5✔
318
                }
5✔
319
        }
320

321
        // The sweep and reward outputs satisfy the session's policy, queue the
322
        // task for final signing and delivery.
323
        q.pendingQueue.PushBack(task)
478✔
324

478✔
325
        // Finally, compute the session's *new* reserve status. This will be
478✔
326
        // used by the client to determine if it can continue using this session
478✔
327
        // queue, or if it should negotiate a new one.
478✔
328
        newStatus := q.status()
478✔
329
        q.queueCond.L.Unlock()
478✔
330

478✔
331
        q.queueCond.Signal()
478✔
332

478✔
333
        return newStatus, true
478✔
334
}
335

336
// sessionManager is the primary event loop for the sessionQueue, and is
337
// responsible for encrypting and sending accepted tasks to the tower.
338
func (q *sessionQueue) sessionManager() {
84✔
339
        defer q.wg.Done()
84✔
340

84✔
341
        for {
493✔
342
                q.queueCond.L.Lock()
409✔
343
                for q.commitQueue.Len() == 0 &&
409✔
344
                        q.pendingQueue.Len() == 0 {
516✔
345

107✔
346
                        q.queueCond.Wait()
107✔
347

107✔
348
                        select {
107✔
349
                        case <-q.quit:
77✔
350
                                q.queueCond.L.Unlock()
77✔
351
                                return
77✔
352
                        default:
34✔
353
                        }
354
                }
355
                q.queueCond.L.Unlock()
336✔
356

336✔
357
                // Exit immediately if the sessionQueue has been stopped.
336✔
358
                select {
336✔
359
                case <-q.quit:
7✔
360
                        return
7✔
361
                default:
329✔
362
                }
363

364
                // Initiate a new connection to the watchtower and attempt to
365
                // drain all pending tasks.
366
                q.drainBackups()
329✔
367
        }
368
}
369

370
// drainBackups attempts to send all pending updates in the queue to the tower.
371
func (q *sessionQueue) drainBackups() {
329✔
372
        var (
329✔
373
                conn      wtserver.Peer
329✔
374
                err       error
329✔
375
                towerAddr = q.tower.Addresses.Peek()
329✔
376
        )
329✔
377

329✔
378
        for {
658✔
379
                q.log.Infof("SessionQueue(%s) attempting to dial tower at %v",
329✔
380
                        q.ID(), towerAddr)
329✔
381

329✔
382
                // First, check that we are able to dial this session's tower.
329✔
383
                conn, err = q.cfg.Dial(
329✔
384
                        q.cfg.ClientSession.SessionKeyECDH, &lnwire.NetAddress{
329✔
385
                                IdentityKey: q.tower.IdentityKey,
329✔
386
                                Address:     towerAddr,
329✔
387
                        },
329✔
388
                )
329✔
389
                if err != nil {
336✔
390
                        // If there are more addrs available, immediately try
7✔
391
                        // those.
7✔
392
                        nextAddr, iteratorErr := q.tower.Addresses.Next()
7✔
393
                        if iteratorErr == nil {
7✔
394
                                towerAddr = nextAddr
×
395
                                continue
×
396
                        }
397

398
                        // Otherwise, if we have exhausted the address list,
399
                        // back off and try again later.
400
                        q.tower.Addresses.Reset()
7✔
401

7✔
402
                        q.log.Errorf("SessionQueue(%s) unable to dial tower "+
7✔
403
                                "at any available Addresses: %v", q.ID(), err)
7✔
404

7✔
405
                        q.increaseBackoff()
7✔
406
                        select {
7✔
407
                        case <-time.After(q.retryBackoff):
6✔
408
                        case <-q.quit:
1✔
409
                        }
410
                        return
7✔
411
                }
412

413
                break
322✔
414
        }
415
        defer conn.Close()
322✔
416

322✔
417
        // Begin draining the queue of pending state updates. Before the first
322✔
418
        // update is sent, we will precede it with an Init message. If the first
322✔
419
        // is successful, subsequent updates can be streamed without sending an
322✔
420
        // Init.
322✔
421
        for sendInit := true; ; sendInit = false {
1,008✔
422
                // Generate the next state update to upload to the tower. This
686✔
423
                // method will first proceed in dequeuing committed updates
686✔
424
                // before attempting to dequeue any pending updates.
686✔
425
                stateUpdate, isPending, backupID, err := q.nextStateUpdate()
686✔
426
                if err != nil {
686✔
427
                        q.log.Errorf("SessionQueue(%v) unable to get next "+
×
428
                                "state update: %v", q.ID(), err)
×
429
                        return
×
430
                }
×
431

432
                // Now, send the state update to the tower and wait for a reply.
433
                err = q.sendStateUpdate(conn, stateUpdate, sendInit, isPending)
686✔
434
                if err != nil {
904✔
435
                        q.log.Errorf("SessionQueue(%s) unable to send state "+
218✔
436
                                "update: %v", q.ID(), err)
218✔
437

218✔
438
                        q.increaseBackoff()
218✔
439
                        select {
218✔
440
                        case <-time.After(q.retryBackoff):
212✔
441
                        case <-q.quit:
6✔
442
                        }
443
                        return
218✔
444
                }
445

446
                q.log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
468✔
447
                        q.ID(), backupID, stateUpdate.SeqNum)
468✔
448

468✔
449
                // If the last task was backed up successfully, we'll exit and
468✔
450
                // continue once more tasks are added to the queue. We'll also
468✔
451
                // clear any accumulated backoff as this batch was able to be
468✔
452
                // sent reliably.
468✔
453
                if stateUpdate.IsComplete == 1 {
572✔
454
                        q.resetBackoff()
104✔
455
                        return
104✔
456
                }
104✔
457

458
                // Always apply a small delay between sends, which makes the
459
                // unit tests more reliable. If we were requested to back off,
460
                // when we will do so.
461
                select {
368✔
462
                case <-time.After(time.Millisecond):
368✔
463
                case <-q.quit:
×
464
                        return
×
465
                }
466
        }
467
}
468

469
// nextStateUpdate returns the next wtwire.StateUpdate to upload to the tower.
470
// If any committed updates are present, this method will reconstruct the state
471
// update from the committed update using the current last applied value found
472
// in the database. Otherwise, it will select the next pending update, craft the
473
// payload, and commit an update before returning the state update to send. The
474
// boolean value in the response is true if the state update is taken from the
475
// pending queue, allowing the caller to remove the update from either the
476
// commit or pending queue if the update is successfully acked.
477
func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
478
        wtdb.BackupID, error) {
686✔
479

686✔
480
        var (
686✔
481
                seqNum    uint16
686✔
482
                update    wtdb.CommittedUpdate
686✔
483
                isLast    bool
686✔
484
                isPending bool
686✔
485
        )
686✔
486

686✔
487
        q.queueCond.L.Lock()
686✔
488
        switch {
686✔
489

490
        // If the commit queue is non-empty, parse the next committed update.
491
        case q.commitQueue.Len() > 0:
2✔
492
                next := q.commitQueue.Front()
2✔
493

2✔
494
                update = next.Value.(wtdb.CommittedUpdate)
2✔
495
                seqNum = update.SeqNum
2✔
496

2✔
497
                // If this is the last item in the commit queue and no items
2✔
498
                // exist in the pending queue, we will use the IsComplete flag
2✔
499
                // in the StateUpdate to signal that the tower can release the
2✔
500
                // connection after replying to free up resources.
2✔
501
                isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0
2✔
502
                q.queueCond.L.Unlock()
2✔
503

2✔
504
                q.log.Debugf("SessionQueue(%s) reprocessing committed state "+
2✔
505
                        "update for %v seqnum=%d",
2✔
506
                        q.ID(), update.BackupID, seqNum)
2✔
507

508
        // Otherwise, craft and commit the next update from the pending queue.
509
        default:
684✔
510
                isPending = true
684✔
511

684✔
512
                // Determine the current sequence number to apply for this
684✔
513
                // pending update.
684✔
514
                seqNum = q.seqNum + 1
684✔
515

684✔
516
                // Obtain the next task from the queue.
684✔
517
                next := q.pendingQueue.Front()
684✔
518
                task := next.Value.(*backupTask)
684✔
519

684✔
520
                // If this is the last item in the pending queue, we will use
684✔
521
                // the IsComplete flag in the StateUpdate to signal that the
684✔
522
                // tower can release the connection after replying to free up
684✔
523
                // resources.
684✔
524
                isLast = q.pendingQueue.Len() == 1
684✔
525
                q.queueCond.L.Unlock()
684✔
526

684✔
527
                hint, encBlob, err := task.craftSessionPayload(q.cfg.Signer)
684✔
528
                if err != nil {
684✔
529
                        // TODO(conner): mark will not send
×
530
                        err := fmt.Errorf("unable to craft session payload: %w",
×
531
                                err)
×
532
                        return nil, false, wtdb.BackupID{}, err
×
533
                }
×
534
                // TODO(conner): special case other obscure errors
535

536
                update = wtdb.CommittedUpdate{
684✔
537
                        SeqNum: seqNum,
684✔
538
                        CommittedUpdateBody: wtdb.CommittedUpdateBody{
684✔
539
                                BackupID:      task.id,
684✔
540
                                Hint:          hint,
684✔
541
                                EncryptedBlob: encBlob,
684✔
542
                        },
684✔
543
                }
684✔
544

684✔
545
                q.log.Debugf("SessionQueue(%s) committing state update "+
684✔
546
                        "%v seqnum=%d", q.ID(), update.BackupID, seqNum)
684✔
547
        }
548

549
        // Before sending the task to the tower, commit the state update
550
        // to disk using the assigned sequence number. If this task has already
551
        // been committed, the call will succeed and only be used for the
552
        // purpose of obtaining the last applied value to send to the tower.
553
        //
554
        // This step ensures that if we crash before receiving an ack that we
555
        // will retransmit the same update. If the tower successfully received
556
        // the update from before, it will reply with an ACK regardless of what
557
        // we send the next time. This step ensures that if we reliably send the
558
        // same update for a given sequence number, to prevent us from thinking
559
        // we backed up a state when we instead backed up another.
560
        lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), &update)
686✔
561
        if err != nil {
686✔
562
                // TODO(conner): mark failed/reschedule
×
563
                err := fmt.Errorf("unable to commit state update for "+
×
564
                        "%v seqnum=%d: %v", update.BackupID, seqNum, err)
×
565
                return nil, false, wtdb.BackupID{}, err
×
566
        }
×
567

568
        stateUpdate := &wtwire.StateUpdate{
686✔
569
                SeqNum:        update.SeqNum,
686✔
570
                LastApplied:   lastApplied,
686✔
571
                Hint:          update.Hint,
686✔
572
                EncryptedBlob: update.EncryptedBlob,
686✔
573
        }
686✔
574

686✔
575
        // Set the IsComplete flag if this is the last queued item.
686✔
576
        if isLast {
815✔
577
                stateUpdate.IsComplete = 1
129✔
578
        }
129✔
579

580
        return stateUpdate, isPending, update.BackupID, nil
686✔
581
}
582

583
// sendStateUpdate sends a wtwire.StateUpdate to the watchtower and processes
584
// the ACK before returning. If sendInit is true, this method will first send
585
// the localInit message and verify that the tower supports our required feature
586
// bits. And error is returned if any part of the send fails. The boolean return
587
// variable indicates whether we should back off before attempting to send the
588
// next state update.
589
func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
590
        stateUpdate *wtwire.StateUpdate, sendInit, isPending bool) error {
686✔
591

686✔
592
        towerAddr := &lnwire.NetAddress{
686✔
593
                IdentityKey: conn.RemotePub(),
686✔
594
                Address:     conn.RemoteAddr(),
686✔
595
        }
686✔
596

686✔
597
        // If this is the first message being sent to the tower, we must send an
686✔
598
        // Init message to establish that server supports the features we
686✔
599
        // require.
686✔
600
        if sendInit {
1,008✔
601
                // Send Init to tower.
322✔
602
                err := q.cfg.SendMessage(conn, q.localInit)
322✔
603
                if err != nil {
322✔
UNCOV
604
                        return err
×
UNCOV
605
                }
×
606

607
                // Receive Init from tower.
608
                remoteMsg, err := q.cfg.ReadMessage(conn)
322✔
609
                if err != nil {
322✔
610
                        return err
×
611
                }
×
612

613
                remoteInit, ok := remoteMsg.(*wtwire.Init)
322✔
614
                if !ok {
322✔
615
                        return fmt.Errorf("watchtower %s responded with %T "+
×
616
                                "to Init", towerAddr, remoteMsg)
×
617
                }
×
618

619
                // Validate Init.
620
                err = q.localInit.CheckRemoteInit(
322✔
621
                        remoteInit, wtwire.FeatureNames,
322✔
622
                )
322✔
623
                if err != nil {
322✔
624
                        return err
×
625
                }
×
626
        }
627

628
        // Send StateUpdate to tower.
629
        err := q.cfg.SendMessage(conn, stateUpdate)
686✔
630
        if err != nil {
686✔
631
                return err
×
632
        }
×
633

634
        // Receive StateUpdate from tower.
635
        remoteMsg, err := q.cfg.ReadMessage(conn)
686✔
636
        if err != nil {
904✔
637
                return err
218✔
638
        }
218✔
639

640
        stateUpdateReply, ok := remoteMsg.(*wtwire.StateUpdateReply)
468✔
641
        if !ok {
468✔
642
                return fmt.Errorf("watchtower %s responded with %T to "+
×
643
                        "StateUpdate", towerAddr, remoteMsg)
×
644
        }
×
645

646
        // Process the reply from the tower.
647
        switch stateUpdateReply.Code {
468✔
648

649
        // The tower reported a successful update, validate the response and
650
        // record the last applied returned.
651
        case wtwire.CodeOK:
468✔
652

653
        // TODO(conner): handle other error cases properly, ban towers, etc.
654
        default:
×
655
                err := fmt.Errorf("received error code %v in "+
×
656
                        "StateUpdateReply for seqnum=%d",
×
657
                        stateUpdateReply.Code, stateUpdate.SeqNum)
×
658
                q.log.Warnf("SessionQueue(%s) unable to upload state update "+
×
659
                        "to tower=%s: %v", q.ID(), towerAddr, err)
×
660
                return err
×
661
        }
662

663
        lastApplied := stateUpdateReply.LastApplied
468✔
664
        err = q.cfg.DB.AckUpdate(q.ID(), stateUpdate.SeqNum, lastApplied)
468✔
665
        switch {
468✔
666
        case err == wtdb.ErrUnallocatedLastApplied:
×
667
                // TODO(conner): borked watchtower
×
668
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
669
                        stateUpdate.SeqNum, err)
×
670
                q.log.Errorf("SessionQueue(%v) failed to ack update: %v",
×
671
                        q.ID(), err)
×
672
                return err
×
673

674
        case err == wtdb.ErrLastAppliedReversion:
×
675
                // TODO(conner): borked watchtower
×
676
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
677
                        stateUpdate.SeqNum, err)
×
678
                q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
×
679
                        q.ID(), err)
×
680
                return err
×
681

682
        case err != nil:
×
683
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
684
                        stateUpdate.SeqNum, err)
×
685
                q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
×
686
                        q.ID(), err)
×
687
                return err
×
688
        }
689

690
        q.queueCond.L.Lock()
468✔
691
        if isPending {
936✔
692
                // If a pending update was successfully sent, increment the
468✔
693
                // sequence number and remove the item from the queue. This
468✔
694
                // ensures the total number of backups in the session remains
468✔
695
                // unchanged, which maintains the external view of the session's
468✔
696
                // reserve status.
468✔
697
                q.seqNum++
468✔
698
                q.pendingQueue.Remove(q.pendingQueue.Front())
468✔
699
        } else {
468✔
700
                // Otherwise, simply remove the update from the committed queue.
×
701
                // This has no effect on the queues reserve status since the
×
702
                // update had already been committed.
×
703
                q.commitQueue.Remove(q.commitQueue.Front())
×
704
        }
×
705
        q.queueCond.L.Unlock()
468✔
706

468✔
707
        return nil
468✔
708
}
709

710
// status returns a sessionQueueStatus indicating whether the sessionQueue can
711
// accept another task. sessionQueueAvailable is returned when a task can be
712
// accepted, and sessionQueueExhausted is returned if the all slots in the
713
// session have been allocated.
714
//
715
// NOTE: This method MUST be called with queueCond's exclusive lock held.
716
func (q *sessionQueue) status() sessionQueueStatus {
958✔
717
        numPending := uint32(q.pendingQueue.Len())
958✔
718
        maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates)
958✔
719

958✔
720
        if uint32(q.seqNum)+numPending < maxUpdates {
1,872✔
721
                return sessionQueueAvailable
914✔
722
        }
914✔
723

724
        return sessionQueueExhausted
48✔
725

726
}
727

728
// resetBackoff returns the connection backoff the minimum configured backoff.
729
func (q *sessionQueue) resetBackoff() {
104✔
730
        q.retryBackoff = q.cfg.MinBackoff
104✔
731
}
104✔
732

733
// increaseBackoff doubles the current connection backoff, clamping to the
734
// configured maximum backoff if it would exceed the limit.
735
func (q *sessionQueue) increaseBackoff() {
225✔
736
        q.retryBackoff *= 2
225✔
737
        if q.retryBackoff > q.cfg.MaxBackoff {
225✔
738
                q.retryBackoff = q.cfg.MaxBackoff
×
739
        }
×
740
}
741

742
// sessionQueueSet maintains a mapping of SessionIDs to their corresponding
743
// sessionQueue.
744
type sessionQueueSet struct {
745
        queues map[wtdb.SessionID]*sessionQueue
746
        mu     sync.Mutex
747
}
748

749
// newSessionQueueSet constructs a new sessionQueueSet.
750
func newSessionQueueSet() *sessionQueueSet {
40✔
751
        return &sessionQueueSet{
40✔
752
                queues: make(map[wtdb.SessionID]*sessionQueue),
40✔
753
        }
40✔
754
}
40✔
755

756
// AddAndStart inserts a sessionQueue into the sessionQueueSet and starts it.
757
func (s *sessionQueueSet) AddAndStart(sessionQueue *sessionQueue) {
84✔
758
        s.mu.Lock()
84✔
759
        defer s.mu.Unlock()
84✔
760

84✔
761
        s.queues[*sessionQueue.ID()] = sessionQueue
84✔
762

84✔
763
        sessionQueue.Start()
84✔
764
}
84✔
765

766
// StopAndRemove stops the given session queue and removes it from the
767
// sessionQueueSet.
768
func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID, final bool) error {
21✔
769
        s.mu.Lock()
21✔
770
        defer s.mu.Unlock()
21✔
771

21✔
772
        queue, ok := s.queues[id]
21✔
773
        if !ok {
28✔
774
                return nil
7✔
775
        }
7✔
776

777
        delete(s.queues, id)
18✔
778

18✔
779
        return queue.Stop(final)
18✔
780
}
781

782
// Get fetches and returns the sessionQueue with the given ID.
783
func (s *sessionQueueSet) Get(id wtdb.SessionID) (*sessionQueue, bool) {
87✔
784
        s.mu.Lock()
87✔
785
        defer s.mu.Unlock()
87✔
786

87✔
787
        q, ok := s.queues[id]
87✔
788

87✔
789
        return q, ok
87✔
790
}
87✔
791

792
// ApplyAndWait executes the nil-adic function returned from getApply for each
793
// sessionQueue in the set in parallel, then waits for all of them to finish
794
// before returning to the caller.
795
func (s *sessionQueueSet) ApplyAndWait(getApply func(*sessionQueue) func()) {
40✔
796
        s.mu.Lock()
40✔
797
        defer s.mu.Unlock()
40✔
798

40✔
799
        var wg sync.WaitGroup
40✔
800
        for _, sessionq := range s.queues {
110✔
801
                wg.Add(1)
70✔
802
                go func(sq *sessionQueue) {
140✔
803
                        defer wg.Done()
70✔
804

70✔
805
                        getApply(sq)()
70✔
806
                }(sessionq)
70✔
807
        }
808
        wg.Wait()
40✔
809
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc