• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.9
/watchtower/wtclient/session_queue.go
1
package wtclient
2

3
import (
4
        "container/list"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/chaincfg/chainhash"
10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/input"
12
        "github.com/lightningnetwork/lnd/keychain"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
15
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
16
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
17
)
18

19
// sessionQueueStatus is an enum that signals how full a particular session is.
20
type sessionQueueStatus uint8
21

22
const (
23
        // sessionQueueAvailable indicates that the session has space for at
24
        // least one more backup.
25
        sessionQueueAvailable sessionQueueStatus = iota
26

27
        // sessionQueueExhausted indicates that all slots in the session have
28
        // been allocated.
29
        sessionQueueExhausted
30

31
        // sessionQueueShuttingDown indicates that the session queue is
32
        // shutting down and so is no longer accepting any more backups.
33
        sessionQueueShuttingDown
34
)
35

36
// sessionQueueConfig bundles the resources required by the sessionQueue to
37
// perform its duties. All entries MUST be non-nil.
38
type sessionQueueConfig struct {
39
        // ClientSession provides access to the negotiated session parameters
40
        // and updating its persistent storage.
41
        ClientSession *ClientSession
42

43
        // ChainHash identifies the chain for which the session's justice
44
        // transactions are targeted.
45
        ChainHash chainhash.Hash
46

47
        // Dial allows the client to dial the tower using its public key and
48
        // net address.
49
        Dial func(keychain.SingleKeyECDH, *lnwire.NetAddress) (wtserver.Peer,
50
                error)
51

52
        // SendMessage encodes, encrypts, and writes a message to the given
53
        // peer.
54
        SendMessage func(wtserver.Peer, wtwire.Message) error
55

56
        // ReadMessage receives, decrypts, and decodes a message from the given
57
        // peer.
58
        ReadMessage func(wtserver.Peer) (wtwire.Message, error)
59

60
        // Signer facilitates signing of inputs, used to construct the witnesses
61
        // for justice transaction inputs.
62
        Signer input.Signer
63

64
        // BuildBreachRetribution is a function closure that allows the client
65
        // to fetch the breach retribution info for a certain channel at a
66
        // certain revoked commitment height.
67
        BuildBreachRetribution BreachRetributionBuilder
68

69
        // TaskPipeline is a pipeline which the sessionQueue should use to send
70
        // any unhandled tasks on shutdown of the queue.
71
        TaskPipeline *DiskOverflowQueue[*wtdb.BackupID]
72

73
        // DB provides access to the client's stable storage.
74
        DB DB
75

76
        // MinBackoff defines the initial backoff applied by the session
77
        // queue before reconnecting to the tower after a failed or partially
78
        // successful batch is sent. Subsequent backoff durations will grow
79
        // exponentially up until MaxBackoff.
80
        MinBackoff time.Duration
81

82
        // MaxBackoff defines the maximum backoff applied by the session
83
        // queue before reconnecting to the tower after a failed or partially
84
        // successful batch is sent. If the exponential backoff produces a
85
        // timeout greater than this value, the backoff duration will be clamped
86
        // to MaxBackoff.
87
        MaxBackoff time.Duration
88

89
        // Log specifies the desired log output, which should be prefixed by the
90
        // client type, e.g. anchor or legacy.
91
        Log btclog.Logger
92
}
93

94
// sessionQueue implements a reliable queue that will encrypt and send accepted
95
// backups to the watchtower specified in the config's ClientSession. Calling
96
// Stop will attempt to perform a clean shutdown replaying any un-committed
97
// pending updates to the client's main task pipeline.
98
type sessionQueue struct {
99
        started sync.Once
100
        stopped sync.Once
101
        forced  sync.Once
102

103
        cfg *sessionQueueConfig
104
        log btclog.Logger
105

106
        commitQueue  *list.List
107
        pendingQueue *list.List
108
        queueMtx     sync.Mutex
109
        queueCond    *sync.Cond
110

111
        localInit *wtwire.Init
112
        tower     *Tower
113

114
        seqNum uint16
115

116
        retryBackoff time.Duration
117

118
        quit chan struct{}
119
        wg   sync.WaitGroup
120
}
121

122
// newSessionQueue initializes a fresh sessionQueue.
123
func newSessionQueue(cfg *sessionQueueConfig,
124
        updates []wtdb.CommittedUpdate) *sessionQueue {
3✔
125

3✔
126
        localInit := wtwire.NewInitMessage(
3✔
127
                lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
3✔
128
                cfg.ChainHash,
3✔
129
        )
3✔
130

3✔
131
        sq := &sessionQueue{
3✔
132
                cfg:          cfg,
3✔
133
                log:          cfg.Log,
3✔
134
                commitQueue:  list.New(),
3✔
135
                pendingQueue: list.New(),
3✔
136
                localInit:    localInit,
3✔
137
                tower:        cfg.ClientSession.Tower,
3✔
138
                seqNum:       cfg.ClientSession.SeqNum,
3✔
139
                retryBackoff: cfg.MinBackoff,
3✔
140
                quit:         make(chan struct{}),
3✔
141
        }
3✔
142
        sq.queueCond = sync.NewCond(&sq.queueMtx)
3✔
143

3✔
144
        // The database should return them in sorted order, and session queue's
3✔
145
        // sequence number will be equal to that of the last committed update.
3✔
146
        for _, update := range updates {
3✔
UNCOV
147
                sq.commitQueue.PushBack(update)
×
UNCOV
148
        }
×
149

150
        return sq
3✔
151
}
152

153
// Start idempotently starts the sessionQueue so that it can begin accepting
154
// backups.
155
func (q *sessionQueue) Start() {
3✔
156
        q.started.Do(func() {
6✔
157
                q.wg.Add(1)
3✔
158
                go q.sessionManager()
3✔
159
        })
3✔
160
}
161

162
// Stop idempotently stops the sessionQueue by initiating a clean shutdown that
163
// will clear all pending tasks in the queue before returning to the caller.
164
// The final param should only be set to true if this is the last time that
165
// this session will be used. Otherwise, during normal shutdown, the final param
166
// should be false.
167
func (q *sessionQueue) Stop(final bool) error {
3✔
168
        var returnErr error
3✔
169
        q.stopped.Do(func() {
6✔
170
                q.log.Debugf("SessionQueue(%s) stopping ...", q.ID())
3✔
171

3✔
172
                close(q.quit)
3✔
173

3✔
174
                shutdown := make(chan struct{})
3✔
175
                go func() {
6✔
176
                        for {
6✔
177
                                select {
3✔
178
                                case <-time.After(time.Millisecond):
3✔
179
                                        q.queueCond.Signal()
3✔
180
                                case <-shutdown:
3✔
181
                                        return
3✔
182
                                }
183
                        }
184
                }()
185

186
                q.wg.Wait()
3✔
187
                close(shutdown)
3✔
188

3✔
189
                // Now, for any task in the pending queue that we have not yet
3✔
190
                // created a CommittedUpdate for, re-add the task to the main
3✔
191
                // task pipeline.
3✔
192
                updates, err := q.cfg.DB.FetchSessionCommittedUpdates(q.ID())
3✔
193
                if err != nil {
3✔
194
                        returnErr = err
×
195
                        return
×
196
                }
×
197

198
                unAckedUpdates := make(map[wtdb.BackupID]bool)
3✔
199
                for _, update := range updates {
3✔
UNCOV
200
                        unAckedUpdates[update.BackupID] = true
×
UNCOV
201

×
UNCOV
202
                        if !final {
×
UNCOV
203
                                continue
×
204
                        }
205

UNCOV
206
                        err := q.cfg.TaskPipeline.QueueBackupID(
×
UNCOV
207
                                &update.BackupID,
×
UNCOV
208
                        )
×
UNCOV
209
                        if err != nil {
×
210
                                log.Errorf("could not re-queue %s: %v",
×
211
                                        update.BackupID, err)
×
212
                                continue
×
213
                        }
214
                }
215

216
                if final {
6✔
217
                        err = q.cfg.DB.DeleteCommittedUpdates(q.ID())
3✔
218
                        if err != nil {
3✔
219
                                log.Errorf("could not delete committed "+
×
220
                                        "updates for session %s", q.ID())
×
221
                        }
×
222
                }
223

224
                // Push any task that was on the pending queue that there is
225
                // not yet a committed update for back to the main task
226
                // pipeline.
227
                q.queueCond.L.Lock()
3✔
228
                for q.pendingQueue.Len() > 0 {
3✔
UNCOV
229
                        next := q.pendingQueue.Front()
×
UNCOV
230
                        q.pendingQueue.Remove(next)
×
UNCOV
231

×
UNCOV
232
                        //nolint:forcetypeassert
×
UNCOV
233
                        task := next.Value.(*backupTask)
×
UNCOV
234

×
UNCOV
235
                        if unAckedUpdates[task.id] {
×
UNCOV
236
                                continue
×
237
                        }
238

UNCOV
239
                        err := q.cfg.TaskPipeline.QueueBackupID(&task.id)
×
UNCOV
240
                        if err != nil {
×
241
                                log.Errorf("could not re-queue backup task: "+
×
242
                                        "%v", err)
×
243
                                continue
×
244
                        }
245
                }
246
                q.queueCond.L.Unlock()
3✔
247

3✔
248
                q.log.Debugf("SessionQueue(%s) stopped", q.ID())
3✔
249
        })
250

251
        return returnErr
3✔
252
}
253

254
// ID returns the wtdb.SessionID for the queue, which can be used to uniquely
255
// identify this particular queue.
256
func (q *sessionQueue) ID() *wtdb.SessionID {
3✔
257
        return &q.cfg.ClientSession.ID
3✔
258
}
3✔
259

260
// AcceptTask attempts to queue a backupTask for delivery to the sessionQueue's
261
// tower. The task will only be accepted if the queue is not already
262
// exhausted or shutting down and the task is successfully bound to the
263
// ClientSession.
264
func (q *sessionQueue) AcceptTask(task *backupTask) (sessionQueueStatus, bool) {
3✔
265
        // Exit early if the queue has started shutting down.
3✔
266
        select {
3✔
267
        case <-q.quit:
×
268
                return sessionQueueShuttingDown, false
×
269
        default:
3✔
270
        }
271

272
        q.queueCond.L.Lock()
3✔
273

3✔
274
        // There is a chance that sessionQueue started shutting down between
3✔
275
        // the last quit channel check and waiting for the lock. So check one
3✔
276
        // more time here.
3✔
277
        select {
3✔
278
        case <-q.quit:
×
279
                q.queueCond.L.Unlock()
×
280
                return sessionQueueShuttingDown, false
×
281
        default:
3✔
282
        }
283

284
        numPending := uint32(q.pendingQueue.Len())
3✔
285
        maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates
3✔
286
        q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
3✔
287
                "pending=%d max-updates=%d",
3✔
288
                q.ID(), task.id, q.seqNum, numPending, maxUpdates)
3✔
289

3✔
290
        // Examine the current reserve status of the session queue.
3✔
291
        curStatus := q.status()
3✔
292

3✔
293
        switch curStatus {
3✔
294

295
        // The session queue is exhausted, and cannot accept the task because it
296
        // is full. Reject the task such that it can be tried against a
297
        // different session.
UNCOV
298
        case sessionQueueExhausted:
×
UNCOV
299
                q.queueCond.L.Unlock()
×
UNCOV
300
                return curStatus, false
×
301

302
        // The session queue is not exhausted. Compute the sweep and reward
303
        // outputs as a function of the session parameters. If the outputs are
304
        // dusty or uneconomical to backup, the task is rejected and will not be
305
        // tried again.
306
        //
307
        // TODO(conner): queue backups and retry with different session params.
308
        case sessionQueueAvailable:
3✔
309
                err := task.bindSession(
3✔
310
                        &q.cfg.ClientSession.ClientSessionBody,
3✔
311
                        q.cfg.BuildBreachRetribution,
3✔
312
                )
3✔
313
                if err != nil {
3✔
UNCOV
314
                        q.queueCond.L.Unlock()
×
UNCOV
315
                        q.log.Debugf("SessionQueue(%s) rejected %v: %v ",
×
UNCOV
316
                                q.ID(), task.id, err)
×
UNCOV
317
                        return curStatus, false
×
UNCOV
318
                }
×
319
        }
320

321
        // The sweep and reward outputs satisfy the session's policy, queue the
322
        // task for final signing and delivery.
323
        q.pendingQueue.PushBack(task)
3✔
324

3✔
325
        // Finally, compute the session's *new* reserve status. This will be
3✔
326
        // used by the client to determine if it can continue using this session
3✔
327
        // queue, or if it should negotiate a new one.
3✔
328
        newStatus := q.status()
3✔
329
        q.queueCond.L.Unlock()
3✔
330

3✔
331
        q.queueCond.Signal()
3✔
332

3✔
333
        return newStatus, true
3✔
334
}
335

336
// sessionManager is the primary event loop for the sessionQueue, and is
337
// responsible for encrypting and sending accepted tasks to the tower.
338
func (q *sessionQueue) sessionManager() {
3✔
339
        defer q.wg.Done()
3✔
340

3✔
341
        for {
6✔
342
                q.queueCond.L.Lock()
3✔
343
                for q.commitQueue.Len() == 0 &&
3✔
344
                        q.pendingQueue.Len() == 0 {
6✔
345

3✔
346
                        q.queueCond.Wait()
3✔
347

3✔
348
                        select {
3✔
349
                        case <-q.quit:
3✔
350
                                q.queueCond.L.Unlock()
3✔
351
                                return
3✔
352
                        default:
3✔
353
                        }
354
                }
355
                q.queueCond.L.Unlock()
3✔
356

3✔
357
                // Exit immediately if the sessionQueue has been stopped.
3✔
358
                select {
3✔
UNCOV
359
                case <-q.quit:
×
UNCOV
360
                        return
×
361
                default:
3✔
362
                }
363

364
                // Initiate a new connection to the watchtower and attempt to
365
                // drain all pending tasks.
366
                q.drainBackups()
3✔
367
        }
368
}
369

370
// drainBackups attempts to send all pending updates in the queue to the tower.
371
func (q *sessionQueue) drainBackups() {
3✔
372
        var (
3✔
373
                conn      wtserver.Peer
3✔
374
                err       error
3✔
375
                towerAddr = q.tower.Addresses.Peek()
3✔
376
        )
3✔
377

3✔
378
        for {
6✔
379
                q.log.Infof("SessionQueue(%s) attempting to dial tower at %v",
3✔
380
                        q.ID(), towerAddr)
3✔
381

3✔
382
                // First, check that we are able to dial this session's tower.
3✔
383
                conn, err = q.cfg.Dial(
3✔
384
                        q.cfg.ClientSession.SessionKeyECDH, &lnwire.NetAddress{
3✔
385
                                IdentityKey: q.tower.IdentityKey,
3✔
386
                                Address:     towerAddr,
3✔
387
                        },
3✔
388
                )
3✔
389
                if err != nil {
3✔
UNCOV
390
                        // If there are more addrs available, immediately try
×
UNCOV
391
                        // those.
×
UNCOV
392
                        nextAddr, iteratorErr := q.tower.Addresses.Next()
×
UNCOV
393
                        if iteratorErr == nil {
×
394
                                towerAddr = nextAddr
×
395
                                continue
×
396
                        }
397

398
                        // Otherwise, if we have exhausted the address list,
399
                        // back off and try again later.
UNCOV
400
                        q.tower.Addresses.Reset()
×
UNCOV
401

×
UNCOV
402
                        q.log.Errorf("SessionQueue(%s) unable to dial tower "+
×
UNCOV
403
                                "at any available Addresses: %v", q.ID(), err)
×
UNCOV
404

×
UNCOV
405
                        q.increaseBackoff()
×
UNCOV
406
                        select {
×
UNCOV
407
                        case <-time.After(q.retryBackoff):
×
UNCOV
408
                        case <-q.quit:
×
409
                        }
UNCOV
410
                        return
×
411
                }
412

413
                break
3✔
414
        }
415
        defer conn.Close()
3✔
416

3✔
417
        // Begin draining the queue of pending state updates. Before the first
3✔
418
        // update is sent, we will precede it with an Init message. If the first
3✔
419
        // is successful, subsequent updates can be streamed without sending an
3✔
420
        // Init.
3✔
421
        for sendInit := true; ; sendInit = false {
6✔
422
                // Generate the next state update to upload to the tower. This
3✔
423
                // method will first proceed in dequeuing committed updates
3✔
424
                // before attempting to dequeue any pending updates.
3✔
425
                stateUpdate, isPending, backupID, err := q.nextStateUpdate()
3✔
426
                if err != nil {
3✔
427
                        q.log.Errorf("SessionQueue(%v) unable to get next "+
×
428
                                "state update: %v", q.ID(), err)
×
429
                        return
×
430
                }
×
431

432
                // Now, send the state update to the tower and wait for a reply.
433
                err = q.sendStateUpdate(conn, stateUpdate, sendInit, isPending)
3✔
434
                if err != nil {
3✔
UNCOV
435
                        q.log.Errorf("SessionQueue(%s) unable to send state "+
×
UNCOV
436
                                "update: %v", q.ID(), err)
×
UNCOV
437

×
UNCOV
438
                        q.increaseBackoff()
×
UNCOV
439
                        select {
×
UNCOV
440
                        case <-time.After(q.retryBackoff):
×
UNCOV
441
                        case <-q.quit:
×
442
                        }
UNCOV
443
                        return
×
444
                }
445

446
                q.log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
3✔
447
                        q.ID(), backupID, stateUpdate.SeqNum)
3✔
448

3✔
449
                // If the last task was backed up successfully, we'll exit and
3✔
450
                // continue once more tasks are added to the queue. We'll also
3✔
451
                // clear any accumulated backoff as this batch was able to be
3✔
452
                // sent reliably.
3✔
453
                if stateUpdate.IsComplete == 1 {
6✔
454
                        q.resetBackoff()
3✔
455
                        return
3✔
456
                }
3✔
457

458
                // Always apply a small delay between sends, which makes the
459
                // unit tests more reliable. If we were requested to back off,
460
                // then we will do so.
461
                select {
3✔
462
                case <-time.After(time.Millisecond):
3✔
463
                case <-q.quit:
×
464
                        return
×
465
                }
466
        }
467
}
468

469
// nextStateUpdate returns the next wtwire.StateUpdate to upload to the tower.
470
// If any committed updates are present, this method will reconstruct the state
471
// update from the committed update using the current last applied value found
472
// in the database. Otherwise, it will select the next pending update, craft the
473
// payload, and commit an update before returning the state update to send. The
474
// boolean value in the response is true if the state update is taken from the
475
// pending queue, allowing the caller to remove the update from either the
476
// commit or pending queue if the update is successfully acked.
477
func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
478
        wtdb.BackupID, error) {
3✔
479

3✔
480
        var (
3✔
481
                seqNum    uint16
3✔
482
                update    wtdb.CommittedUpdate
3✔
483
                isLast    bool
3✔
484
                isPending bool
3✔
485
        )
3✔
486

3✔
487
        q.queueCond.L.Lock()
3✔
488
        switch {
3✔
489

490
        // If the commit queue is non-empty, parse the next committed update.
UNCOV
491
        case q.commitQueue.Len() > 0:
×
UNCOV
492
                next := q.commitQueue.Front()
×
UNCOV
493

×
UNCOV
494
                update = next.Value.(wtdb.CommittedUpdate)
×
UNCOV
495
                seqNum = update.SeqNum
×
UNCOV
496

×
UNCOV
497
                // If this is the last item in the commit queue and no items
×
UNCOV
498
                // exist in the pending queue, we will use the IsComplete flag
×
UNCOV
499
                // in the StateUpdate to signal that the tower can release the
×
UNCOV
500
                // connection after replying to free up resources.
×
UNCOV
501
                isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0
×
UNCOV
502
                q.queueCond.L.Unlock()
×
UNCOV
503

×
UNCOV
504
                q.log.Debugf("SessionQueue(%s) reprocessing committed state "+
×
UNCOV
505
                        "update for %v seqnum=%d",
×
UNCOV
506
                        q.ID(), update.BackupID, seqNum)
×
507

508
        // Otherwise, craft and commit the next update from the pending queue.
509
        default:
3✔
510
                isPending = true
3✔
511

3✔
512
                // Determine the current sequence number to apply for this
3✔
513
                // pending update.
3✔
514
                seqNum = q.seqNum + 1
3✔
515

3✔
516
                // Obtain the next task from the queue.
3✔
517
                next := q.pendingQueue.Front()
3✔
518
                task := next.Value.(*backupTask)
3✔
519

3✔
520
                // If this is the last item in the pending queue, we will use
3✔
521
                // the IsComplete flag in the StateUpdate to signal that the
3✔
522
                // tower can release the connection after replying to free up
3✔
523
                // resources.
3✔
524
                isLast = q.pendingQueue.Len() == 1
3✔
525
                q.queueCond.L.Unlock()
3✔
526

3✔
527
                hint, encBlob, err := task.craftSessionPayload(q.cfg.Signer)
3✔
528
                if err != nil {
3✔
529
                        // TODO(conner): mark will not send
×
530
                        err := fmt.Errorf("unable to craft session payload: %w",
×
531
                                err)
×
532
                        return nil, false, wtdb.BackupID{}, err
×
533
                }
×
534
                // TODO(conner): special case other obscure errors
535

536
                update = wtdb.CommittedUpdate{
3✔
537
                        SeqNum: seqNum,
3✔
538
                        CommittedUpdateBody: wtdb.CommittedUpdateBody{
3✔
539
                                BackupID:      task.id,
3✔
540
                                Hint:          hint,
3✔
541
                                EncryptedBlob: encBlob,
3✔
542
                        },
3✔
543
                }
3✔
544

3✔
545
                q.log.Debugf("SessionQueue(%s) committing state update "+
3✔
546
                        "%v seqnum=%d", q.ID(), update.BackupID, seqNum)
3✔
547
        }
548

549
        // Before sending the task to the tower, commit the state update
550
        // to disk using the assigned sequence number. If this task has already
551
        // been committed, the call will succeed and only be used for the
552
        // purpose of obtaining the last applied value to send to the tower.
553
        //
554
        // This step ensures that if we crash before receiving an ack that we
555
        // will retransmit the same update. If the tower successfully received
556
        // the update from before, it will reply with an ACK regardless of what
557
        // we send the next time. This step ensures that we reliably send the
558
        // same update for a given sequence number, to prevent us from thinking
559
        // we backed up a state when we instead backed up another.
560
        lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), &update)
3✔
561
        if err != nil {
3✔
562
                // TODO(conner): mark failed/reschedule
×
563
                err := fmt.Errorf("unable to commit state update for "+
×
564
                        "%v seqnum=%d: %v", update.BackupID, seqNum, err)
×
565
                return nil, false, wtdb.BackupID{}, err
×
566
        }
×
567

568
        stateUpdate := &wtwire.StateUpdate{
3✔
569
                SeqNum:        update.SeqNum,
3✔
570
                LastApplied:   lastApplied,
3✔
571
                Hint:          update.Hint,
3✔
572
                EncryptedBlob: update.EncryptedBlob,
3✔
573
        }
3✔
574

3✔
575
        // Set the IsComplete flag if this is the last queued item.
3✔
576
        if isLast {
6✔
577
                stateUpdate.IsComplete = 1
3✔
578
        }
3✔
579

580
        return stateUpdate, isPending, update.BackupID, nil
3✔
581
}
582

583
// sendStateUpdate sends a wtwire.StateUpdate to the watchtower and processes
584
// the ACK before returning. If sendInit is true, this method will first send
585
// the localInit message and verify that the tower supports our required feature
586
// bits. An error is returned if any part of the send fails. The boolean return
587
// variable indicates whether we should back off before attempting to send the
588
// next state update.
589
func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
590
        stateUpdate *wtwire.StateUpdate, sendInit, isPending bool) error {
3✔
591

3✔
592
        towerAddr := &lnwire.NetAddress{
3✔
593
                IdentityKey: conn.RemotePub(),
3✔
594
                Address:     conn.RemoteAddr(),
3✔
595
        }
3✔
596

3✔
597
        // If this is the first message being sent to the tower, we must send an
3✔
598
        // Init message to establish that server supports the features we
3✔
599
        // require.
3✔
600
        if sendInit {
6✔
601
                // Send Init to tower.
3✔
602
                err := q.cfg.SendMessage(conn, q.localInit)
3✔
603
                if err != nil {
3✔
604
                        return err
×
605
                }
×
606

607
                // Receive Init from tower.
608
                remoteMsg, err := q.cfg.ReadMessage(conn)
3✔
609
                if err != nil {
3✔
610
                        return err
×
611
                }
×
612

613
                remoteInit, ok := remoteMsg.(*wtwire.Init)
3✔
614
                if !ok {
3✔
615
                        return fmt.Errorf("watchtower %s responded with %T "+
×
616
                                "to Init", towerAddr, remoteMsg)
×
617
                }
×
618

619
                // Validate Init.
620
                err = q.localInit.CheckRemoteInit(
3✔
621
                        remoteInit, wtwire.FeatureNames,
3✔
622
                )
3✔
623
                if err != nil {
3✔
624
                        return err
×
625
                }
×
626
        }
627

628
        // Send StateUpdate to tower.
629
        err := q.cfg.SendMessage(conn, stateUpdate)
3✔
630
        if err != nil {
3✔
631
                return err
×
632
        }
×
633

634
        // Receive StateUpdate from tower.
635
        remoteMsg, err := q.cfg.ReadMessage(conn)
3✔
636
        if err != nil {
3✔
UNCOV
637
                return err
×
UNCOV
638
        }
×
639

640
        stateUpdateReply, ok := remoteMsg.(*wtwire.StateUpdateReply)
3✔
641
        if !ok {
3✔
642
                return fmt.Errorf("watchtower %s responded with %T to "+
×
643
                        "StateUpdate", towerAddr, remoteMsg)
×
644
        }
×
645

646
        // Process the reply from the tower.
647
        switch stateUpdateReply.Code {
3✔
648

649
        // The tower reported a successful update, validate the response and
650
        // record the last applied returned.
651
        case wtwire.CodeOK:
3✔
652

653
        // TODO(conner): handle other error cases properly, ban towers, etc.
654
        default:
×
655
                err := fmt.Errorf("received error code %v in "+
×
656
                        "StateUpdateReply for seqnum=%d",
×
657
                        stateUpdateReply.Code, stateUpdate.SeqNum)
×
658
                q.log.Warnf("SessionQueue(%s) unable to upload state update "+
×
659
                        "to tower=%s: %v", q.ID(), towerAddr, err)
×
660
                return err
×
661
        }
662

663
        lastApplied := stateUpdateReply.LastApplied
3✔
664
        err = q.cfg.DB.AckUpdate(q.ID(), stateUpdate.SeqNum, lastApplied)
3✔
665
        switch {
3✔
666
        case err == wtdb.ErrUnallocatedLastApplied:
×
667
                // TODO(conner): borked watchtower
×
668
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
669
                        stateUpdate.SeqNum, err)
×
670
                q.log.Errorf("SessionQueue(%v) failed to ack update: %v",
×
671
                        q.ID(), err)
×
672
                return err
×
673

674
        case err == wtdb.ErrLastAppliedReversion:
×
675
                // TODO(conner): borked watchtower
×
676
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
677
                        stateUpdate.SeqNum, err)
×
678
                q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
×
679
                        q.ID(), err)
×
680
                return err
×
681

682
        case err != nil:
×
683
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
684
                        stateUpdate.SeqNum, err)
×
685
                q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
×
686
                        q.ID(), err)
×
687
                return err
×
688
        }
689

690
        q.queueCond.L.Lock()
3✔
691
        if isPending {
6✔
692
                // If a pending update was successfully sent, increment the
3✔
693
                // sequence number and remove the item from the queue. This
3✔
694
                // ensures the total number of backups in the session remains
3✔
695
                // unchanged, which maintains the external view of the session's
3✔
696
                // reserve status.
3✔
697
                q.seqNum++
3✔
698
                q.pendingQueue.Remove(q.pendingQueue.Front())
3✔
699
        } else {
3✔
700
                // Otherwise, simply remove the update from the committed queue.
×
701
                // This has no effect on the queue's reserve status since the
×
702
                // update had already been committed.
×
703
                q.commitQueue.Remove(q.commitQueue.Front())
×
704
        }
×
705
        q.queueCond.L.Unlock()
3✔
706

3✔
707
        return nil
3✔
708
}
709

710
// status returns a sessionQueueStatus indicating whether the sessionQueue can
711
// accept another task. sessionQueueAvailable is returned when a task can be
712
// accepted, and sessionQueueExhausted is returned if the all slots in the
713
// session have been allocated.
714
//
715
// NOTE: This method MUST be called with queueCond's exclusive lock held.
716
func (q *sessionQueue) status() sessionQueueStatus {
3✔
717
        numPending := uint32(q.pendingQueue.Len())
3✔
718
        maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates)
3✔
719

3✔
720
        if uint32(q.seqNum)+numPending < maxUpdates {
6✔
721
                return sessionQueueAvailable
3✔
722
        }
3✔
723

724
        return sessionQueueExhausted
3✔
725

726
}
727

728
// resetBackoff returns the connection backoff the minimum configured backoff.
729
func (q *sessionQueue) resetBackoff() {
3✔
730
        q.retryBackoff = q.cfg.MinBackoff
3✔
731
}
3✔
732

733
// increaseBackoff doubles the current connection backoff, clamping to the
734
// configured maximum backoff if it would exceed the limit.
UNCOV
735
func (q *sessionQueue) increaseBackoff() {
×
UNCOV
736
        q.retryBackoff *= 2
×
UNCOV
737
        if q.retryBackoff > q.cfg.MaxBackoff {
×
738
                q.retryBackoff = q.cfg.MaxBackoff
×
739
        }
×
740
}
741

742
// sessionQueueSet maintains a mapping of SessionIDs to their corresponding
743
// sessionQueue.
744
type sessionQueueSet struct {
745
        queues map[wtdb.SessionID]*sessionQueue
746
        mu     sync.Mutex
747
}
748

749
// newSessionQueueSet constructs a new sessionQueueSet.
750
func newSessionQueueSet() *sessionQueueSet {
3✔
751
        return &sessionQueueSet{
3✔
752
                queues: make(map[wtdb.SessionID]*sessionQueue),
3✔
753
        }
3✔
754
}
3✔
755

756
// AddAndStart inserts a sessionQueue into the sessionQueueSet and starts it.
757
func (s *sessionQueueSet) AddAndStart(sessionQueue *sessionQueue) {
3✔
758
        s.mu.Lock()
3✔
759
        defer s.mu.Unlock()
3✔
760

3✔
761
        s.queues[*sessionQueue.ID()] = sessionQueue
3✔
762

3✔
763
        sessionQueue.Start()
3✔
764
}
3✔
765

766
// StopAndRemove stops the given session queue and removes it from the
767
// sessionQueueSet.
768
func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID, final bool) error {
3✔
769
        s.mu.Lock()
3✔
770
        defer s.mu.Unlock()
3✔
771

3✔
772
        queue, ok := s.queues[id]
3✔
773
        if !ok {
6✔
774
                return nil
3✔
775
        }
3✔
776

777
        delete(s.queues, id)
3✔
778

3✔
779
        return queue.Stop(final)
3✔
780
}
781

782
// Get fetches and returns the sessionQueue with the given ID.
783
func (s *sessionQueueSet) Get(id wtdb.SessionID) (*sessionQueue, bool) {
3✔
784
        s.mu.Lock()
3✔
785
        defer s.mu.Unlock()
3✔
786

3✔
787
        q, ok := s.queues[id]
3✔
788

3✔
789
        return q, ok
3✔
790
}
3✔
791

792
// ApplyAndWait executes the nil-adic function returned from getApply for each
793
// sessionQueue in the set in parallel, then waits for all of them to finish
794
// before returning to the caller.
795
func (s *sessionQueueSet) ApplyAndWait(getApply func(*sessionQueue) func()) {
3✔
796
        s.mu.Lock()
3✔
797
        defer s.mu.Unlock()
3✔
798

3✔
799
        var wg sync.WaitGroup
3✔
800
        for _, sessionq := range s.queues {
6✔
801
                wg.Add(1)
3✔
802
                go func(sq *sessionQueue) {
6✔
803
                        defer wg.Done()
3✔
804

3✔
805
                        getApply(sq)()
3✔
806
                }(sessionq)
3✔
807
        }
808
        wg.Wait()
3✔
809
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc