• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14193549836

01 Apr 2025 10:40AM UTC coverage: 69.046% (+0.007%) from 69.039%
14193549836

Pull #9665

github

web-flow
Merge e8825f209 into b01f4e514
Pull Request #9665: kvdb: bump etcd libs to v3.5.12

133439 of 193262 relevant lines covered (69.05%)

22119.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.37
/htlcswitch/quiescer.go
1
package htlcswitch
2

3
import (
4
        "fmt"
5
        "sync"
6
        "time"
7

8
        "github.com/btcsuite/btclog/v2"
9
        "github.com/lightningnetwork/lnd/fn/v2"
10
        "github.com/lightningnetwork/lnd/lntypes"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
)
13

14
// The errors below are sentinel values. Where the quiescer returns them with
// extra context (e.g. the channel ID) they are wrapped with %w, so callers
// should compare using errors.Is rather than ==.
var (
15
        // ErrInvalidStfu indicates that the Stfu we have received is invalid.
16
        // This can happen in instances where we have not sent Stfu but we have
17
        // received one with the initiator field set to false.
18
        ErrInvalidStfu = fmt.Errorf("stfu received is invalid")
19

20
        // ErrStfuAlreadySent indicates that this channel has already sent an
21
        // Stfu message for this negotiation.
22
        ErrStfuAlreadySent = fmt.Errorf("stfu already sent")
23

24
        // ErrStfuAlreadyRcvd indicates that this channel has already received
25
        // an Stfu message for this negotiation.
26
        ErrStfuAlreadyRcvd = fmt.Errorf("stfu already received")
27

28
        // ErrNoQuiescenceInitiator indicates that the caller has requested the
29
        // quiescence initiator for a channel that is not yet quiescent.
30
        ErrNoQuiescenceInitiator = fmt.Errorf(
31
                "indeterminate quiescence initiator: channel is not quiescent",
32
        )
33

34
        // ErrPendingRemoteUpdates indicates that we have received an Stfu while
35
        // the remote party has issued updates that are not yet bilaterally
36
        // committed.
37
        ErrPendingRemoteUpdates = fmt.Errorf(
38
                "stfu received with pending remote updates",
39
        )
40

41
        // ErrPendingLocalUpdates indicates that we are attempting to send an
42
        // Stfu while we have issued updates that are not yet bilaterally
43
        // committed.
44
        ErrPendingLocalUpdates = fmt.Errorf(
45
                "stfu send attempted with pending local updates",
46
        )
47

48
        // ErrQuiescenceTimeout indicates that the quiescer has been quiesced
49
        // beyond the allotted time.
50
        ErrQuiescenceTimeout = fmt.Errorf(
51
                "quiescence timeout",
52
        )
53
)
54

55
// defaultQuiescenceTimeout is a 30 second default for how long a channel may
// stay quiescent.
// NOTE(review): it is not referenced in this file; presumably the caller
// passes it as QuiescerCfg.timeoutDuration — confirm.
const defaultQuiescenceTimeout = 30 * time.Second
56

57
// StfuReq is the request type accepted by InitStfu. It carries no input
// (fn.Unit) and is resolved with a Result holding the ChannelParty that is
// the quiescence initiator, or an error.
type StfuReq = fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]]
58

59
// Quiescer is the public interface of the quiescence mechanism. Callers of the
60
// quiescence API should not need any methods besides the ones detailed here.
61
type Quiescer interface {
62
        // IsQuiescent returns true if the state machine has been driven all the
63
        // way to completion. If this returns true, processes that depend on
64
        // channel quiescence may proceed.
65
        IsQuiescent() bool
66

67
        // QuiescenceInitiator determines which ChannelParty is the initiator of
68
        // quiescence for the purposes of downstream protocols. If the channel
69
        // is not currently quiescent, this method will return
70
        // ErrNoQuiescenceInitiator.
71
        QuiescenceInitiator() fn.Result[lntypes.ChannelParty]
72

73
        // InitStfu instructs the quiescer that we intend to begin a quiescence
74
        // negotiation where we are the initiator. We don't send stfu yet
75
        // because we need to wait for the link to give us a valid opportunity
76
        // to do so.
77
        InitStfu(req StfuReq)
78

79
        // RecvStfu is called when we receive an Stfu message from the remote.
80
        RecvStfu(stfu lnwire.Stfu) error
81

82
        // CanRecvUpdates returns true if we haven't yet received an Stfu which
83
        // would mark the end of the remote's ability to send updates.
84
        CanRecvUpdates() bool
85

86
        // CanSendUpdates returns true if we haven't yet sent an Stfu which
87
        // would mark the end of our ability to send updates.
88
        CanSendUpdates() bool
89

90
        // SendOwedStfu sends Stfu if it owes one. It returns an error if the
91
        // state machine is in an invalid state.
92
        SendOwedStfu() error
93

94
        // OnResume accepts a no return closure that will run when the quiescer
95
        // is resumed.
96
        OnResume(hook func())
97

98
        // Resume runs all of the deferred actions that have accumulated while
99
        // the channel has been quiescent and then resets the quiescer state to
100
        // its initial state.
101
        Resume()
102
}
103

104
// QuiescerCfg is a config structure used to initialize a quiescer giving it the
105
// appropriate functionality to interact with the channel state that the
106
// quiescer must synchronize with.
107
type QuiescerCfg struct {
108
        // chanID marks what channel we are managing the state machine for. This
109
        // is important because the quiescer needs to know the ChannelID to
110
        // construct the Stfu message.
111
        chanID lnwire.ChannelID
112

113
        // channelInitiator indicates which ChannelParty originally opened the
114
        // channel. This is used to break ties when both sides of the channel
115
        // send Stfu claiming to be the initiator.
116
        channelInitiator lntypes.ChannelParty
117

118
        // sendMsg is a function that can be used to send an Stfu message over
119
        // the wire.
120
        sendMsg func(lnwire.Stfu) error
121

122
        // timeoutDuration is the Duration that we will wait from the moment the
123
        // channel is considered quiescent before we call the onTimeout function.
124
        timeoutDuration time.Duration
125

126
        // onTimeout is a function that will be called in the event that the
127
        // Quiescer has not been resumed before the timeout is reached. If
128
        // Quiescer.Resume is called before the timeout has been reached, then
129
        // onTimeout will not be called until the quiescer reaches a quiescent
130
        // state again. If onTimeout is nil, no timeout is ever started.
131
        onTimeout func()
132
}
133

134
// QuiescerLive is a state machine that tracks progression through the
135
// quiescence protocol.
136
type QuiescerLive struct {
137
        cfg QuiescerCfg
138

139
        // log is a quiescer-scoped logging instance.
140
        log btclog.Logger
141

142
        // localInit indicates whether our path through this state machine was
143
        // initiated by our node. This can be true or false independently of
144
        // remoteInit.
145
        localInit bool
146

147
        // remoteInit indicates whether we received Stfu from our peer where the
148
        // message indicated that the remote node believes it was the initiator.
149
        // This can be true or false independently of localInit.
150
        remoteInit bool
151

152
        // sent tracks whether or not we have emitted Stfu for sending.
153
        sent bool
154

155
        // received tracks whether or not we have received Stfu from our peer.
156
        received bool
157

158
        // activeQuiescenceReq is a possibly None Request that we should
159
        // resolve when we complete quiescence.
160
        activeQuiescenceReq fn.Option[StfuReq]
161

162
        // resumeQueue is a slice of hooks that will be called when the quiescer
163
        // is resumed. These are actions that needed to be deferred while the
164
        // channel was quiescent.
165
        resumeQueue []func()
166

167
        // timeoutTimer is a field that is used to hold onto the timeout job
168
        // when we reach quiescence.
169
        timeoutTimer *time.Timer
170

171
        // The embedded RWMutex is acquired by the exported methods before they
        // delegate to their unexported counterparts, which expect the lock to
        // already be held.
        sync.RWMutex
172
}
173

174
// NewQuiescer creates a new quiescer for the given channel.
175
func NewQuiescer(cfg QuiescerCfg) Quiescer {
232✔
176
        logPrefix := fmt.Sprintf("Quiescer(%v):", cfg.chanID)
232✔
177

232✔
178
        return &QuiescerLive{
232✔
179
                cfg: cfg,
232✔
180
                log: log.WithPrefix(logPrefix),
232✔
181
        }
232✔
182
}
232✔
183

184
// RecvStfu is called when we receive an Stfu message from the remote.
185
func (q *QuiescerLive) RecvStfu(msg lnwire.Stfu) error {
19✔
186
        q.Lock()
19✔
187
        defer q.Unlock()
19✔
188

19✔
189
        return q.recvStfu(msg)
19✔
190
}
19✔
191

192
// recvStfu is called when we receive an Stfu message from the remote.
193
func (q *QuiescerLive) recvStfu(msg lnwire.Stfu) error {
20✔
194
        // At the time of this writing, this check that we have already received
20✔
195
        // an Stfu is not strictly necessary, according to the specification.
20✔
196
        // However, it is fishy if we do and it is unclear how we should handle
20✔
197
        // such a case so we will err on the side of caution.
20✔
198
        if q.received {
21✔
199
                return fmt.Errorf("%w for channel %v", ErrStfuAlreadyRcvd,
1✔
200
                        q.cfg.chanID)
1✔
201
        }
1✔
202

203
        // We need to check that the Stfu we are receiving is valid.
204
        if !q.sent && !msg.Initiator {
19✔
205
                return fmt.Errorf("%w for channel %v", ErrInvalidStfu,
×
206
                        q.cfg.chanID)
×
207
        }
×
208

209
        if !q.canRecvStfu() {
19✔
210
                return fmt.Errorf("%w for channel %v", ErrPendingRemoteUpdates,
×
211
                        q.cfg.chanID)
×
212
        }
×
213

214
        q.received = true
19✔
215

19✔
216
        // If the remote party sets the initiator bit to true then we will
19✔
217
        // remember that they are making a claim to the initiator role. This
19✔
218
        // does not necessarily mean they will get it, though.
19✔
219
        q.remoteInit = msg.Initiator
19✔
220

19✔
221
        // Since we just received an Stfu, we may have a newly quiesced state.
19✔
222
        // If so, we will try to resolve any outstanding StfuReqs.
19✔
223
        q.tryResolveStfuReq()
19✔
224

19✔
225
        if q.isQuiescent() {
25✔
226
                q.startTimeout()
6✔
227
        }
6✔
228

229
        return nil
19✔
230
}
231

232
// MakeStfu is called when we are ready to send an Stfu message. It returns the
233
// Stfu message to be sent.
234
func (q *QuiescerLive) MakeStfu() fn.Result[lnwire.Stfu] {
2✔
235
        q.RLock()
2✔
236
        defer q.RUnlock()
2✔
237

2✔
238
        return q.makeStfu()
2✔
239
}
2✔
240

241
// makeStfu is called when we are ready to send an Stfu message. It returns the
242
// Stfu message to be sent.
243
func (q *QuiescerLive) makeStfu() fn.Result[lnwire.Stfu] {
18✔
244
        if q.sent {
19✔
245
                return fn.Errf[lnwire.Stfu]("%w for channel %v",
1✔
246
                        ErrStfuAlreadySent, q.cfg.chanID)
1✔
247
        }
1✔
248

249
        if !q.canSendStfu() {
17✔
250
                return fn.Errf[lnwire.Stfu]("%w for channel %v",
×
251
                        ErrPendingLocalUpdates, q.cfg.chanID)
×
252
        }
×
253

254
        stfu := lnwire.Stfu{
17✔
255
                ChanID:    q.cfg.chanID,
17✔
256
                Initiator: q.localInit,
17✔
257
        }
17✔
258

17✔
259
        return fn.Ok(stfu)
17✔
260
}
261

262
// OweStfu returns true if we owe the other party an Stfu. We owe the remote an
263
// Stfu when we have received but not yet sent an Stfu, or we are the initiator
264
// but have not yet sent an Stfu.
265
func (q *QuiescerLive) OweStfu() bool {
×
266
        q.RLock()
×
267
        defer q.RUnlock()
×
268

×
269
        return q.oweStfu()
×
270
}
×
271

272
// oweStfu returns true if we owe the other party an Stfu. We owe the remote an
273
// Stfu when we have received but not yet sent an Stfu, or we are the initiator
274
// but have not yet sent an Stfu.
275
func (q *QuiescerLive) oweStfu() bool {
1,092✔
276
        return (q.received || q.localInit) && !q.sent
1,092✔
277
}
1,092✔
278

279
// NeedStfu returns true if the remote owes us an Stfu. They owe us an Stfu when
280
// we have sent but not yet received an Stfu.
281
func (q *QuiescerLive) NeedStfu() bool {
2✔
282
        q.RLock()
2✔
283
        defer q.RUnlock()
2✔
284

2✔
285
        return q.needStfu()
2✔
286
}
2✔
287

288
// needStfu returns true if the remote owes us an Stfu. They owe us an Stfu when
289
// we have sent but not yet received an Stfu.
290
func (q *QuiescerLive) needStfu() bool {
2✔
291
        q.RLock()
2✔
292
        defer q.RUnlock()
2✔
293

2✔
294
        return q.sent && !q.received
2✔
295
}
2✔
296

297
// IsQuiescent returns true if the state machine has been driven all the way to
298
// completion. If this returns true, processes that depend on channel quiescence
299
// may proceed.
300
func (q *QuiescerLive) IsQuiescent() bool {
2✔
301
        q.RLock()
2✔
302
        defer q.RUnlock()
2✔
303

2✔
304
        return q.isQuiescent()
2✔
305
}
2✔
306

307
// isQuiescent returns true if the state machine has been driven all the way to
308
// completion. If this returns true, processes that depend on channel quiescence
309
// may proceed.
310
func (q *QuiescerLive) isQuiescent() bool {
55✔
311
        return q.sent && q.received
55✔
312
}
55✔
313

314
// QuiescenceInitiator determines which ChannelParty is the initiator of
315
// quiescence for the purposes of downstream protocols. If the channel is not
316
// currently quiescent, this method will return ErrNoQuiescenceInitiator.
317
func (q *QuiescerLive) QuiescenceInitiator() fn.Result[lntypes.ChannelParty] {
3✔
318
        q.RLock()
3✔
319
        defer q.RUnlock()
3✔
320

3✔
321
        return q.quiescenceInitiator()
3✔
322
}
3✔
323

324
// quiescenceInitiator determines which ChannelParty is the initiator of
325
// quiescence for the purposes of downstream protocols. If the channel is not
326
// currently quiescent, this method will return ErrNoQuiescenceInitiator.
327
func (q *QuiescerLive) quiescenceInitiator() fn.Result[lntypes.ChannelParty] {
14✔
328
        switch {
14✔
329
        case !q.isQuiescent():
4✔
330
                return fn.Err[lntypes.ChannelParty](ErrNoQuiescenceInitiator)
4✔
331

332
        case q.localInit && q.remoteInit:
3✔
333
                // In the case of a tie, the channel initiator wins.
3✔
334
                return fn.Ok(q.cfg.channelInitiator)
3✔
335

336
        case q.localInit:
6✔
337
                return fn.Ok(lntypes.Local)
6✔
338

339
        case q.remoteInit:
1✔
340
                return fn.Ok(lntypes.Remote)
1✔
341
        }
342

343
        // unreachable
344
        return fn.Err[lntypes.ChannelParty](ErrNoQuiescenceInitiator)
×
345
}
346

347
// CanSendUpdates returns true if we haven't yet sent an Stfu which would mark
348
// the end of our ability to send updates.
349
func (q *QuiescerLive) CanSendUpdates() bool {
2,799✔
350
        q.RLock()
2,799✔
351
        defer q.RUnlock()
2,799✔
352

2,799✔
353
        return q.canSendUpdates()
2,799✔
354
}
2,799✔
355

356
// canSendUpdates returns true if we haven't yet sent an Stfu which would mark
357
// the end of our ability to send updates.
358
func (q *QuiescerLive) canSendUpdates() bool {
2,799✔
359
        return !q.sent && !q.localInit
2,799✔
360
}
2,799✔
361

362
// CanRecvUpdates returns true if we haven't yet received an Stfu which would
363
// mark the end of the remote's ability to send updates.
364
func (q *QuiescerLive) CanRecvUpdates() bool {
808✔
365
        q.RLock()
808✔
366
        defer q.RUnlock()
808✔
367

808✔
368
        return q.canRecvUpdates()
808✔
369
}
808✔
370

371
// canRecvUpdates returns true if we haven't yet received an Stfu which would
372
// mark the end of the remote's ability to send updates.
373
func (q *QuiescerLive) canRecvUpdates() bool {
808✔
374
        return !q.received
808✔
375
}
808✔
376

377
// CanSendStfu returns true if we can send an Stfu.
378
func (q *QuiescerLive) CanSendStfu(numPendingLocalUpdates uint64) bool {
×
379
        q.RLock()
×
380
        defer q.RUnlock()
×
381

×
382
        return q.canSendStfu()
×
383
}
×
384

385
// canSendStfu returns true if we can send an Stfu.
386
func (q *QuiescerLive) canSendStfu() bool {
30✔
387
        return !q.sent
30✔
388
}
30✔
389

390
// CanRecvStfu returns true if we can receive an Stfu.
391
func (q *QuiescerLive) CanRecvStfu() bool {
×
392
        q.RLock()
×
393
        defer q.RUnlock()
×
394

×
395
        return q.canRecvStfu()
×
396
}
×
397

398
// canRecvStfu returns true if we can receive an Stfu.
399
func (q *QuiescerLive) canRecvStfu() bool {
19✔
400
        return !q.received
19✔
401
}
19✔
402

403
// SendOwedStfu sends Stfu if it owes one. It returns an error if the state
404
// machine is in an invalid state.
405
func (q *QuiescerLive) SendOwedStfu() error {
1,091✔
406
        q.Lock()
1,091✔
407
        defer q.Unlock()
1,091✔
408

1,091✔
409
        return q.sendOwedStfu()
1,091✔
410
}
1,091✔
411

412
// sendOwedStfu sends Stfu if it owes one. It returns an error if the state
413
// machine is in an invalid state.
414
func (q *QuiescerLive) sendOwedStfu() error {
1,092✔
415
        if !q.oweStfu() || !q.canSendStfu() {
2,171✔
416
                return nil
1,079✔
417
        }
1,079✔
418

419
        err := q.makeStfu().Sink(q.cfg.sendMsg)
16✔
420

16✔
421
        if err == nil {
32✔
422
                q.sent = true
16✔
423

16✔
424
                // Since we just sent an Stfu, we may have a newly quiesced
16✔
425
                // state. If so, we will try to resolve any outstanding
16✔
426
                // StfuReqs.
16✔
427
                q.tryResolveStfuReq()
16✔
428

16✔
429
                if q.isQuiescent() {
29✔
430
                        q.startTimeout()
13✔
431
                }
13✔
432
        }
433

434
        return err
16✔
435
}
436

437
// TryResolveStfuReq attempts to resolve the active quiescence request if the
438
// state machine has reached a quiescent state.
439
func (q *QuiescerLive) TryResolveStfuReq() {
×
440
        q.Lock()
×
441
        defer q.Unlock()
×
442

×
443
        q.tryResolveStfuReq()
×
444
}
×
445

446
// tryResolveStfuReq attempts to resolve the active quiescence request if the
447
// state machine has reached a quiescent state.
448
func (q *QuiescerLive) tryResolveStfuReq() {
32✔
449
        q.activeQuiescenceReq.WhenSome(
32✔
450
                func(req StfuReq) {
45✔
451
                        if q.isQuiescent() {
21✔
452
                                req.Resolve(q.quiescenceInitiator())
8✔
453
                                q.activeQuiescenceReq = fn.None[StfuReq]()
8✔
454
                        }
8✔
455
                },
456
        )
457
}
458

459
// InitStfu instructs the quiescer that we intend to begin a quiescence
460
// negotiation where we are the initiator. We don't yet send stfu yet because
461
// we need to wait for the link to give us a valid opportunity to do so.
462
func (q *QuiescerLive) InitStfu(req StfuReq) {
7✔
463
        q.Lock()
7✔
464
        defer q.Unlock()
7✔
465

7✔
466
        q.initStfu(req)
7✔
467
}
7✔
468

469
// initStfu instructs the quiescer that we intend to begin a quiescence
470
// negotiation where we are the initiator. We don't yet send stfu yet because
471
// we need to wait for the link to give us a valid opportunity to do so.
472
func (q *QuiescerLive) initStfu(req StfuReq) {
9✔
473
        if q.localInit {
10✔
474
                req.Resolve(fn.Errf[lntypes.ChannelParty](
1✔
475
                        "quiescence already requested",
1✔
476
                ))
1✔
477

1✔
478
                return
1✔
479
        }
1✔
480

481
        q.localInit = true
8✔
482
        q.activeQuiescenceReq = fn.Some(req)
8✔
483
}
484

485
// OnResume accepts a no return closure that will run when the quiescer is
486
// resumed.
487
func (q *QuiescerLive) OnResume(hook func()) {
2✔
488
        q.Lock()
2✔
489
        defer q.Unlock()
2✔
490

2✔
491
        q.onResume(hook)
2✔
492
}
2✔
493

494
// onResume accepts a no return closure that will run when the quiescer is
495
// resumed.
496
func (q *QuiescerLive) onResume(hook func()) {
2✔
497
        q.resumeQueue = append(q.resumeQueue, hook)
2✔
498
}
2✔
499

500
// Resume runs all of the deferred actions that have accumulated while the
501
// channel has been quiescent and then resets the quiescer state to its initial
502
// state.
503
func (q *QuiescerLive) Resume() {
2✔
504
        q.Lock()
2✔
505
        defer q.Unlock()
2✔
506

2✔
507
        q.resume()
2✔
508
}
2✔
509

510
// resume runs all of the deferred actions that have accumulated while the
511
// channel has been quiescent and then resets the quiescer state to its initial
512
// state.
513
func (q *QuiescerLive) resume() {
2✔
514
        q.log.Debug("quiescence terminated, resuming htlc traffic")
2✔
515

2✔
516
        // since we are resuming we want to cancel the quiescence timeout
2✔
517
        // action.
2✔
518
        q.cancelTimeout()
2✔
519

2✔
520
        for _, hook := range q.resumeQueue {
3✔
521
                hook()
1✔
522
        }
1✔
523
        q.localInit = false
2✔
524
        q.remoteInit = false
2✔
525
        q.sent = false
2✔
526
        q.received = false
2✔
527
        q.resumeQueue = nil
2✔
528
}
529

530
// startTimeout starts the timeout function that fires if the quiescer remains
531
// in a quiesced state for too long. If this function is called multiple times
532
// only the last one will have an effect.
533
func (q *QuiescerLive) startTimeout() {
16✔
534
        if q.cfg.onTimeout == nil {
25✔
535
                return
9✔
536
        }
9✔
537

538
        old := q.timeoutTimer
7✔
539

7✔
540
        q.timeoutTimer = time.AfterFunc(q.cfg.timeoutDuration, q.cfg.onTimeout)
7✔
541

7✔
542
        if old != nil {
7✔
543
                old.Stop()
×
544
        }
×
545
}
546

547
// cancelTimeout cancels the timeout function that would otherwise fire if the
548
// quiescer remains in a quiesced state too long. If this function is called
549
// before startTimeout or after another call to cancelTimeout, the effect will
550
// be a noop.
551
func (q *QuiescerLive) cancelTimeout() {
2✔
552
        if q.timeoutTimer != nil {
3✔
553
                q.timeoutTimer.Stop()
1✔
554
                q.timeoutTimer = nil
1✔
555
        }
1✔
556
}
557

558
type quiescerNoop struct{}
559

560
var _ Quiescer = (*quiescerNoop)(nil)
561

562
func (q *quiescerNoop) InitStfu(req StfuReq) {
×
563
        req.Resolve(fn.Errf[lntypes.ChannelParty]("quiescence not supported"))
×
564
}
×
565
func (q *quiescerNoop) RecvStfu(_ lnwire.Stfu) error { return nil }
×
566
func (q *quiescerNoop) CanRecvUpdates() bool         { return true }
×
567
func (q *quiescerNoop) CanSendUpdates() bool         { return true }
×
568
func (q *quiescerNoop) SendOwedStfu() error          { return nil }
×
569
func (q *quiescerNoop) IsQuiescent() bool            { return false }
×
570
func (q *quiescerNoop) OnResume(hook func())         { hook() }
×
571
func (q *quiescerNoop) Resume()                      {}
×
572
func (q *quiescerNoop) QuiescenceInitiator() fn.Result[lntypes.ChannelParty] {
×
573
        return fn.Err[lntypes.ChannelParty](ErrNoQuiescenceInitiator)
×
574
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc