• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%
12986279612

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%)

24884.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.92
/watchtower/wtclient/session_queue.go
1
package wtclient
2

3
import (
4
        "container/list"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/chaincfg/chainhash"
10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/input"
12
        "github.com/lightningnetwork/lnd/keychain"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
15
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
16
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
17
)
18

19
// sessionQueueStatus is an enum that signals how full a particular session is.
20
type sessionQueueStatus uint8
21

22
const (
23
        // sessionQueueAvailable indicates that the session has space for at
24
        // least one more backup.
25
        sessionQueueAvailable sessionQueueStatus = iota
26

27
        // sessionQueueExhausted indicates that all slots in the session have
28
        // been allocated.
29
        sessionQueueExhausted
30

31
        // sessionQueueShuttingDown indicates that the session queue is
32
        // shutting down and so is no longer accepting any more backups.
33
        sessionQueueShuttingDown
34
)
35

36
// sessionQueueConfig bundles the resources required by the sessionQueue to
37
// perform its duties. All entries MUST be non-nil.
38
type sessionQueueConfig struct {
39
        // ClientSession provides access to the negotiated session parameters
40
        // and updating its persistent storage.
41
        ClientSession *ClientSession
42

43
        // ChainHash identifies the chain for which the session's justice
44
        // transactions are targeted.
45
        ChainHash chainhash.Hash
46

47
        // Dial allows the client to dial the tower using it's public key and
48
        // net address.
49
        Dial func(keychain.SingleKeyECDH, *lnwire.NetAddress) (wtserver.Peer,
50
                error)
51

52
        // SendMessage encodes, encrypts, and writes a message to the given
53
        // peer.
54
        SendMessage func(wtserver.Peer, wtwire.Message) error
55

56
        // ReadMessage receives, decypts, and decodes a message from the given
57
        // peer.
58
        ReadMessage func(wtserver.Peer) (wtwire.Message, error)
59

60
        // Signer facilitates signing of inputs, used to construct the witnesses
61
        // for justice transaction inputs.
62
        Signer input.Signer
63

64
        // BuildBreachRetribution is a function closure that allows the client
65
        // to fetch the breach retribution info for a certain channel at a
66
        // certain revoked commitment height.
67
        BuildBreachRetribution BreachRetributionBuilder
68

69
        // TaskPipeline is a pipeline which the sessionQueue should use to send
70
        // any unhandled tasks on shutdown of the queue.
71
        TaskPipeline *DiskOverflowQueue[*wtdb.BackupID]
72

73
        // DB provides access to the client's stable storage.
74
        DB DB
75

76
        // MinBackoff defines the initial backoff applied by the session
77
        // queue before reconnecting to the tower after a failed or partially
78
        // successful batch is sent. Subsequent backoff durations will grow
79
        // exponentially up until MaxBackoff.
80
        MinBackoff time.Duration
81

82
        // MaxBackoff defines the maximum backoff applied by the session
83
        // queue before reconnecting to the tower after a failed or partially
84
        // successful batch is sent. If the exponential backoff produces a
85
        // timeout greater than this value, the backoff duration will be clamped
86
        // to MaxBackoff.
87
        MaxBackoff time.Duration
88

89
        // Log specifies the desired log output, which should be prefixed by the
90
        // client type, e.g. anchor or legacy.
91
        Log btclog.Logger
92
}
93

94
// sessionQueue implements a reliable queue that will encrypt and send accepted
95
// backups to the watchtower specified in the config's ClientSession. Calling
96
// Stop will attempt to perform a clean shutdown replaying any un-committed
97
// pending updates to the client's main task pipeline.
98
type sessionQueue struct {
99
        started sync.Once
100
        stopped sync.Once
101
        forced  sync.Once
102

103
        cfg *sessionQueueConfig
104
        log btclog.Logger
105

106
        commitQueue  *list.List
107
        pendingQueue *list.List
108
        queueMtx     sync.Mutex
109
        queueCond    *sync.Cond
110

111
        localInit *wtwire.Init
112
        tower     *Tower
113

114
        seqNum uint16
115

116
        retryBackoff time.Duration
117

118
        quit chan struct{}
119
        wg   sync.WaitGroup
120
}
121

122
// newSessionQueue initializes a fresh sessionQueue.
123
func newSessionQueue(cfg *sessionQueueConfig,
124
        updates []wtdb.CommittedUpdate) *sessionQueue {
83✔
125

83✔
126
        localInit := wtwire.NewInitMessage(
83✔
127
                lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
83✔
128
                cfg.ChainHash,
83✔
129
        )
83✔
130

83✔
131
        sq := &sessionQueue{
83✔
132
                cfg:          cfg,
83✔
133
                log:          cfg.Log,
83✔
134
                commitQueue:  list.New(),
83✔
135
                pendingQueue: list.New(),
83✔
136
                localInit:    localInit,
83✔
137
                tower:        cfg.ClientSession.Tower,
83✔
138
                seqNum:       cfg.ClientSession.SeqNum,
83✔
139
                retryBackoff: cfg.MinBackoff,
83✔
140
                quit:         make(chan struct{}),
83✔
141
        }
83✔
142
        sq.queueCond = sync.NewCond(&sq.queueMtx)
83✔
143

83✔
144
        // The database should return them in sorted order, and session queue's
83✔
145
        // sequence number will be equal to that of the last committed update.
83✔
146
        for _, update := range updates {
85✔
147
                sq.commitQueue.PushBack(update)
2✔
148
        }
2✔
149

150
        return sq
83✔
151
}
152

153
// Start idempotently starts the sessionQueue so that it can begin accepting
154
// backups.
155
func (q *sessionQueue) Start() {
83✔
156
        q.started.Do(func() {
166✔
157
                q.wg.Add(1)
83✔
158
                go q.sessionManager()
83✔
159
        })
83✔
160
}
161

162
// Stop idempotently stops the sessionQueue by initiating a clean shutdown that
163
// will clear all pending tasks in the queue before returning to the caller.
164
// The final param should only be set to true if this is the last time that
165
// this session will be used. Otherwise, during normal shutdown, the final param
166
// should be false.
167
func (q *sessionQueue) Stop(final bool) error {
83✔
168
        var returnErr error
83✔
169
        q.stopped.Do(func() {
166✔
170
                q.log.Debugf("SessionQueue(%s) stopping ...", q.ID())
83✔
171

83✔
172
                close(q.quit)
83✔
173

83✔
174
                shutdown := make(chan struct{})
83✔
175
                go func() {
166✔
176
                        for {
242✔
177
                                select {
159✔
178
                                case <-time.After(time.Millisecond):
76✔
179
                                        q.queueCond.Signal()
76✔
180
                                case <-shutdown:
83✔
181
                                        return
83✔
182
                                }
183
                        }
184
                }()
185

186
                q.wg.Wait()
83✔
187
                close(shutdown)
83✔
188

83✔
189
                // Now, for any task in the pending queue that we have not yet
83✔
190
                // created a CommittedUpdate for, re-add the task to the main
83✔
191
                // task pipeline.
83✔
192
                updates, err := q.cfg.DB.FetchSessionCommittedUpdates(q.ID())
83✔
193
                if err != nil {
83✔
194
                        returnErr = err
×
195
                        return
×
196
                }
×
197

198
                unAckedUpdates := make(map[wtdb.BackupID]bool)
83✔
199
                for _, update := range updates {
89✔
200
                        unAckedUpdates[update.BackupID] = true
6✔
201

6✔
202
                        if !final {
8✔
203
                                continue
2✔
204
                        }
205

206
                        err := q.cfg.TaskPipeline.QueueBackupID(
4✔
207
                                &update.BackupID,
4✔
208
                        )
4✔
209
                        if err != nil {
4✔
210
                                log.Errorf("could not re-queue %s: %v",
×
211
                                        update.BackupID, err)
×
212
                                continue
×
213
                        }
214
                }
215

216
                if final {
96✔
217
                        err = q.cfg.DB.DeleteCommittedUpdates(q.ID())
13✔
218
                        if err != nil {
13✔
219
                                log.Errorf("could not delete committed "+
×
220
                                        "updates for session %s", q.ID())
×
221
                        }
×
222
                }
223

224
                // Push any task that was on the pending queue that there is
225
                // not yet a committed update for back to the main task
226
                // pipeline.
227
                q.queueCond.L.Lock()
83✔
228
                for q.pendingQueue.Len() > 0 {
93✔
229
                        next := q.pendingQueue.Front()
10✔
230
                        q.pendingQueue.Remove(next)
10✔
231

10✔
232
                        //nolint:forcetypeassert
10✔
233
                        task := next.Value.(*backupTask)
10✔
234

10✔
235
                        if unAckedUpdates[task.id] {
14✔
236
                                continue
4✔
237
                        }
238

239
                        err := q.cfg.TaskPipeline.QueueBackupID(&task.id)
6✔
240
                        if err != nil {
6✔
241
                                log.Errorf("could not re-queue backup task: "+
×
242
                                        "%v", err)
×
243
                                continue
×
244
                        }
245
                }
246
                q.queueCond.L.Unlock()
83✔
247

83✔
248
                q.log.Debugf("SessionQueue(%s) stopped", q.ID())
83✔
249
        })
250

251
        return returnErr
83✔
252
}
253

254
// ID returns the wtdb.SessionID for the queue, which can be used to uniquely
255
// identify this a particular queue.
256
func (q *sessionQueue) ID() *wtdb.SessionID {
4,277✔
257
        return &q.cfg.ClientSession.ID
4,277✔
258
}
4,277✔
259

260
// AcceptTask attempts to queue a backupTask for delivery to the sessionQueue's
261
// tower. The session will only be accepted if the queue is not already
262
// exhausted or shutting down and the task is successfully bound to the
263
// ClientSession.
264
func (q *sessionQueue) AcceptTask(task *backupTask) (sessionQueueStatus, bool) {
480✔
265
        // Exit early if the queue has started shutting down.
480✔
266
        select {
480✔
267
        case <-q.quit:
×
268
                return sessionQueueShuttingDown, false
×
269
        default:
480✔
270
        }
271

272
        q.queueCond.L.Lock()
480✔
273

480✔
274
        // There is a chance that sessionQueue started shutting down between
480✔
275
        // the last quit channel check and waiting for the lock. So check one
480✔
276
        // more time here.
480✔
277
        select {
480✔
278
        case <-q.quit:
×
279
                q.queueCond.L.Unlock()
×
280
                return sessionQueueShuttingDown, false
×
281
        default:
480✔
282
        }
283

284
        numPending := uint32(q.pendingQueue.Len())
480✔
285
        maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates
480✔
286
        q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
480✔
287
                "pending=%d max-updates=%d",
480✔
288
                q.ID(), task.id, q.seqNum, numPending, maxUpdates)
480✔
289

480✔
290
        // Examine the current reserve status of the session queue.
480✔
291
        curStatus := q.status()
480✔
292

480✔
293
        switch curStatus {
480✔
294

295
        // The session queue is exhausted, and cannot accept the task because it
296
        // is full. Reject the task such that it can be tried against a
297
        // different session.
298
        case sessionQueueExhausted:
1✔
299
                q.queueCond.L.Unlock()
1✔
300
                return curStatus, false
1✔
301

302
        // The session queue is not exhausted. Compute the sweep and reward
303
        // outputs as a function of the session parameters. If the outputs are
304
        // dusty or uneconomical to backup, the task is rejected and will not be
305
        // tried again.
306
        //
307
        // TODO(conner): queue backups and retry with different session params.
308
        case sessionQueueAvailable:
479✔
309
                err := task.bindSession(
479✔
310
                        &q.cfg.ClientSession.ClientSessionBody,
479✔
311
                        q.cfg.BuildBreachRetribution,
479✔
312
                )
479✔
313
                if err != nil {
484✔
314
                        q.queueCond.L.Unlock()
5✔
315
                        q.log.Debugf("SessionQueue(%s) rejected %v: %v ",
5✔
316
                                q.ID(), task.id, err)
5✔
317
                        return curStatus, false
5✔
318
                }
5✔
319
        }
320

321
        // The sweep and reward outputs satisfy the session's policy, queue the
322
        // task for final signing and delivery.
323
        q.pendingQueue.PushBack(task)
474✔
324

474✔
325
        // Finally, compute the session's *new* reserve status. This will be
474✔
326
        // used by the client to determine if it can continue using this session
474✔
327
        // queue, or if it should negotiate a new one.
474✔
328
        newStatus := q.status()
474✔
329
        q.queueCond.L.Unlock()
474✔
330

474✔
331
        q.queueCond.Signal()
474✔
332

474✔
333
        return newStatus, true
474✔
334
}
335

336
// sessionManager is the primary event loop for the sessionQueue, and is
337
// responsible for encrypting and sending accepted tasks to the tower.
338
func (q *sessionQueue) sessionManager() {
83✔
339
        defer q.wg.Done()
83✔
340

83✔
341
        for {
493✔
342
                q.queueCond.L.Lock()
410✔
343
                for q.commitQueue.Len() == 0 &&
410✔
344
                        q.pendingQueue.Len() == 0 {
516✔
345

106✔
346
                        q.queueCond.Wait()
106✔
347

106✔
348
                        select {
106✔
349
                        case <-q.quit:
76✔
350
                                q.queueCond.L.Unlock()
76✔
351
                                return
76✔
352
                        default:
30✔
353
                        }
354
                }
355
                q.queueCond.L.Unlock()
334✔
356

334✔
357
                // Exit immediately if the sessionQueue has been stopped.
334✔
358
                select {
334✔
359
                case <-q.quit:
7✔
360
                        return
7✔
361
                default:
327✔
362
                }
363

364
                // Initiate a new connection to the watchtower and attempt to
365
                // drain all pending tasks.
366
                q.drainBackups()
327✔
367
        }
368
}
369

370
// drainBackups attempts to send all pending updates in the queue to the tower.
371
func (q *sessionQueue) drainBackups() {
327✔
372
        var (
327✔
373
                conn      wtserver.Peer
327✔
374
                err       error
327✔
375
                towerAddr = q.tower.Addresses.Peek()
327✔
376
        )
327✔
377

327✔
378
        for {
654✔
379
                q.log.Infof("SessionQueue(%s) attempting to dial tower at %v",
327✔
380
                        q.ID(), towerAddr)
327✔
381

327✔
382
                // First, check that we are able to dial this session's tower.
327✔
383
                conn, err = q.cfg.Dial(
327✔
384
                        q.cfg.ClientSession.SessionKeyECDH, &lnwire.NetAddress{
327✔
385
                                IdentityKey: q.tower.IdentityKey,
327✔
386
                                Address:     towerAddr,
327✔
387
                        },
327✔
388
                )
327✔
389
                if err != nil {
334✔
390
                        // If there are more addrs available, immediately try
7✔
391
                        // those.
7✔
392
                        nextAddr, iteratorErr := q.tower.Addresses.Next()
7✔
393
                        if iteratorErr == nil {
7✔
394
                                towerAddr = nextAddr
×
395
                                continue
×
396
                        }
397

398
                        // Otherwise, if we have exhausted the address list,
399
                        // back off and try again later.
400
                        q.tower.Addresses.Reset()
7✔
401

7✔
402
                        q.log.Errorf("SessionQueue(%s) unable to dial tower "+
7✔
403
                                "at any available Addresses: %v", q.ID(), err)
7✔
404

7✔
405
                        q.increaseBackoff()
7✔
406
                        select {
7✔
407
                        case <-time.After(q.retryBackoff):
6✔
408
                        case <-q.quit:
1✔
409
                        }
410
                        return
7✔
411
                }
412

413
                break
320✔
414
        }
415
        defer conn.Close()
320✔
416

320✔
417
        // Begin draining the queue of pending state updates. Before the first
320✔
418
        // update is sent, we will precede it with an Init message. If the first
320✔
419
        // is successful, subsequent updates can be streamed without sending an
320✔
420
        // Init.
320✔
421
        for sendInit := true; ; sendInit = false {
1,002✔
422
                // Generate the next state update to upload to the tower. This
682✔
423
                // method will first proceed in dequeuing committed updates
682✔
424
                // before attempting to dequeue any pending updates.
682✔
425
                stateUpdate, isPending, backupID, err := q.nextStateUpdate()
682✔
426
                if err != nil {
682✔
427
                        q.log.Errorf("SessionQueue(%v) unable to get next "+
×
428
                                "state update: %v", q.ID(), err)
×
429
                        return
×
430
                }
×
431

432
                // Now, send the state update to the tower and wait for a reply.
433
                err = q.sendStateUpdate(conn, stateUpdate, sendInit, isPending)
682✔
434
                if err != nil {
900✔
435
                        q.log.Errorf("SessionQueue(%s) unable to send state "+
218✔
436
                                "update: %v", q.ID(), err)
218✔
437

218✔
438
                        q.increaseBackoff()
218✔
439
                        select {
218✔
440
                        case <-time.After(q.retryBackoff):
212✔
441
                        case <-q.quit:
6✔
442
                        }
443
                        return
218✔
444
                }
445

446
                q.log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
464✔
447
                        q.ID(), backupID, stateUpdate.SeqNum)
464✔
448

464✔
449
                // If the last task was backed up successfully, we'll exit and
464✔
450
                // continue once more tasks are added to the queue. We'll also
464✔
451
                // clear any accumulated backoff as this batch was able to be
464✔
452
                // sent reliably.
464✔
453
                if stateUpdate.IsComplete == 1 {
566✔
454
                        q.resetBackoff()
102✔
455
                        return
102✔
456
                }
102✔
457

458
                // Always apply a small delay between sends, which makes the
459
                // unit tests more reliable. If we were requested to back off,
460
                // when we will do so.
461
                select {
362✔
462
                case <-time.After(time.Millisecond):
362✔
463
                case <-q.quit:
×
464
                        return
×
465
                }
466
        }
467
}
468

469
// nextStateUpdate returns the next wtwire.StateUpdate to upload to the tower.
470
// If any committed updates are present, this method will reconstruct the state
471
// update from the committed update using the current last applied value found
472
// in the database. Otherwise, it will select the next pending update, craft the
473
// payload, and commit an update before returning the state update to send. The
474
// boolean value in the response is true if the state update is taken from the
475
// pending queue, allowing the caller to remove the update from either the
476
// commit or pending queue if the update is successfully acked.
477
func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
478
        wtdb.BackupID, error) {
682✔
479

682✔
480
        var (
682✔
481
                seqNum    uint16
682✔
482
                update    wtdb.CommittedUpdate
682✔
483
                isLast    bool
682✔
484
                isPending bool
682✔
485
        )
682✔
486

682✔
487
        q.queueCond.L.Lock()
682✔
488
        switch {
682✔
489

490
        // If the commit queue is non-empty, parse the next committed update.
491
        case q.commitQueue.Len() > 0:
2✔
492
                next := q.commitQueue.Front()
2✔
493

2✔
494
                update = next.Value.(wtdb.CommittedUpdate)
2✔
495
                seqNum = update.SeqNum
2✔
496

2✔
497
                // If this is the last item in the commit queue and no items
2✔
498
                // exist in the pending queue, we will use the IsComplete flag
2✔
499
                // in the StateUpdate to signal that the tower can release the
2✔
500
                // connection after replying to free up resources.
2✔
501
                isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0
2✔
502
                q.queueCond.L.Unlock()
2✔
503

2✔
504
                q.log.Debugf("SessionQueue(%s) reprocessing committed state "+
2✔
505
                        "update for %v seqnum=%d",
2✔
506
                        q.ID(), update.BackupID, seqNum)
2✔
507

508
        // Otherwise, craft and commit the next update from the pending queue.
509
        default:
680✔
510
                isPending = true
680✔
511

680✔
512
                // Determine the current sequence number to apply for this
680✔
513
                // pending update.
680✔
514
                seqNum = q.seqNum + 1
680✔
515

680✔
516
                // Obtain the next task from the queue.
680✔
517
                next := q.pendingQueue.Front()
680✔
518
                task := next.Value.(*backupTask)
680✔
519

680✔
520
                // If this is the last item in the pending queue, we will use
680✔
521
                // the IsComplete flag in the StateUpdate to signal that the
680✔
522
                // tower can release the connection after replying to free up
680✔
523
                // resources.
680✔
524
                isLast = q.pendingQueue.Len() == 1
680✔
525
                q.queueCond.L.Unlock()
680✔
526

680✔
527
                hint, encBlob, err := task.craftSessionPayload(q.cfg.Signer)
680✔
528
                if err != nil {
680✔
529
                        // TODO(conner): mark will not send
×
530
                        err := fmt.Errorf("unable to craft session payload: %w",
×
531
                                err)
×
532
                        return nil, false, wtdb.BackupID{}, err
×
533
                }
×
534
                // TODO(conner): special case other obscure errors
535

536
                update = wtdb.CommittedUpdate{
680✔
537
                        SeqNum: seqNum,
680✔
538
                        CommittedUpdateBody: wtdb.CommittedUpdateBody{
680✔
539
                                BackupID:      task.id,
680✔
540
                                Hint:          hint,
680✔
541
                                EncryptedBlob: encBlob,
680✔
542
                        },
680✔
543
                }
680✔
544

680✔
545
                q.log.Debugf("SessionQueue(%s) committing state update "+
680✔
546
                        "%v seqnum=%d", q.ID(), update.BackupID, seqNum)
680✔
547
        }
548

549
        // Before sending the task to the tower, commit the state update
550
        // to disk using the assigned sequence number. If this task has already
551
        // been committed, the call will succeed and only be used for the
552
        // purpose of obtaining the last applied value to send to the tower.
553
        //
554
        // This step ensures that if we crash before receiving an ack that we
555
        // will retransmit the same update. If the tower successfully received
556
        // the update from before, it will reply with an ACK regardless of what
557
        // we send the next time. This step ensures that if we reliably send the
558
        // same update for a given sequence number, to prevent us from thinking
559
        // we backed up a state when we instead backed up another.
560
        lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), &update)
682✔
561
        if err != nil {
682✔
562
                // TODO(conner): mark failed/reschedule
×
563
                err := fmt.Errorf("unable to commit state update for "+
×
564
                        "%v seqnum=%d: %v", update.BackupID, seqNum, err)
×
565
                return nil, false, wtdb.BackupID{}, err
×
566
        }
×
567

568
        stateUpdate := &wtwire.StateUpdate{
682✔
569
                SeqNum:        update.SeqNum,
682✔
570
                LastApplied:   lastApplied,
682✔
571
                Hint:          update.Hint,
682✔
572
                EncryptedBlob: update.EncryptedBlob,
682✔
573
        }
682✔
574

682✔
575
        // Set the IsComplete flag if this is the last queued item.
682✔
576
        if isLast {
804✔
577
                stateUpdate.IsComplete = 1
122✔
578
        }
122✔
579

580
        return stateUpdate, isPending, update.BackupID, nil
682✔
581
}
582

583
// sendStateUpdate sends a wtwire.StateUpdate to the watchtower and processes
584
// the ACK before returning. If sendInit is true, this method will first send
585
// the localInit message and verify that the tower supports our required feature
586
// bits. And error is returned if any part of the send fails. The boolean return
587
// variable indicates whether we should back off before attempting to send the
588
// next state update.
589
func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
590
        stateUpdate *wtwire.StateUpdate, sendInit, isPending bool) error {
682✔
591

682✔
592
        towerAddr := &lnwire.NetAddress{
682✔
593
                IdentityKey: conn.RemotePub(),
682✔
594
                Address:     conn.RemoteAddr(),
682✔
595
        }
682✔
596

682✔
597
        // If this is the first message being sent to the tower, we must send an
682✔
598
        // Init message to establish that server supports the features we
682✔
599
        // require.
682✔
600
        if sendInit {
1,002✔
601
                // Send Init to tower.
320✔
602
                err := q.cfg.SendMessage(conn, q.localInit)
320✔
603
                if err != nil {
320✔
604
                        return err
×
605
                }
×
606

607
                // Receive Init from tower.
608
                remoteMsg, err := q.cfg.ReadMessage(conn)
320✔
609
                if err != nil {
320✔
610
                        return err
×
611
                }
×
612

613
                remoteInit, ok := remoteMsg.(*wtwire.Init)
320✔
614
                if !ok {
320✔
615
                        return fmt.Errorf("watchtower %s responded with %T "+
×
616
                                "to Init", towerAddr, remoteMsg)
×
617
                }
×
618

619
                // Validate Init.
620
                err = q.localInit.CheckRemoteInit(
320✔
621
                        remoteInit, wtwire.FeatureNames,
320✔
622
                )
320✔
623
                if err != nil {
320✔
624
                        return err
×
625
                }
×
626
        }
627

628
        // Send StateUpdate to tower.
629
        err := q.cfg.SendMessage(conn, stateUpdate)
682✔
630
        if err != nil {
682✔
631
                return err
×
632
        }
×
633

634
        // Receive StateUpdate from tower.
635
        remoteMsg, err := q.cfg.ReadMessage(conn)
682✔
636
        if err != nil {
900✔
637
                return err
218✔
638
        }
218✔
639

640
        stateUpdateReply, ok := remoteMsg.(*wtwire.StateUpdateReply)
464✔
641
        if !ok {
464✔
642
                return fmt.Errorf("watchtower %s responded with %T to "+
×
643
                        "StateUpdate", towerAddr, remoteMsg)
×
644
        }
×
645

646
        // Process the reply from the tower.
647
        switch stateUpdateReply.Code {
464✔
648

649
        // The tower reported a successful update, validate the response and
650
        // record the last applied returned.
651
        case wtwire.CodeOK:
464✔
652

653
        // TODO(conner): handle other error cases properly, ban towers, etc.
654
        default:
×
655
                err := fmt.Errorf("received error code %v in "+
×
656
                        "StateUpdateReply for seqnum=%d",
×
657
                        stateUpdateReply.Code, stateUpdate.SeqNum)
×
658
                q.log.Warnf("SessionQueue(%s) unable to upload state update "+
×
659
                        "to tower=%s: %v", q.ID(), towerAddr, err)
×
660
                return err
×
661
        }
662

663
        lastApplied := stateUpdateReply.LastApplied
464✔
664
        err = q.cfg.DB.AckUpdate(q.ID(), stateUpdate.SeqNum, lastApplied)
464✔
665
        switch {
464✔
666
        case err == wtdb.ErrUnallocatedLastApplied:
×
667
                // TODO(conner): borked watchtower
×
668
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
669
                        stateUpdate.SeqNum, err)
×
670
                q.log.Errorf("SessionQueue(%v) failed to ack update: %v",
×
671
                        q.ID(), err)
×
672
                return err
×
673

674
        case err == wtdb.ErrLastAppliedReversion:
×
675
                // TODO(conner): borked watchtower
×
676
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
677
                        stateUpdate.SeqNum, err)
×
678
                q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
×
679
                        q.ID(), err)
×
680
                return err
×
681

682
        case err != nil:
×
683
                err = fmt.Errorf("unable to ack seqnum=%d: %w",
×
684
                        stateUpdate.SeqNum, err)
×
685
                q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
×
686
                        q.ID(), err)
×
687
                return err
×
688
        }
689

690
        q.queueCond.L.Lock()
464✔
691
        if isPending {
928✔
692
                // If a pending update was successfully sent, increment the
464✔
693
                // sequence number and remove the item from the queue. This
464✔
694
                // ensures the total number of backups in the session remains
464✔
695
                // unchanged, which maintains the external view of the session's
464✔
696
                // reserve status.
464✔
697
                q.seqNum++
464✔
698
                q.pendingQueue.Remove(q.pendingQueue.Front())
464✔
699
        } else {
464✔
UNCOV
700
                // Otherwise, simply remove the update from the committed queue.
×
UNCOV
701
                // This has no effect on the queues reserve status since the
×
UNCOV
702
                // update had already been committed.
×
UNCOV
703
                q.commitQueue.Remove(q.commitQueue.Front())
×
UNCOV
704
        }
×
705
        q.queueCond.L.Unlock()
464✔
706

464✔
707
        return nil
464✔
708
}
709

710
// status returns a sessionQueueStatus indicating whether the sessionQueue can
711
// accept another task. sessionQueueAvailable is returned when a task can be
712
// accepted, and sessionQueueExhausted is returned if the all slots in the
713
// session have been allocated.
714
//
715
// NOTE: This method MUST be called with queueCond's exclusive lock held.
716
func (q *sessionQueue) status() sessionQueueStatus {
954✔
717
        numPending := uint32(q.pendingQueue.Len())
954✔
718
        maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates)
954✔
719

954✔
720
        if uint32(q.seqNum)+numPending < maxUpdates {
1,864✔
721
                return sessionQueueAvailable
910✔
722
        }
910✔
723

724
        return sessionQueueExhausted
44✔
725

726
}
727

728
// resetBackoff returns the connection backoff the minimum configured backoff.
729
func (q *sessionQueue) resetBackoff() {
102✔
730
        q.retryBackoff = q.cfg.MinBackoff
102✔
731
}
102✔
732

733
// increaseBackoff doubles the current connection backoff, clamping to the
734
// configured maximum backoff if it would exceed the limit.
735
func (q *sessionQueue) increaseBackoff() {
225✔
736
        q.retryBackoff *= 2
225✔
737
        if q.retryBackoff > q.cfg.MaxBackoff {
225✔
738
                q.retryBackoff = q.cfg.MaxBackoff
×
739
        }
×
740
}
741

742
// sessionQueueSet maintains a mapping of SessionIDs to their corresponding
743
// sessionQueue.
744
type sessionQueueSet struct {
745
        queues map[wtdb.SessionID]*sessionQueue
746
        mu     sync.Mutex
747
}
748

749
// newSessionQueueSet constructs a new sessionQueueSet.
750
func newSessionQueueSet() *sessionQueueSet {
36✔
751
        return &sessionQueueSet{
36✔
752
                queues: make(map[wtdb.SessionID]*sessionQueue),
36✔
753
        }
36✔
754
}
36✔
755

756
// AddAndStart inserts a sessionQueue into the sessionQueueSet and starts it.
757
func (s *sessionQueueSet) AddAndStart(sessionQueue *sessionQueue) {
83✔
758
        s.mu.Lock()
83✔
759
        defer s.mu.Unlock()
83✔
760

83✔
761
        s.queues[*sessionQueue.ID()] = sessionQueue
83✔
762

83✔
763
        sessionQueue.Start()
83✔
764
}
83✔
765

766
// StopAndRemove stops the given session queue and removes it from the
767
// sessionQueueSet.
768
func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID, final bool) error {
17✔
769
        s.mu.Lock()
17✔
770
        defer s.mu.Unlock()
17✔
771

17✔
772
        queue, ok := s.queues[id]
17✔
773
        if !ok {
18✔
774
                return nil
1✔
775
        }
1✔
776

777
        delete(s.queues, id)
16✔
778

16✔
779
        return queue.Stop(final)
16✔
780
}
781

782
// Get fetches and returns the sessionQueue with the given ID.
783
func (s *sessionQueueSet) Get(id wtdb.SessionID) (*sessionQueue, bool) {
84✔
784
        s.mu.Lock()
84✔
785
        defer s.mu.Unlock()
84✔
786

84✔
787
        q, ok := s.queues[id]
84✔
788

84✔
789
        return q, ok
84✔
790
}
84✔
791

792
// ApplyAndWait executes the nil-adic function returned from getApply for each
793
// sessionQueue in the set in parallel, then waits for all of them to finish
794
// before returning to the caller.
795
func (s *sessionQueueSet) ApplyAndWait(getApply func(*sessionQueue) func()) {
36✔
796
        s.mu.Lock()
36✔
797
        defer s.mu.Unlock()
36✔
798

36✔
799
        var wg sync.WaitGroup
36✔
800
        for _, sessionq := range s.queues {
103✔
801
                wg.Add(1)
67✔
802
                go func(sq *sessionQueue) {
134✔
803
                        defer wg.Done()
67✔
804

67✔
805
                        getApply(sq)()
67✔
806
                }(sessionq)
67✔
807
        }
808
        wg.Wait()
36✔
809
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc