• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14378973393

10 Apr 2025 11:10AM UTC coverage: 68.977%. First build
14378973393

Pull #9691

github

web-flow
Merge 00150d64e into e214b579e
Pull Request #9691: htlcswitch+peer [1/2]: thread context through in preparation for passing to graph DB calls

218 of 244 new or added lines in 14 files covered. (89.34%)

133429 of 193439 relevant lines covered (68.98%)

22122.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.63
/htlcswitch/mailbox.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "context"
7
        "errors"
8
        "fmt"
9
        "sync"
10
        "time"
11

12
        "github.com/lightningnetwork/lnd/clock"
13
        "github.com/lightningnetwork/lnd/fn/v2"
14
        "github.com/lightningnetwork/lnd/lntypes"
15
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
)
18

19
var (
20
        // ErrMailBoxShuttingDown is returned when the mailbox is interrupted by
21
        // a shutdown request.
22
        ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")
23

24
        // ErrPacketAlreadyExists signals that an attempt to add a packet failed
25
        // because it already exists in the mailbox.
26
        ErrPacketAlreadyExists = errors.New("mailbox already has packet")
27
)
28

29
// MailBox is an interface which represents a concurrent-safe, in-order
30
// delivery queue for messages from the network and also from the main switch.
31
// This struct serves as a buffer between incoming messages, and messages to
32
// the handled by the link. Each of the mutating methods within this interface
33
// should be implemented in a non-blocking manner.
34
type MailBox interface {
35
        // AddMessage appends a new message to the end of the message queue.
36
        AddMessage(msg lnwire.Message) error
37

38
        // AddPacket appends a new message to the end of the packet queue.
39
        AddPacket(pkt *htlcPacket) error
40

41
        // HasPacket queries the packets for a circuit key, this is used to drop
42
        // packets bound for the switch that already have a queued response.
43
        HasPacket(CircuitKey) bool
44

45
        // AckPacket removes a packet from the mailboxes in-memory replay
46
        // buffer. This will prevent a packet from being delivered after a link
47
        // restarts if the switch has remained online. The returned boolean
48
        // indicates whether or not a packet with the passed incoming circuit
49
        // key was removed.
50
        AckPacket(CircuitKey) bool
51

52
        // FailAdd fails an UpdateAddHTLC that exists within the mailbox,
53
        // removing it from the in-memory replay buffer. This will prevent the
54
        // packet from being delivered after the link restarts if the switch has
55
        // remained online. The generated LinkError will show an
56
        // OutgoingFailureDownstreamHtlcAdd FailureDetail.
57
        FailAdd(ctx context.Context, pkt *htlcPacket)
58

59
        // MessageOutBox returns a channel that any new messages ready for
60
        // delivery will be sent on.
61
        MessageOutBox() chan lnwire.Message
62

63
        // PacketOutBox returns a channel that any new packets ready for
64
        // delivery will be sent on.
65
        PacketOutBox() chan *htlcPacket
66

67
        // Clears any pending wire messages from the inbox.
68
        ResetMessages() error
69

70
        // Reset the packet head to point at the first element in the list.
71
        ResetPackets() error
72

73
        // SetDustClosure takes in a closure that is used to evaluate whether
74
        // mailbox HTLC's are dust.
75
        SetDustClosure(isDust dustClosure)
76

77
        // SetFeeRate sets the feerate to be used when evaluating dust.
78
        SetFeeRate(feerate chainfee.SatPerKWeight)
79

80
        // DustPackets returns the dust sum for Adds in the mailbox for the
81
        // local and remote commitments.
82
        DustPackets() (lnwire.MilliSatoshi, lnwire.MilliSatoshi)
83

84
        // Start starts the mailbox and any goroutines it needs to operate
85
        // properly.
86
        Start(ctx context.Context)
87

88
        // Stop signals the mailbox and its goroutines for a graceful shutdown.
89
        Stop()
90
}
91

92
type mailBoxConfig struct {
93
        // shortChanID is the short channel id of the channel this mailbox
94
        // belongs to.
95
        shortChanID lnwire.ShortChannelID
96

97
        // forwardPackets send a varidic number of htlcPackets to the switch to
98
        // be routed. A quit channel should be provided so that the call can
99
        // properly exit during shutdown.
100
        forwardPackets func(context.Context, <-chan struct{},
101
                ...*htlcPacket) error
102

103
        // clock is a time source for the mailbox.
104
        clock clock.Clock
105

106
        // expiry is the interval after which Adds will be cancelled if they
107
        // have not been yet been delivered. The computed deadline will expiry
108
        // this long after the Adds are added via AddPacket.
109
        expiry time.Duration
110

111
        // failMailboxUpdate is used to fail an expired HTLC and use the
112
        // correct SCID if the underlying channel uses aliases.
113
        failMailboxUpdate func(ctx context.Context, outScid,
114
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
115
}
116

117
// memoryMailBox is an implementation of the MailBox struct backed by purely
118
// in-memory queues.
119
//
120
// TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts.
121
type memoryMailBox struct {
122
        started sync.Once
123
        stopped sync.Once
124

125
        cfg *mailBoxConfig
126

127
        wireMessages *list.List
128
        wireMtx      sync.Mutex
129
        wireCond     *sync.Cond
130

131
        messageOutbox chan lnwire.Message
132
        msgReset      chan chan struct{}
133

134
        // repPkts is a queue for reply packets, e.g. Settles and Fails.
135
        repPkts  *list.List
136
        repIndex map[CircuitKey]*list.Element
137
        repHead  *list.Element
138

139
        // addPkts is a dedicated queue for Adds.
140
        addPkts  *list.List
141
        addIndex map[CircuitKey]*list.Element
142
        addHead  *list.Element
143

144
        pktMtx  sync.Mutex
145
        pktCond *sync.Cond
146

147
        pktOutbox chan *htlcPacket
148
        pktReset  chan chan struct{}
149

150
        wireShutdown chan struct{}
151
        pktShutdown  chan struct{}
152

153
        cancel fn.Option[context.CancelFunc]
154
        quit   chan struct{}
155

156
        // feeRate is set when the link receives or sends out fee updates. It
157
        // is refreshed when AttachMailBox is called in case a fee update did
158
        // not get committed. In some cases it may be out of sync with the
159
        // channel's feerate, but it should eventually get back in sync.
160
        feeRate chainfee.SatPerKWeight
161

162
        // isDust is set when AttachMailBox is called and serves to evaluate
163
        // the outstanding dust in the memoryMailBox given the current set
164
        // feeRate.
165
        isDust dustClosure
166
}
167

168
// newMemoryMailBox creates a new instance of the memoryMailBox.
169
func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
335✔
170
        box := &memoryMailBox{
335✔
171
                cfg:           cfg,
335✔
172
                wireMessages:  list.New(),
335✔
173
                repPkts:       list.New(),
335✔
174
                addPkts:       list.New(),
335✔
175
                messageOutbox: make(chan lnwire.Message),
335✔
176
                pktOutbox:     make(chan *htlcPacket),
335✔
177
                msgReset:      make(chan chan struct{}, 1),
335✔
178
                pktReset:      make(chan chan struct{}, 1),
335✔
179
                repIndex:      make(map[CircuitKey]*list.Element),
335✔
180
                addIndex:      make(map[CircuitKey]*list.Element),
335✔
181
                wireShutdown:  make(chan struct{}),
335✔
182
                pktShutdown:   make(chan struct{}),
335✔
183
                quit:          make(chan struct{}),
335✔
184
        }
335✔
185
        box.wireCond = sync.NewCond(&box.wireMtx)
335✔
186
        box.pktCond = sync.NewCond(&box.pktMtx)
335✔
187

335✔
188
        return box
335✔
189
}
335✔
190

191
// A compile time assertion to ensure that memoryMailBox meets the MailBox
192
// interface.
193
var _ MailBox = (*memoryMailBox)(nil)
194

195
// courierType is an enum that reflects the distinct types of messages a
196
// MailBox can handle. Each type will be placed in an isolated mail box and
197
// will have a dedicated goroutine for delivering the messages.
198
type courierType uint8
199

200
const (
201
        // wireCourier is a type of courier that handles wire messages.
202
        wireCourier courierType = iota
203

204
        // pktCourier is a type of courier that handles htlc packets.
205
        pktCourier
206
)
207

208
// Start starts the mailbox and any goroutines it needs to operate properly.
209
//
210
// NOTE: This method is part of the MailBox interface.
211
func (m *memoryMailBox) Start(ctx context.Context) {
336✔
212
        m.started.Do(func() {
671✔
213
                ctx, cancel := context.WithCancel(ctx)
335✔
214
                m.cancel = fn.Some(cancel)
335✔
215

335✔
216
                go m.wireMailCourier(ctx)
335✔
217
                go m.pktMailCourier(ctx)
335✔
218
        })
335✔
219
}
220

221
// ResetMessages blocks until all buffered wire messages are cleared.
222
func (m *memoryMailBox) ResetMessages() error {
339✔
223
        msgDone := make(chan struct{})
339✔
224
        select {
339✔
225
        case m.msgReset <- msgDone:
339✔
226
                return m.signalUntilReset(wireCourier, msgDone)
339✔
227
        case <-m.quit:
×
228
                return ErrMailBoxShuttingDown
×
229
        }
230
}
231

232
// ResetPackets blocks until the head of packets buffer is reset, causing the
233
// packets to be redelivered in order.
234
func (m *memoryMailBox) ResetPackets() error {
543✔
235
        pktDone := make(chan struct{})
543✔
236
        select {
543✔
237
        case m.pktReset <- pktDone:
543✔
238
                return m.signalUntilReset(pktCourier, pktDone)
543✔
239
        case <-m.quit:
×
240
                return ErrMailBoxShuttingDown
×
241
        }
242
}
243

244
// signalUntilReset strobes the condition variable for the specified inbox type
245
// until receiving a response that the mailbox has processed a reset.
246
func (m *memoryMailBox) signalUntilReset(cType courierType,
247
        done chan struct{}) error {
880✔
248

880✔
249
        for {
2,080✔
250

1,200✔
251
                switch cType {
1,200✔
252
                case wireCourier:
657✔
253
                        m.wireCond.Signal()
657✔
254
                case pktCourier:
545✔
255
                        m.pktCond.Signal()
545✔
256
                }
257

258
                select {
1,200✔
259
                case <-time.After(time.Millisecond):
322✔
260
                        continue
322✔
261
                case <-done:
878✔
262
                        return nil
878✔
263
                case <-m.quit:
2✔
264
                        return ErrMailBoxShuttingDown
2✔
265
                }
266
        }
267
}
268

269
// AckPacket removes the packet identified by it's incoming circuit key from the
270
// queue of packets to be delivered. The returned boolean indicates whether or
271
// not a packet with the passed incoming circuit key was removed.
272
//
273
// NOTE: It is safe to call this method multiple times for the same circuit key.
274
func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
598✔
275
        m.pktCond.L.Lock()
598✔
276
        defer m.pktCond.L.Unlock()
598✔
277

598✔
278
        if entry, ok := m.repIndex[inKey]; ok {
658✔
279
                // Check whether we are removing the head of the queue. If so,
60✔
280
                // we must advance the head to the next packet before removing.
60✔
281
                // It's possible that the courier has already advanced the
60✔
282
                // repHead, so this check prevents the repHead from getting
60✔
283
                // desynchronized.
60✔
284
                if entry == m.repHead {
66✔
285
                        m.repHead = entry.Next()
6✔
286
                }
6✔
287
                m.repPkts.Remove(entry)
60✔
288
                delete(m.repIndex, inKey)
60✔
289

60✔
290
                return true
60✔
291
        }
292

293
        if entry, ok := m.addIndex[inKey]; ok {
1,071✔
294
                // Check whether we are removing the head of the queue. If so,
531✔
295
                // we must advance the head to the next add before removing.
531✔
296
                // It's possible that the courier has already advanced the
531✔
297
                // addHead, so this check prevents the addHead from getting
531✔
298
                // desynchronized.
531✔
299
                //
531✔
300
                // NOTE: While this event is rare for Settles or Fails, it could
531✔
301
                // be very common for Adds since the mailbox has the ability to
531✔
302
                // cancel Adds before they are delivered. When that occurs, the
531✔
303
                // head of addPkts has only been peeked and we expect to be
531✔
304
                // removing the head of the queue.
531✔
305
                if entry == m.addHead {
570✔
306
                        m.addHead = entry.Next()
39✔
307
                }
39✔
308

309
                m.addPkts.Remove(entry)
531✔
310
                delete(m.addIndex, inKey)
531✔
311

531✔
312
                return true
531✔
313
        }
314

315
        return false
9✔
316
}
317

318
// HasPacket queries the packets for a circuit key, this is used to drop packets
319
// bound for the switch that already have a queued response.
320
func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
578✔
321
        m.pktCond.L.Lock()
578✔
322
        _, ok := m.repIndex[inKey]
578✔
323
        m.pktCond.L.Unlock()
578✔
324

578✔
325
        return ok
578✔
326
}
578✔
327

328
// Stop signals the mailbox and its goroutines for a graceful shutdown.
329
//
330
// NOTE: This method is part of the MailBox interface.
331
func (m *memoryMailBox) Stop() {
312✔
332
        m.stopped.Do(func() {
623✔
333
                m.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
622✔
334
                close(m.quit)
311✔
335

311✔
336
                m.signalUntilShutdown(wireCourier)
311✔
337
                m.signalUntilShutdown(pktCourier)
311✔
338
        })
339
}
340

341
// signalUntilShutdown strobes the condition variable of the passed courier
342
// type, blocking until the worker has exited.
343
func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
620✔
344
        var (
620✔
345
                cond     *sync.Cond
620✔
346
                shutdown chan struct{}
620✔
347
        )
620✔
348

620✔
349
        switch cType {
620✔
350
        case wireCourier:
311✔
351
                cond = m.wireCond
311✔
352
                shutdown = m.wireShutdown
311✔
353
        case pktCourier:
311✔
354
                cond = m.pktCond
311✔
355
                shutdown = m.pktShutdown
311✔
356
        }
357

358
        for {
1,848✔
359
                select {
1,228✔
360
                case <-time.After(time.Millisecond):
610✔
361
                        cond.Signal()
610✔
362
                case <-shutdown:
620✔
363
                        return
620✔
364
                }
365
        }
366
}
367

368
// pktWithExpiry wraps an incoming packet and records the time at which it it
369
// should be canceled from the mailbox. This will be used to detect if it gets
370
// stuck in the mailbox and inform when to cancel back.
371
type pktWithExpiry struct {
372
        pkt    *htlcPacket
373
        expiry time.Time
374
}
375

376
func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
606✔
377
        return clock.TickAfter(p.expiry.Sub(clock.Now()))
606✔
378
}
606✔
379

380
// wireMailCourier is a dedicated goroutine whose job is to reliably deliver
381
// wire messages.
382
func (m *memoryMailBox) wireMailCourier(ctx context.Context) {
335✔
383
        defer close(m.wireShutdown)
335✔
384

335✔
385
        for {
4,034✔
386
                // First, we'll check our condition. If our mailbox is empty,
3,699✔
387
                // then we'll wait until a new item is added.
3,699✔
388
                m.wireCond.L.Lock()
3,699✔
389
                for m.wireMessages.Front() == nil {
7,548✔
390
                        m.wireCond.Wait()
3,849✔
391

3,849✔
392
                        select {
3,849✔
393
                        case msgDone := <-m.msgReset:
338✔
394
                                m.wireMessages.Init()
338✔
395
                                close(msgDone)
338✔
396
                        case <-m.quit:
164✔
397
                                m.wireCond.L.Unlock()
164✔
398
                                return
164✔
399
                        case <-ctx.Done():
149✔
400
                                m.wireCond.L.Unlock()
149✔
401
                                return
149✔
402
                        default:
3,180✔
403
                        }
404
                }
405

406
                // Grab the datum off the front of the queue, shifting the
407
                // slice's reference down one in order to remove the datum from
408
                // the queue.
409
                entry := m.wireMessages.Front()
3,366✔
410

3,366✔
411
                //nolint:forcetypeassert
3,366✔
412
                nextMsg := m.wireMessages.Remove(entry).(lnwire.Message)
3,366✔
413

3,366✔
414
                // Now that we're done with the condition, we can unlock it to
3,366✔
415
                // allow any callers to append to the end of our target queue.
3,366✔
416
                m.wireCond.L.Unlock()
3,366✔
417

3,366✔
418
                // With the next message obtained, we'll now select to attempt
3,366✔
419
                // to deliver the message. If we receive a kill signal, then
3,366✔
420
                // we'll bail out.
3,366✔
421
                select {
3,366✔
422
                case m.messageOutbox <- nextMsg:
3,366✔
423
                case msgDone := <-m.msgReset:
×
424
                        m.wireCond.L.Lock()
×
425
                        m.wireMessages.Init()
×
426
                        m.wireCond.L.Unlock()
×
427

×
428
                        close(msgDone)
×
429
                case <-m.quit:
×
430
                        return
×
NEW
431
                case <-ctx.Done():
×
NEW
432
                        return
×
433
                }
434
        }
435
}
436

437
// pktMailCourier is a dedicated goroutine whose job is to reliably deliver
438
// packet messages.
439
func (m *memoryMailBox) pktMailCourier(ctx context.Context) {
335✔
440
        defer close(m.pktShutdown)
335✔
441

335✔
442
        for {
1,355✔
443
                // First, we'll check our condition. If our mailbox is empty,
1,020✔
444
                // then we'll wait until a new item is added.
1,020✔
445
                m.pktCond.L.Lock()
1,020✔
446
                for m.repHead == nil && m.addHead == nil {
2,385✔
447
                        m.pktCond.Wait()
1,365✔
448

1,365✔
449
                        select {
1,365✔
450
                        // Resetting the packet queue means just moving our
451
                        // pointer to the front. This ensures that any un-ACK'd
452
                        // messages are re-delivered upon reconnect.
453
                        case pktDone := <-m.pktReset:
536✔
454
                                m.repHead = m.repPkts.Front()
536✔
455
                                m.addHead = m.addPkts.Front()
536✔
456

536✔
457
                                close(pktDone)
536✔
458

459
                        case <-m.quit:
145✔
460
                                m.pktCond.L.Unlock()
145✔
461
                                return
145✔
462

463
                        case <-ctx.Done():
163✔
464
                                m.pktCond.L.Unlock()
163✔
465
                                return
163✔
466

467
                        default:
503✔
468
                        }
469
                }
470

471
                var (
692✔
472
                        nextRep   *htlcPacket
692✔
473
                        nextRepEl *list.Element
692✔
474
                        nextAdd   *pktWithExpiry
692✔
475
                        nextAddEl *list.Element
692✔
476
                )
692✔
477
                // For packets, we actually never remove an item until it has
692✔
478
                // been ACK'd by the link. This ensures that if a read packet
692✔
479
                // doesn't make it into a commitment, then it'll be
692✔
480
                // re-delivered once the link comes back online.
692✔
481

692✔
482
                // Peek at the head of the Settle/Fails and Add queues. We peak
692✔
483
                // both even if there is a Settle/Fail present because we need
692✔
484
                // to set a deadline for the next pending Add if it's present.
692✔
485
                // Due to clock monotonicity, we know that the head of the Adds
692✔
486
                // is the next to expire.
692✔
487
                if m.repHead != nil {
784✔
488
                        //nolint:forcetypeassert
92✔
489
                        nextRep = m.repHead.Value.(*htlcPacket)
92✔
490
                        nextRepEl = m.repHead
92✔
491
                }
92✔
492
                if m.addHead != nil {
1,298✔
493
                        //nolint:forcetypeassert
606✔
494
                        nextAdd = m.addHead.Value.(*pktWithExpiry)
606✔
495
                        nextAddEl = m.addHead
606✔
496
                }
606✔
497

498
                // Now that we're done with the condition, we can unlock it to
499
                // allow any callers to append to the end of our target queue.
500
                m.pktCond.L.Unlock()
692✔
501

692✔
502
                var (
692✔
503
                        pktOutbox chan *htlcPacket
692✔
504
                        addOutbox chan *htlcPacket
692✔
505
                        add       *htlcPacket
692✔
506
                        deadline  <-chan time.Time
692✔
507
                )
692✔
508

692✔
509
                // Prioritize delivery of Settle/Fail packets over Adds. This
692✔
510
                // ensures that we actively clear the commitment of existing
692✔
511
                // HTLCs before trying to add new ones. This can help to improve
692✔
512
                // forwarding performance since the time to sign a commitment is
692✔
513
                // linear in the number of HTLCs manifested on the commitments.
692✔
514
                //
692✔
515
                // NOTE: Both types are eventually delivered over the same
692✔
516
                // channel, but we can control which is delivered by exclusively
692✔
517
                // making one nil and the other non-nil. We know from our loop
692✔
518
                // condition that at least one nextRep and nextAdd are non-nil.
692✔
519
                if nextRep != nil {
784✔
520
                        pktOutbox = m.pktOutbox
92✔
521
                } else {
694✔
522
                        addOutbox = m.pktOutbox
602✔
523
                }
602✔
524

525
                // If we have a pending Add, we'll also construct the deadline
526
                // so we can fail it back if we are unable to deliver any
527
                // message in time. We also dereference the nextAdd's packet,
528
                // since we will need access to it in the case we are delivering
529
                // it and/or if the deadline expires.
530
                //
531
                // NOTE: It's possible after this point for add to be nil, but
532
                // this can only occur when addOutbox is also nil, hence we
533
                // won't accidentally deliver a nil packet.
534
                if nextAdd != nil {
1,298✔
535
                        add = nextAdd.pkt
606✔
536
                        deadline = nextAdd.deadline(m.cfg.clock)
606✔
537
                }
606✔
538

539
                select {
692✔
540
                case pktOutbox <- nextRep:
90✔
541
                        m.pktCond.L.Lock()
90✔
542
                        // Only advance the repHead if this Settle or Fail is
90✔
543
                        // still at the head of the queue.
90✔
544
                        if m.repHead != nil && m.repHead == nextRepEl {
174✔
545
                                m.repHead = m.repHead.Next()
84✔
546
                        }
84✔
547
                        m.pktCond.L.Unlock()
90✔
548

549
                case addOutbox <- add:
557✔
550
                        m.pktCond.L.Lock()
557✔
551
                        // Only advance the addHead if this Add is still at the
557✔
552
                        // head of the queue.
557✔
553
                        if m.addHead != nil && m.addHead == nextAddEl {
1,111✔
554
                                m.addHead = m.addHead.Next()
554✔
555
                        }
554✔
556
                        m.pktCond.L.Unlock()
557✔
557

558
                case <-deadline:
36✔
559
                        log.Debugf("Expiring add htlc with "+
36✔
560
                                "keystone=%v", add.keystone())
36✔
561
                        m.FailAdd(ctx, add)
36✔
562

563
                case pktDone := <-m.pktReset:
8✔
564
                        m.pktCond.L.Lock()
8✔
565
                        m.repHead = m.repPkts.Front()
8✔
566
                        m.addHead = m.addPkts.Front()
8✔
567
                        m.pktCond.L.Unlock()
8✔
568

8✔
569
                        close(pktDone)
8✔
570

571
                case <-m.quit:
×
572
                        return
×
573

574
                case <-ctx.Done():
7✔
575
                        return
7✔
576
                }
577
        }
578
}
579

580
// AddMessage appends a new message to the end of the message queue.
581
//
582
// NOTE: This method is safe for concrete use and part of the MailBox
583
// interface.
584
func (m *memoryMailBox) AddMessage(msg lnwire.Message) error {
3,366✔
585
        // First, we'll lock the condition, and add the message to the end of
3,366✔
586
        // the wire message inbox.
3,366✔
587
        m.wireCond.L.Lock()
3,366✔
588
        m.wireMessages.PushBack(msg)
3,366✔
589
        m.wireCond.L.Unlock()
3,366✔
590

3,366✔
591
        // With the message added, we signal to the mailCourier that there are
3,366✔
592
        // additional messages to deliver.
3,366✔
593
        m.wireCond.Signal()
3,366✔
594

3,366✔
595
        return nil
3,366✔
596
}
3,366✔
597

598
// AddPacket appends a new message to the end of the packet queue.
599
//
600
// NOTE: This method is safe for concrete use and part of the MailBox
601
// interface.
602
func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
2,098✔
603
        m.pktCond.L.Lock()
2,098✔
604
        switch htlc := pkt.htlc.(type) {
2,098✔
605
        // Split off Settle/Fail packets into the repPkts queue.
606
        case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
94✔
607
                if _, ok := m.repIndex[pkt.inKey()]; ok {
96✔
608
                        m.pktCond.L.Unlock()
2✔
609
                        return ErrPacketAlreadyExists
2✔
610
                }
2✔
611

612
                entry := m.repPkts.PushBack(pkt)
92✔
613
                m.repIndex[pkt.inKey()] = entry
92✔
614
                if m.repHead == nil {
181✔
615
                        m.repHead = entry
89✔
616
                }
89✔
617

618
        // Split off Add packets into the addPkts queue.
619
        case *lnwire.UpdateAddHTLC:
2,006✔
620
                if _, ok := m.addIndex[pkt.inKey()]; ok {
2,007✔
621
                        m.pktCond.L.Unlock()
1✔
622
                        return ErrPacketAlreadyExists
1✔
623
                }
1✔
624

625
                entry := m.addPkts.PushBack(&pktWithExpiry{
2,005✔
626
                        pkt:    pkt,
2,005✔
627
                        expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
2,005✔
628
                })
2,005✔
629
                m.addIndex[pkt.inKey()] = entry
2,005✔
630
                if m.addHead == nil {
2,436✔
631
                        m.addHead = entry
431✔
632
                }
431✔
633

634
        default:
×
635
                m.pktCond.L.Unlock()
×
636
                return fmt.Errorf("unknown htlc type: %T", htlc)
×
637
        }
638
        m.pktCond.L.Unlock()
2,095✔
639

2,095✔
640
        // With the packet added, we signal to the mailCourier that there are
2,095✔
641
        // additional packets to consume.
2,095✔
642
        m.pktCond.Signal()
2,095✔
643

2,095✔
644
        return nil
2,095✔
645
}
646

647
// SetFeeRate sets the memoryMailBox's feerate for use in DustPackets.
648
func (m *memoryMailBox) SetFeeRate(feeRate chainfee.SatPerKWeight) {
223✔
649
        m.pktCond.L.Lock()
223✔
650
        defer m.pktCond.L.Unlock()
223✔
651

223✔
652
        m.feeRate = feeRate
223✔
653
}
223✔
654

655
// SetDustClosure sets the memoryMailBox's dustClosure for use in DustPackets.
656
func (m *memoryMailBox) SetDustClosure(isDust dustClosure) {
339✔
657
        m.pktCond.L.Lock()
339✔
658
        defer m.pktCond.L.Unlock()
339✔
659

339✔
660
        m.isDust = isDust
339✔
661
}
339✔
662

663
// DustPackets returns the dust sum for add packets in the mailbox. The first
664
// return value is the local dust sum and the second is the remote dust sum.
665
// This will keep track of a given dust HTLC from the time it is added via
666
// AddPacket until it is removed via AckPacket.
667
func (m *memoryMailBox) DustPackets() (lnwire.MilliSatoshi,
668
        lnwire.MilliSatoshi) {
415✔
669

415✔
670
        m.pktCond.L.Lock()
415✔
671
        defer m.pktCond.L.Unlock()
415✔
672

415✔
673
        var (
415✔
674
                localDustSum  lnwire.MilliSatoshi
415✔
675
                remoteDustSum lnwire.MilliSatoshi
415✔
676
        )
415✔
677

415✔
678
        // Run through the map of HTLC's and determine the dust sum with calls
415✔
679
        // to the memoryMailBox's isDust closure. Note that all mailbox packets
415✔
680
        // are outgoing so the second argument to isDust will be false.
415✔
681
        for _, e := range m.addIndex {
3,816✔
682
                addPkt := e.Value.(*pktWithExpiry).pkt
3,401✔
683

3,401✔
684
                // Evaluate whether this HTLC is dust on the local commitment.
3,401✔
685
                if m.isDust(
3,401✔
686
                        m.feeRate, false, lntypes.Local,
3,401✔
687
                        addPkt.amount.ToSatoshis(),
3,401✔
688
                ) {
6,796✔
689

3,395✔
690
                        localDustSum += addPkt.amount
3,395✔
691
                }
3,395✔
692

693
                // Evaluate whether this HTLC is dust on the remote commitment.
694
                if m.isDust(
3,401✔
695
                        m.feeRate, false, lntypes.Remote,
3,401✔
696
                        addPkt.amount.ToSatoshis(),
3,401✔
697
                ) {
6,802✔
698

3,401✔
699
                        remoteDustSum += addPkt.amount
3,401✔
700
                }
3,401✔
701
        }
702

703
        return localDustSum, remoteDustSum
415✔
704
}
705

706
// FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it
707
// from the in-memory replay buffer. This will prevent the packet from being
708
// delivered after the link restarts if the switch has remained online. The
709
// generated LinkError will show an OutgoingFailureDownstreamHtlcAdd
710
// FailureDetail.
711
func (m *memoryMailBox) FailAdd(ctx context.Context, pkt *htlcPacket) {
53✔
712
        // First, remove the packet from mailbox. If we didn't find the packet
53✔
713
        // because it has already been acked, we'll exit early to avoid sending
53✔
714
        // a duplicate fail message through the switch.
53✔
715
        if !m.AckPacket(pkt.inKey()) {
58✔
716
                return
5✔
717
        }
5✔
718

719
        var (
48✔
720
                localFailure = false
48✔
721
                reason       lnwire.OpaqueReason
48✔
722
        )
48✔
723

48✔
724
        // Create a temporary channel failure which we will send back to our
48✔
725
        // peer if this is a forward, or report to the user if the failed
48✔
726
        // payment was locally initiated.
48✔
727
        failure := m.cfg.failMailboxUpdate(
48✔
728
                ctx, pkt.originalOutgoingChanID, m.cfg.shortChanID,
48✔
729
        )
48✔
730

48✔
731
        // If the payment was locally initiated (which is indicated by a nil
48✔
732
        // obfuscator), we do not need to encrypt it back to the sender.
48✔
733
        if pkt.obfuscator == nil {
89✔
734
                var b bytes.Buffer
41✔
735
                err := lnwire.EncodeFailure(&b, failure, 0)
41✔
736
                if err != nil {
41✔
737
                        log.Errorf("Unable to encode failure: %v", err)
×
738
                        return
×
739
                }
×
740
                reason = lnwire.OpaqueReason(b.Bytes())
41✔
741
                localFailure = true
41✔
742
        } else {
7✔
743
                // If the packet is part of a forward, (identified by a non-nil
7✔
744
                // obfuscator) we need to encrypt the error back to the source.
7✔
745
                var err error
7✔
746
                reason, err = pkt.obfuscator.EncryptFirstHop(failure)
7✔
747
                if err != nil {
7✔
748
                        log.Errorf("Unable to obfuscate error: %v", err)
×
749
                        return
×
750
                }
×
751
        }
752

753
        // Create a link error containing the temporary channel failure and a
754
        // detail which indicates the we failed to add the htlc.
755
        linkError := NewDetailedLinkError(
48✔
756
                failure, OutgoingFailureDownstreamHtlcAdd,
48✔
757
        )
48✔
758

48✔
759
        failPkt := &htlcPacket{
48✔
760
                incomingChanID: pkt.incomingChanID,
48✔
761
                incomingHTLCID: pkt.incomingHTLCID,
48✔
762
                circuit:        pkt.circuit,
48✔
763
                sourceRef:      pkt.sourceRef,
48✔
764
                hasSource:      true,
48✔
765
                localFailure:   localFailure,
48✔
766
                obfuscator:     pkt.obfuscator,
48✔
767
                linkFailure:    linkError,
48✔
768
                htlc: &lnwire.UpdateFailHTLC{
48✔
769
                        Reason: reason,
48✔
770
                },
48✔
771
        }
48✔
772

48✔
773
        if err := m.cfg.forwardPackets(ctx, m.quit, failPkt); err != nil {
48✔
774
                log.Errorf("Unhandled error while reforwarding packets "+
×
775
                        "settle/fail over htlcswitch: %v", err)
×
776
        }
×
777
}
778

779
// MessageOutBox returns a channel that any new messages ready for delivery
780
// will be sent on.
781
//
782
// NOTE: This method is part of the MailBox interface.
783
func (m *memoryMailBox) MessageOutBox() chan lnwire.Message {
235✔
784
        return m.messageOutbox
235✔
785
}
235✔
786

787
// PacketOutBox returns a channel that any new packets ready for delivery will
788
// be sent on.
789
//
790
// NOTE: This method is part of the MailBox interface.
791
func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
386✔
792
        return m.pktOutbox
386✔
793
}
386✔
794

795
// mailOrchestrator is responsible for coordinating the creation and lifecycle
796
// of mailboxes used within the switch. It supports the ability to create
797
// mailboxes, reassign their short channel id's, deliver htlc packets, and
798
// queue packets for mailboxes that have not been created due to a link's late
799
// registration.
800
type mailOrchestrator struct {
801
        mu sync.RWMutex
802

803
        cfg *mailOrchConfig
804

805
        // mailboxes caches exactly one mailbox for all known channels.
806
        mailboxes map[lnwire.ChannelID]MailBox
807

808
        // liveIndex maps a live short chan id to the primary mailbox key.
809
        // An index in liveIndex map is only entered under two conditions:
810
        //   1. A link has a non-zero short channel id at time of AddLink.
811
        //   2. A link receives a non-zero short channel via UpdateShortChanID.
812
        liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID
813

814
        // TODO(conner): add another pair of indexes:
815
        //   chan_id -> short_chan_id
816
        //   short_chan_id -> mailbox
817
        // so that Deliver can lookup mailbox directly once live,
818
        // but still queryable by channel_id.
819

820
        // unclaimedPackets maps a live short chan id to queue of packets if no
821
        // mailbox has been created.
822
        unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
823
}
824

825
type mailOrchConfig struct {
826
        // forwardPackets send a varidic number of htlcPackets to the switch to
827
        // be routed. A quit channel should be provided so that the call can
828
        // properly exit during shutdown.
829
        forwardPackets func(context.Context, <-chan struct{},
830
                ...*htlcPacket) error
831

832
        // clock is a time source for the generated mailboxes.
833
        clock clock.Clock
834

835
        // expiry is the interval after which Adds will be cancelled if they
836
        // have not been yet been delivered. The computed deadline will expiry
837
        // this long after the Adds are added to a mailbox via AddPacket.
838
        expiry time.Duration
839

840
        // failMailboxUpdate is used to fail an expired HTLC and use the
841
        // correct SCID if the underlying channel uses aliases.
842
        failMailboxUpdate func(ctx context.Context, outScid,
843
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
844
}
845

846
// newMailOrchestrator initializes a fresh mailOrchestrator.
847
func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator {
344✔
848
        return &mailOrchestrator{
344✔
849
                cfg:              cfg,
344✔
850
                mailboxes:        make(map[lnwire.ChannelID]MailBox),
344✔
851
                liveIndex:        make(map[lnwire.ShortChannelID]lnwire.ChannelID),
344✔
852
                unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
344✔
853
        }
344✔
854
}
344✔
855

856
// Stop instructs the orchestrator to stop all active mailboxes.
857
func (mo *mailOrchestrator) Stop() {
312✔
858
        for _, mailbox := range mo.mailboxes {
615✔
859
                mailbox.Stop()
303✔
860
        }
303✔
861
}
862

863
// GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
864
// creates and returns a new mailbox if none is found.
865
func (mo *mailOrchestrator) GetOrCreateMailBox(ctx context.Context,
866
        chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
742✔
867

742✔
868
        // First, try lookup the mailbox directly using only the shared mutex.
742✔
869
        mo.mu.RLock()
742✔
870
        mailbox, ok := mo.mailboxes[chanID]
742✔
871
        if ok {
1,159✔
872
                mo.mu.RUnlock()
417✔
873
                return mailbox
417✔
874
        }
417✔
875
        mo.mu.RUnlock()
327✔
876

327✔
877
        // Otherwise, we will try again with exclusive lock, creating a mailbox
327✔
878
        // if one still has not been created.
327✔
879
        mo.mu.Lock()
327✔
880
        mailbox = mo.exclusiveGetOrCreateMailBox(ctx, chanID, shortChanID)
327✔
881
        mo.mu.Unlock()
327✔
882

327✔
883
        return mailbox
327✔
884
}
885

886
// exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the
887
// given channel id. If none is found, a new one is creates, started, and
888
// recorded.
889
//
890
// NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
891
func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(ctx context.Context,
892
        chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
327✔
893

327✔
894
        mailbox, ok := mo.mailboxes[chanID]
327✔
895
        if !ok {
654✔
896
                mailbox = newMemoryMailBox(&mailBoxConfig{
327✔
897
                        shortChanID:       shortChanID,
327✔
898
                        forwardPackets:    mo.cfg.forwardPackets,
327✔
899
                        clock:             mo.cfg.clock,
327✔
900
                        expiry:            mo.cfg.expiry,
327✔
901
                        failMailboxUpdate: mo.cfg.failMailboxUpdate,
327✔
902
                })
327✔
903
                mailbox.Start(ctx)
327✔
904
                mo.mailboxes[chanID] = mailbox
327✔
905
        }
327✔
906

907
        return mailbox
327✔
908
}
909

910
// BindLiveShortChanID registers that messages bound for a particular short
911
// channel id should be forwarded to the mailbox corresponding to the given
912
// channel id. This method also checks to see if there are any unclaimed
913
// packets for this short_chan_id. If any are found, they are delivered to the
914
// mailbox and removed (marked as claimed).
915
func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox,
916
        cid lnwire.ChannelID, sid lnwire.ShortChannelID) {
338✔
917

338✔
918
        mo.mu.Lock()
338✔
919
        // Update the mapping from short channel id to mailbox's channel id.
338✔
920
        mo.liveIndex[sid] = cid
338✔
921

338✔
922
        // Retrieve any unclaimed packets destined for this mailbox.
338✔
923
        pkts := mo.unclaimedPackets[sid]
338✔
924
        delete(mo.unclaimedPackets, sid)
338✔
925
        mo.mu.Unlock()
338✔
926

338✔
927
        // Deliver the unclaimed packets.
338✔
928
        for _, pkt := range pkts {
345✔
929
                mailbox.AddPacket(pkt)
7✔
930
        }
7✔
931
}
932

933
// Deliver lookups the target mailbox using the live index from short_chan_id
934
// to channel_id. If the mailbox is found, the message is delivered directly.
935
// Otherwise the packet is recorded as unclaimed, and will be delivered to the
936
// mailbox upon the subsequent call to BindLiveShortChanID.
937
func (mo *mailOrchestrator) Deliver(ctx context.Context,
938
        sid lnwire.ShortChannelID, pkt *htlcPacket) error {
91✔
939

91✔
940
        var (
91✔
941
                mailbox MailBox
91✔
942
                found   bool
91✔
943
        )
91✔
944

91✔
945
        // First, try to find the channel id for the target short_chan_id. If
91✔
946
        // the link is live, we will also look up the created mailbox.
91✔
947
        mo.mu.RLock()
91✔
948
        chanID, isLive := mo.liveIndex[sid]
91✔
949
        if isLive {
177✔
950
                mailbox, found = mo.mailboxes[chanID]
86✔
951
        }
86✔
952
        mo.mu.RUnlock()
91✔
953

91✔
954
        // The link is live and target mailbox was found, deliver immediately.
91✔
955
        if isLive && found {
177✔
956
                return mailbox.AddPacket(pkt)
86✔
957
        }
86✔
958

959
        // If we detected that the link has not been made live, we will acquire
960
        // the exclusive lock preemptively in order to queue this packet in the
961
        // list of unclaimed packets.
962
        mo.mu.Lock()
7✔
963

7✔
964
        // Double check to see if the mailbox has been not made live since the
7✔
965
        // release of the shared lock.
7✔
966
        //
7✔
967
        // NOTE: Checking again with the exclusive lock held prevents a race
7✔
968
        // condition where BindLiveShortChanID is interleaved between the
7✔
969
        // release of the shared lock, and acquiring the exclusive lock. The
7✔
970
        // result would be stuck packets, as they wouldn't be redelivered until
7✔
971
        // the next call to BindLiveShortChanID, which is expected to occur
7✔
972
        // infrequently.
7✔
973
        chanID, isLive = mo.liveIndex[sid]
7✔
974
        if isLive {
7✔
975
                // Reaching this point indicates the mailbox is actually live.
×
976
                // We'll try to load the mailbox using the fresh channel id.
×
977
                //
×
978
                // NOTE: This should never create a new mailbox, as the live
×
979
                // index should only be set if the mailbox had been initialized
×
980
                // beforehand.  However, this does ensure that this case is
×
981
                // handled properly in the event that it could happen.
×
NEW
982
                mailbox = mo.exclusiveGetOrCreateMailBox(ctx, chanID, sid)
×
983
                mo.mu.Unlock()
×
984

×
985
                // Deliver the packet to the mailbox if it was found or created.
×
986
                return mailbox.AddPacket(pkt)
×
987
        }
×
988

989
        // Finally, if the channel id is still not found in the live index,
990
        // we'll add this to the list of unclaimed packets. These will be
991
        // delivered upon the next call to BindLiveShortChanID.
992
        mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt)
7✔
993
        mo.mu.Unlock()
7✔
994

7✔
995
        return nil
7✔
996
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc