• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15736109134

18 Jun 2025 02:46PM UTC coverage: 58.197% (-10.1%) from 68.248%
15736109134

Pull #9752

github

web-flow
Merge d2634a68c into 31c74f20f
Pull Request #9752: routerrpc: reject payment to invoice that don't have payment secret or blinded paths

6 of 13 new or added lines in 2 files covered. (46.15%)

28331 existing lines in 455 files now uncovered.

97860 of 168153 relevant lines covered (58.2%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.9
/watchtower/wtclient/session_queue.go
1
package wtclient
2

3
import (
4
        "container/list"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/chaincfg/chainhash"
10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/input"
12
        "github.com/lightningnetwork/lnd/keychain"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
15
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
16
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
17
)
18

19
// sessionQueueStatus describes how much room a sessionQueue has left for new
// backups, or whether it has stopped accepting them altogether.
type sessionQueueStatus uint8

// Possible sessionQueueStatus values, numbered consecutively from zero.
const (
	// sessionQueueAvailable means at least one more backup can still be
	// placed in the session.
	sessionQueueAvailable sessionQueueStatus = iota

	// sessionQueueExhausted means every slot in the session has already
	// been allocated.
	sessionQueueExhausted

	// sessionQueueShuttingDown means the queue is being torn down and no
	// longer takes new backups.
	sessionQueueShuttingDown
)
35

36
// sessionQueueConfig bundles the resources required by the sessionQueue to
37
// perform its duties. All entries MUST be non-nil.
38
type sessionQueueConfig struct {
39
        // ClientSession provides access to the negotiated session parameters
40
        // and updating its persistent storage.
41
        ClientSession *ClientSession
42

43
        // ChainHash identifies the chain for which the session's justice
44
        // transactions are targeted.
45
        ChainHash chainhash.Hash
46

47
        // Dial allows the client to dial the tower using it's public key and
48
        // net address.
49
        Dial func(keychain.SingleKeyECDH, *lnwire.NetAddress) (wtserver.Peer,
50
                error)
51

52
        // SendMessage encodes, encrypts, and writes a message to the given
53
        // peer.
54
        SendMessage func(wtserver.Peer, wtwire.Message) error
55

56
        // ReadMessage receives, decypts, and decodes a message from the given
57
        // peer.
58
        ReadMessage func(wtserver.Peer) (wtwire.Message, error)
59

60
        // Signer facilitates signing of inputs, used to construct the witnesses
61
        // for justice transaction inputs.
62
        Signer input.Signer
63

64
        // BuildBreachRetribution is a function closure that allows the client
65
        // to fetch the breach retribution info for a certain channel at a
66
        // certain revoked commitment height.
67
        BuildBreachRetribution BreachRetributionBuilder
68

69
        // TaskPipeline is a pipeline which the sessionQueue should use to send
70
        // any unhandled tasks on shutdown of the queue.
71
        TaskPipeline *DiskOverflowQueue[*wtdb.BackupID]
72

73
        // DB provides access to the client's stable storage.
74
        DB DB
75

76
        // MinBackoff defines the initial backoff applied by the session
77
        // queue before reconnecting to the tower after a failed or partially
78
        // successful batch is sent. Subsequent backoff durations will grow
79
        // exponentially up until MaxBackoff.
80
        MinBackoff time.Duration
81

82
        // MaxBackoff defines the maximum backoff applied by the session
83
        // queue before reconnecting to the tower after a failed or partially
84
        // successful batch is sent. If the exponential backoff produces a
85
        // timeout greater than this value, the backoff duration will be clamped
86
        // to MaxBackoff.
87
        MaxBackoff time.Duration
88

89
        // Log specifies the desired log output, which should be prefixed by the
90
        // client type, e.g. anchor or legacy.
91
        Log btclog.Logger
92
}
93

94
// sessionQueue implements a reliable queue that will encrypt and send accepted
95
// backups to the watchtower specified in the config's ClientSession. Calling
96
// Stop will attempt to perform a clean shutdown replaying any un-committed
97
// pending updates to the client's main task pipeline.
98
type sessionQueue struct {
99
        started sync.Once
100
        stopped sync.Once
101
        forced  sync.Once
102

103
        cfg *sessionQueueConfig
104
        log btclog.Logger
105

106
        commitQueue  *list.List
107
        pendingQueue *list.List
108
        queueMtx     sync.Mutex
109
        queueCond    *sync.Cond
110

111
        localInit *wtwire.Init
112
        tower     *Tower
113

114
        seqNum uint16
115

116
        retryBackoff time.Duration
117

118
        quit chan struct{}
119
        wg   sync.WaitGroup
120
}
121

122
// newSessionQueue initializes a fresh sessionQueue.
123
func newSessionQueue(cfg *sessionQueueConfig,
124
        updates []wtdb.CommittedUpdate) *sessionQueue {
3✔
125

3✔
126
        localInit := wtwire.NewInitMessage(
3✔
127
                lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
3✔
128
                cfg.ChainHash,
3✔
129
        )
3✔
130

3✔
131
        sq := &sessionQueue{
3✔
132
                cfg:          cfg,
3✔
133
                log:          cfg.Log,
3✔
134
                commitQueue:  list.New(),
3✔
135
                pendingQueue: list.New(),
3✔
136
                localInit:    localInit,
3✔
137
                tower:        cfg.ClientSession.Tower,
3✔
138
                seqNum:       cfg.ClientSession.SeqNum,
3✔
139
                retryBackoff: cfg.MinBackoff,
3✔
140
                quit:         make(chan struct{}),
3✔
141
        }
3✔
142
        sq.queueCond = sync.NewCond(&sq.queueMtx)
3✔
143

3✔
144
        // The database should return them in sorted order, and session queue's
3✔
145
        // sequence number will be equal to that of the last committed update.
3✔
146
        for _, update := range updates {
3✔
UNCOV
147
                sq.commitQueue.PushBack(update)
×
UNCOV
148
        }
×
149

150
        return sq
3✔
151
}
152

153
// Start idempotently starts the sessionQueue so that it can begin accepting
154
// backups.
155
func (q *sessionQueue) Start() {
3✔
156
        q.started.Do(func() {
6✔
157
                q.wg.Add(1)
3✔
158
                go q.sessionManager()
3✔
159
        })
3✔
160
}
161

162
// Stop idempotently stops the sessionQueue by initiating a clean shutdown that
163
// will clear all pending tasks in the queue before returning to the caller.
164
// The final param should only be set to true if this is the last time that
165
// this session will be used. Otherwise, during normal shutdown, the final param
166
// should be false.
167
func (q *sessionQueue) Stop(final bool) error {
3✔
168
        var returnErr error
3✔
169
        q.stopped.Do(func() {
6✔
170
                q.log.Debugf("SessionQueue(%s) stopping ...", q.ID())
3✔
171

3✔
172
                close(q.quit)
3✔
173

3✔
174
                shutdown := make(chan struct{})
3✔
175
                go func() {
6✔
176
                        for {
6✔
177
                                select {
3✔
178
                                case <-time.After(time.Millisecond):
3✔
179
                                        q.queueCond.Signal()
3✔
180
                                case <-shutdown:
3✔
181
                                        return
3✔
182
                                }
183
                        }
184
                }()
185

186
                q.wg.Wait()
3✔
187
                close(shutdown)
3✔
188

3✔
189
                // Now, for any task in the pending queue that we have not yet
3✔
190
                // created a CommittedUpdate for, re-add the task to the main
3✔
191
                // task pipeline.
3✔
192
                updates, err := q.cfg.DB.FetchSessionCommittedUpdates(q.ID())
3✔
193
                if err != nil {
3✔
194
                        returnErr = err
×
195
                        return
×
196
                }
×
197

198
                unAckedUpdates := make(map[wtdb.BackupID]bool)
3✔
199
                for _, update := range updates {
3✔
UNCOV
200
                        unAckedUpdates[update.BackupID] = true
×
UNCOV
201

×
UNCOV
202
                        if !final {
×
UNCOV
203
                                continue
×
204
                        }
205

UNCOV
206
                        err := q.cfg.TaskPipeline.QueueBackupID(
×
UNCOV
207
                                &update.BackupID,
×
UNCOV
208
                        )
×
UNCOV
209
                        if err != nil {
×
210
                                log.Errorf("could not re-queue %s: %v",
×
211
                                        update.BackupID, err)
×
212
                                continue
×
213
                        }
214
                }
215

216
                if final {
6✔
217
                        err = q.cfg.DB.DeleteCommittedUpdates(q.ID())
3✔
218
                        if err != nil {
3✔
219
                                log.Errorf("could not delete committed "+
×
220
                                        "updates for session %s", q.ID())
×
221
                        }
×
222
                }
223

224
                // Push any task that was on the pending queue that there is
225
                // not yet a committed update for back to the main task
226
                // pipeline.
227
                q.queueCond.L.Lock()
3✔
228
                for q.pendingQueue.Len() > 0 {
3✔
UNCOV
229
                        next := q.pendingQueue.Front()
×
UNCOV
230
                        q.pendingQueue.Remove(next)
×
UNCOV
231

×
UNCOV
232
                        //nolint:forcetypeassert
×
UNCOV
233
                        task := next.Value.(*backupTask)
×
UNCOV
234

×
UNCOV
235
                        if unAckedUpdates[task.id] {
×
UNCOV
236
                                continue
×
237
                        }
238

UNCOV
239
                        err := q.cfg.TaskPipeline.QueueBackupID(&task.id)
×
UNCOV
240
                        if err != nil {
×
241
                                log.Errorf("could not re-queue backup task: "+
×
242
                                        "%v", err)
×
243
                                continue
×
244
                        }
245
                }
246
                q.queueCond.L.Unlock()
3✔
247

3✔
248
                q.log.Debugf("SessionQueue(%s) stopped", q.ID())
3✔
249
        })
250

251
        return returnErr
3✔
252
}
253

254
// ID returns the wtdb.SessionID for the queue, which can be used to uniquely
255
// identify this a particular queue.
256
func (q *sessionQueue) ID() *wtdb.SessionID {
3✔
257
        return &q.cfg.ClientSession.ID
3✔
258
}
3✔
259

260
// AcceptTask attempts to queue a backupTask for delivery to the sessionQueue's
261
// tower. The session will only be accepted if the queue is not already
262
// exhausted or shutting down and the task is successfully bound to the
263
// ClientSession.
264
func (q *sessionQueue) AcceptTask(task *backupTask) (sessionQueueStatus, bool) {
3✔
265
        // Exit early if the queue has started shutting down.
3✔
266
        select {
3✔
267
        case <-q.quit:
×
268
                return sessionQueueShuttingDown, false
×
269
        default:
3✔
270
        }
271

272
        q.queueCond.L.Lock()
3✔
273

3✔
274
        // There is a chance that sessionQueue started shutting down between
3✔
275
        // the last quit channel check and waiting for the lock. So check one
3✔
276
        // more time here.
3✔
277
        select {
3✔
278
        case <-q.quit:
×
279
                q.queueCond.L.Unlock()
×
280
                return sessionQueueShuttingDown, false
×
281
        default:
3✔
282
        }
283

284
        numPending := uint32(q.pendingQueue.Len())
3✔
285
        maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates
3✔
286
        q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
3✔
287
                "pending=%d max-updates=%d",
3✔
288
                q.ID(), task.id, q.seqNum, numPending, maxUpdates)
3✔
289

3✔
290
        // Examine the current reserve status of the session queue.
3✔
291
        curStatus := q.status()
3✔
292

3✔
293
        switch curStatus {
3✔
294

295
        // The session queue is exhausted, and cannot accept the task because it
296
        // is full. Reject the task such that it can be tried against a
297
        // different session.
UNCOV
298
        case sessionQueueExhausted:
×
UNCOV
299
                q.queueCond.L.Unlock()
×
UNCOV
300
                return curStatus, false
×
301

302
        // The session queue is not exhausted. Compute the sweep and reward
303
        // outputs as a function of the session parameters. If the outputs are
304
        // dusty or uneconomical to backup, the task is rejected and will not be
305
        // tried again.
306
        //
307
        // TODO(conner): queue backups and retry with different session params.
308
        case sessionQueueAvailable:
3✔
309
                err := task.bindSession(
3✔
310
                        &q.cfg.ClientSession.ClientSessionBody,
3✔
311
                        q.cfg.BuildBreachRetribution,
3✔
312
                )
3✔
313
                if err != nil {
3✔
UNCOV
314
                        q.queueCond.L.Unlock()
×
UNCOV
315
                        q.log.Debugf("SessionQueue(%s) rejected %v: %v ",
×
UNCOV
316
                                q.ID(), task.id, err)
×
UNCOV
317
                        return curStatus, false
×
UNCOV
318
                }
×
319
        }
320

321
        // The sweep and reward outputs satisfy the session's policy, queue the
322
        // task for final signing and delivery.
323
        q.pendingQueue.PushBack(task)
3✔
324

3✔
325
        // Finally, compute the session's *new* reserve status. This will be
3✔
326
        // used by the client to determine if it can continue using this session
3✔
327
        // queue, or if it should negotiate a new one.
3✔
328
        newStatus := q.status()
3✔
329
        q.queueCond.L.Unlock()
3✔
330

3✔
331
        q.queueCond.Signal()
3✔
332

3✔
333
        return newStatus, true
3✔
334
}
335

336
// sessionManager is the primary event loop for the sessionQueue, and is
337
// responsible for encrypting and sending accepted tasks to the tower.
338
func (q *sessionQueue) sessionManager() {
3✔
339
        defer q.wg.Done()
3✔
340

3✔
341
        for {
6✔
342
                q.queueCond.L.Lock()
3✔
343
                for q.commitQueue.Len() == 0 &&
3✔
344
                        q.pendingQueue.Len() == 0 {
6✔
345

3✔
346
                        q.queueCond.Wait()
3✔
347

3✔
348
                        select {
3✔
349
                        case <-q.quit:
3✔
350
                                q.queueCond.L.Unlock()
3✔
351
                                return
3✔
352
                        default:
3✔
353
                        }
354
                }
355
                q.queueCond.L.Unlock()
3✔
356

3✔
357
                // Exit immediately if the sessionQueue has been stopped.
3✔
358
                select {
3✔
UNCOV
359
                case <-q.quit:
×
UNCOV
360
                        return
×
361
                default:
3✔
362
                }
363

364
                // Initiate a new connection to the watchtower and attempt to
365
                // drain all pending tasks.
366
                q.drainBackups()
3✔
367
        }
368
}
369

370
// drainBackups attempts to send all pending updates in the queue to the tower.
371
func (q *sessionQueue) drainBackups() {
3✔
372
        var (
3✔
373
                conn      wtserver.Peer
3✔
374
                err       error
3✔
375
                towerAddr = q.tower.Addresses.Peek()
3✔
376
        )
3✔
377

3✔
378
        for {
6✔
379
                q.log.Infof("SessionQueue(%s) attempting to dial tower at %v",
3✔
380
                        q.ID(), towerAddr)
3✔
381

3✔
382
                // First, check that we are able to dial this session's tower.
3✔
383
                conn, err = q.cfg.Dial(
3✔
384
                        q.cfg.ClientSession.SessionKeyECDH, &lnwire.NetAddress{
3✔
385
                                IdentityKey: q.tower.IdentityKey,
3✔
386
                                Address:     towerAddr,
3✔
387
                        },
3✔
388
                )
3✔
389
                if err != nil {
3✔
UNCOV
390
                        // If there are more addrs available, immediately try
×
UNCOV
391
                        // those.
×
UNCOV
392
                        nextAddr, iteratorErr := q.tower.Addresses.Next()
×
UNCOV
393
                        if iteratorErr == nil {
×
394
                                towerAddr = nextAddr
×
395
                                continue
×
396
                        }
397

398
                        // Otherwise, if we have exhausted the address list,
399
                        // back off and try again later.
UNCOV
400
                        q.tower.Addresses.Reset()
×
UNCOV
401

×
UNCOV
402
                        q.log.Errorf("SessionQueue(%s) unable to dial tower "+
×
UNCOV
403
                                "at any available Addresses: %v", q.ID(), err)
×
UNCOV
404

×
UNCOV
405
                        q.increaseBackoff()
×
UNCOV
406
                        select {
×
UNCOV
407
                        case <-time.After(q.retryBackoff):
×
UNCOV
408
                        case <-q.quit:
×
409
                        }
UNCOV
410
                        return
×
411
                }
412

413
                break
3✔
414
        }
415
        defer conn.Close()
3✔
416

3✔
417
        // Begin draining the queue of pending state updates. Before the first
3✔
418
        // update is sent, we will precede it with an Init message. If the first
3✔
419
        // is successful, subsequent updates can be streamed without sending an
3✔
420
        // Init.
3✔
421
        for sendInit := true; ; sendInit = false {
6✔
422
                // Generate the next state update to upload to the tower. This
3✔
423
                // method will first proceed in dequeuing committed updates
3✔
424
                // before attempting to dequeue any pending updates.
3✔
425
                stateUpdate, isPending, backupID, err := q.nextStateUpdate()
3✔
426
                if err != nil {
3✔
427
                        q.log.Errorf("SessionQueue(%v) unable to get next "+
×
428
                                "state update: %v", q.ID(), err)
×
429
                        return
×
430
                }
×
431

432
                // Now, send the state update to the tower and wait for a reply.
433
                err = q.sendStateUpdate(conn, stateUpdate, sendInit, isPending)
3✔
434
                if err != nil {
3✔
UNCOV
435
                        q.log.Errorf("SessionQueue(%s) unable to send state "+
×
UNCOV
436
                                "update: %v", q.ID(), err)
×
UNCOV
437

×
UNCOV
438
                        q.increaseBackoff()
×
UNCOV
439
                        select {
×
UNCOV
440
                        case <-time.After(q.retryBackoff):
×
UNCOV
441
                        case <-q.quit:
×
442
                        }
UNCOV
443
                        return
×
444
                }
445

446
                q.log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
3✔
447
                        q.ID(), backupID, stateUpdate.SeqNum)
3✔
448

3✔
449
                // If the last task was backed up successfully, we'll exit and
3✔
450
                // continue once more tasks are added to the queue. We'll also
3✔
451
                // clear any accumulated backoff as this batch was able to be
3✔
452
                // sent reliably.
3✔
453
                if stateUpdate.IsComplete == 1 {
6✔
454
                        q.resetBackoff()
3✔
455
                        return
3✔
456
                }
3✔
457

458
                // Always apply a small delay between sends, which makes the
459
                // unit tests more reliable. If we were requested to back off,
460
                // when we will do so.
461
                select {
3✔
462
                case <-time.After(time.Millisecond):
3✔
463
                case <-q.quit:
×
464
                        return
×
465
                }
466
        }
467
}
468

469
// nextStateUpdate returns the next wtwire.StateUpdate to upload to the tower.
470
// If any committed updates are present, this method will reconstruct the state
471
// update from the committed update using the current last applied value found
472
// in the database. Otherwise, it will select the next pending update, craft the
473
// payload, and commit an update before returning the state update to send. The
474
// boolean value in the response is true if the state update is taken from the
475
// pending queue, allowing the caller to remove the update from either the
476
// commit or pending queue if the update is successfully acked.
477
func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
478
        wtdb.BackupID, error) {
3✔
479

3✔
480
        var (
3✔
481
                seqNum    uint16
3✔
482
                update    wtdb.CommittedUpdate
3✔
483
                isLast    bool
3✔
484
                isPending bool
3✔
485
        )
3✔
486

3✔
487
        q.queueCond.L.Lock()
3✔
488
        switch {
3✔
489

490
        // If the commit queue is non-empty, parse the next committed update.
UNCOV
491
        case q.commitQueue.Len() > 0:
×
UNCOV
492
                next := q.commitQueue.Front()
×
UNCOV
493

×
UNCOV
494
                update = next.Value.(wtdb.CommittedUpdate)
×
UNCOV
495
                seqNum = update.SeqNum
×
UNCOV
496

×
UNCOV
497
                // If this is the last item in the commit queue and no items
×
UNCOV
498
                // exist in the pending queue, we will use the IsComplete flag
×
UNCOV
499
                // in the StateUpdate to signal that the tower can release the
×
UNCOV
500
                // connection after replying to free up resources.
×
UNCOV
501
                isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0
×
UNCOV
502
                q.queueCond.L.Unlock()
×
UNCOV
503

×
UNCOV
504
                q.log.Debugf("SessionQueue(%s) reprocessing committed state "+
×
UNCOV
505
                        "update for %v seqnum=%d",
×
UNCOV
506
                        q.ID(), update.BackupID, seqNum)
×
507

508
        // Otherwise, craft and commit the next update from the pending queue.
509
        default:
3✔
510
                isPending = true
3✔
511

3✔
512
                // Determine the current sequence number to apply for this
3✔
513
                // pending update.
3✔
514
                seqNum = q.seqNum + 1
3✔
515

3✔
516
                // Obtain the next task from the queue.
3✔
517
                next := q.pendingQueue.Front()
3✔
518
                task := next.Value.(*backupTask)
3✔
519

3✔
520
                // If this is the last item in the pending queue, we will use
3✔
521
                // the IsComplete flag in the StateUpdate to signal that the
3✔
522
                // tower can release the connection after replying to free up
3✔
523
                // resources.
3✔
524
                isLast = q.pendingQueue.Len() == 1
3✔
525
                q.queueCond.L.Unlock()
3✔
526

3✔
527
                hint, encBlob, err := task.craftSessionPayload(q.cfg.Signer)
3✔
528
                if err != nil {
3✔
529
                        // TODO(conner): mark will not send
×
530
                        err := fmt.Errorf("unable to craft session payload: %w",
×
531
                                err)
×
532
                        return nil, false, wtdb.BackupID{}, err
×
533
                }
×
534
                // TODO(conner): special case other obscure errors
535

536
                update = wtdb.CommittedUpdate{
3✔
537
                        SeqNum: seqNum,
3✔
538
                        CommittedUpdateBody: wtdb.CommittedUpdateBody{
3✔
539
                                BackupID:      task.id,
3✔
540
                                Hint:          hint,
3✔
541
                                EncryptedBlob: encBlob,
3✔
542
                        },
3✔
543
                }
3✔
544

3✔
545
                q.log.Debugf("SessionQueue(%s) committing state update "+
3✔
546
                        "%v seqnum=%d", q.ID(), update.BackupID, seqNum)
3✔
547
        }
548

549
        // Before sending the task to the tower, commit the state update
550
        // to disk using the assigned sequence number. If this task has already
551
        // been committed, the call will succeed and only be used for the
552
        // purpose of obtaining the last applied value to send to the tower.
553
        //
554
        // This step ensures that if we crash before receiving an ack that we
555
        // will retransmit the same update. If the tower successfully received
556
        // the update from before, it will reply with an ACK regardless of what
557
        // we send the next time. This step ensures that if we reliably send the
558
        // same update for a given sequence number, to prevent us from thinking
559
        // we backed up a state when we instead backed up another.
560
        lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), &update)
3✔
561
        if err != nil {
3✔
562
                // TODO(conner): mark failed/reschedule
×
563
                err := fmt.Errorf("unable to commit state update for "+
×
564
                        "%v seqnum=%d: %v", update.BackupID, seqNum, err)
×
565
                return nil, false, wtdb.BackupID{}, err
×
566
        }
×
567

568
        stateUpdate := &wtwire.StateUpdate{
3✔
569
                SeqNum:        update.SeqNum,
3✔
570
                LastApplied:   lastApplied,
3✔
571
                Hint:          update.Hint,
3✔
572
                EncryptedBlob: update.EncryptedBlob,
3✔
573
        }
3✔
574

3✔
575
        // Set the IsComplete flag if this is the last queued item.
3✔
576
        if isLast {
6✔
577
                stateUpdate.IsComplete = 1
3✔
578
        }
3✔
579

580
        return stateUpdate, isPending, update.BackupID, nil
3✔
581
}
582

583
// sendStateUpdate sends a wtwire.StateUpdate to the watchtower and processes
584
// the ACK before returning. If sendInit is true, this method will first send
585
// the localInit message and verify that the tower supports our required feature
586
// bits. And error is returned if any part of the send fails. The boolean return
587
// variable indicates whether we should back off before attempting to send the
588
// next state update.
589
func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
590
        stateUpdate *wtwire.StateUpdate, sendInit, isPending bool) error {
3✔
591

3✔
592
        towerAddr := &lnwire.NetAddress{
3✔
593
                IdentityKey: conn.RemotePub(),
3✔
594
                Address:     conn.RemoteAddr(),
3✔
595
        }
3✔
596

3✔
597
        // If this is the first message being sent to the tower, we must send an
3✔
598
        // Init message to establish that server supports the features we
3✔
599
        // require.
3✔
600
        if sendInit {
6✔
601
                // Send Init to tower.
3✔
602
                err := q.cfg.SendMessage(conn, q.localInit)
3✔
603
                if err != nil {
3✔
604
                        return err
×
605
                }
×
606

607
                // Receive Init from tower.
608
                remoteMsg, err := q.cfg.ReadMessage(conn)
3✔
609
                if err != nil {
3✔
610
                        return err
×
611
                }
×
612

613
                remoteInit, ok := remoteMsg.(*wtwire.Init)
3✔
614
                if !ok {
3✔
615
                        return fmt.Errorf("watchtower %s responded with %T "+
×
616
                                "to Init", towerAddr, remoteMsg)
×
617
                }
×
618

619
                // Validate Init.
620
                err = q.localInit.CheckRemoteInit(
3✔
621
                        remoteInit, wtwire.FeatureNames,
3✔
622
                )
3✔
623
                if err != nil {
3✔
624
                        return err
×
625
                }
×
626
        }
627

628
        // Send StateUpdate to tower.
629
        err := q.cfg.SendMessage(conn, stateUpdate)
3✔
630
        if err != nil {
3✔
631
                return err
×
632
        }
×
633

634
        // Receive StateUpdate from tower.
635
        remoteMsg, err := q.cfg.ReadMessage(conn)
3✔
636
        if err != nil {
3✔
UNCOV
637
                return err
×
UNCOV
638
        }
×
639

640
        stateUpdateReply, ok := remoteMsg.(*wtwire.StateUpdateReply)
3✔
641
        if !ok {
3✔
642
                return fmt.Errorf("watchtower %s responded with %T to "+
×
643
                        "StateUpdate", towerAddr, remoteMsg)
×
644
        }
×
645

646
        // Process the reply from the tower.
647
        switch stateUpdateReply.Code {
3✔
648

649
        // The tower reported a successful update, validate the response and
650
        // record the last applied returned.
651
        case wtwire.CodeOK:
3✔
652

653
        // TODO(conner): handle other error cases properly, ban towers, etc.
654
        default:
×
655
                err := fmt.Errorf("received error code %v in "+
×
656
                        "StateUpdateReply for seqnum=%d",
×
657
                        stateUpdateReply.Code, stateUpdate.SeqNum)
×
658
                q.log.Warnf("SessionQueue(%s) unable to upload state update "+
×
659
                        "to tower=%s: %v", q.ID(), towerAddr, err)
×
660
                return err
×
661
        }
662

663
        lastApplied := stateUpdateReply.LastApplied
3✔
664
        err = q.cfg.DB.AckUpdate(q.ID(), stateUpdate.SeqNum, lastApplied)
3✔
665
        switch {
3✔
666
        case err == wtdb.ErrUnallocatedLastApplied:
×
667
                // TODO(conner): borked watchtower
×
668
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
669
                        stateUpdate.SeqNum, err)
×
670
                q.log.Errorf("SessionQueue(%v) failed to ack update: %v",
×
671
                        q.ID(), err)
×
672
                return err
×
673

674
        case err == wtdb.ErrLastAppliedReversion:
×
675
                // TODO(conner): borked watchtower
×
676
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
677
                        stateUpdate.SeqNum, err)
×
678
                q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
×
679
                        q.ID(), err)
×
680
                return err
×
681

682
        case err != nil:
×
683
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
684
                        stateUpdate.SeqNum, err)
×
685
                q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
×
686
                        q.ID(), err)
×
687
                return err
×
688
        }
689

690
        q.queueCond.L.Lock()
3✔
691
        if isPending {
6✔
692
                // If a pending update was successfully sent, increment the
3✔
693
                // sequence number and remove the item from the queue. This
3✔
694
                // ensures the total number of backups in the session remains
3✔
695
                // unchanged, which maintains the external view of the session's
3✔
696
                // reserve status.
3✔
697
                q.seqNum++
3✔
698
                q.pendingQueue.Remove(q.pendingQueue.Front())
3✔
699
        } else {
3✔
700
                // Otherwise, simply remove the update from the committed queue.
×
701
                // This has no effect on the queues reserve status since the
×
702
                // update had already been committed.
×
703
                q.commitQueue.Remove(q.commitQueue.Front())
×
704
        }
×
705
        q.queueCond.L.Unlock()
3✔
706

3✔
707
        return nil
3✔
708
}
709

710
// status returns a sessionQueueStatus indicating whether the sessionQueue can
711
// accept another task. sessionQueueAvailable is returned when a task can be
712
// accepted, and sessionQueueExhausted is returned if the all slots in the
713
// session have been allocated.
714
//
715
// NOTE: This method MUST be called with queueCond's exclusive lock held.
716
func (q *sessionQueue) status() sessionQueueStatus {
3✔
717
        numPending := uint32(q.pendingQueue.Len())
3✔
718
        maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates)
3✔
719

3✔
720
        if uint32(q.seqNum)+numPending < maxUpdates {
6✔
721
                return sessionQueueAvailable
3✔
722
        }
3✔
723

724
        return sessionQueueExhausted
3✔
725

726
}
727

728
// resetBackoff returns the connection backoff the minimum configured backoff.
729
func (q *sessionQueue) resetBackoff() {
3✔
730
        q.retryBackoff = q.cfg.MinBackoff
3✔
731
}
3✔
732

733
// increaseBackoff doubles the current connection backoff, clamping to the
734
// configured maximum backoff if it would exceed the limit.
UNCOV
735
func (q *sessionQueue) increaseBackoff() {
×
UNCOV
736
        q.retryBackoff *= 2
×
UNCOV
737
        if q.retryBackoff > q.cfg.MaxBackoff {
×
738
                q.retryBackoff = q.cfg.MaxBackoff
×
739
        }
×
740
}
741

742
// sessionQueueSet maintains a mapping of SessionIDs to their corresponding
743
// sessionQueue.
744
type sessionQueueSet struct {
745
        queues map[wtdb.SessionID]*sessionQueue
746
        mu     sync.Mutex
747
}
748

749
// newSessionQueueSet constructs a new sessionQueueSet.
750
func newSessionQueueSet() *sessionQueueSet {
3✔
751
        return &sessionQueueSet{
3✔
752
                queues: make(map[wtdb.SessionID]*sessionQueue),
3✔
753
        }
3✔
754
}
3✔
755

756
// AddAndStart inserts a sessionQueue into the sessionQueueSet and starts it.
757
func (s *sessionQueueSet) AddAndStart(sessionQueue *sessionQueue) {
3✔
758
        s.mu.Lock()
3✔
759
        defer s.mu.Unlock()
3✔
760

3✔
761
        s.queues[*sessionQueue.ID()] = sessionQueue
3✔
762

3✔
763
        sessionQueue.Start()
3✔
764
}
3✔
765

766
// StopAndRemove stops the given session queue and removes it from the
767
// sessionQueueSet.
768
func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID, final bool) error {
3✔
769
        s.mu.Lock()
3✔
770
        defer s.mu.Unlock()
3✔
771

3✔
772
        queue, ok := s.queues[id]
3✔
773
        if !ok {
6✔
774
                return nil
3✔
775
        }
3✔
776

777
        delete(s.queues, id)
3✔
778

3✔
779
        return queue.Stop(final)
3✔
780
}
781

782
// Get fetches and returns the sessionQueue with the given ID.
783
func (s *sessionQueueSet) Get(id wtdb.SessionID) (*sessionQueue, bool) {
3✔
784
        s.mu.Lock()
3✔
785
        defer s.mu.Unlock()
3✔
786

3✔
787
        q, ok := s.queues[id]
3✔
788

3✔
789
        return q, ok
3✔
790
}
3✔
791

792
// ApplyAndWait executes the nil-adic function returned from getApply for each
793
// sessionQueue in the set in parallel, then waits for all of them to finish
794
// before returning to the caller.
795
func (s *sessionQueueSet) ApplyAndWait(getApply func(*sessionQueue) func()) {
3✔
796
        s.mu.Lock()
3✔
797
        defer s.mu.Unlock()
3✔
798

3✔
799
        var wg sync.WaitGroup
3✔
800
        for _, sessionq := range s.queues {
6✔
801
                wg.Add(1)
3✔
802
                go func(sq *sessionQueue) {
6✔
803
                        defer wg.Done()
3✔
804

3✔
805
                        getApply(sq)()
3✔
806
                }(sessionq)
3✔
807
        }
808
        wg.Wait()
3✔
809
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc