• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14350940652

09 Apr 2025 06:57AM UTC coverage: 58.625%. First build
14350940652

Pull #9691

github

web-flow
Merge d50df36db into ac052988c
Pull Request #9691: htlcswitch+peer: thread context through in preparation for passing to graph DB calls

187 of 266 new or added lines in 15 files covered. (70.3%)

97184 of 165773 relevant lines covered (58.62%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.01
/htlcswitch/mailbox.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "context"
7
        "errors"
8
        "fmt"
9
        "sync"
10
        "time"
11

12
        "github.com/lightningnetwork/lnd/clock"
13
        "github.com/lightningnetwork/lnd/fn/v2"
14
        "github.com/lightningnetwork/lnd/lntypes"
15
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
)
18

19
var (
20
        // ErrMailBoxShuttingDown is returned when the mailbox is interrupted by
21
        // a shutdown request.
22
        ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")
23

24
        // ErrPacketAlreadyExists signals that an attempt to add a packet failed
25
        // because it already exists in the mailbox.
26
        ErrPacketAlreadyExists = errors.New("mailbox already has packet")
27
)
28

29
// MailBox is an interface which represents a concurrent-safe, in-order
30
// delivery queue for messages from the network and also from the main switch.
31
// This struct serves as a buffer between incoming messages, and messages to
32
// be handled by the link. Each of the mutating methods within this interface
33
// should be implemented in a non-blocking manner.
34
type MailBox interface {
35
        // AddMessage appends a new message to the end of the message queue.
36
        AddMessage(msg lnwire.Message) error
37

38
        // AddPacket appends a new packet to the end of the packet queue.
39
        AddPacket(pkt *htlcPacket) error
40

41
        // HasPacket queries the packets for a circuit key, this is used to drop
42
        // packets bound for the switch that already have a queued response.
43
        HasPacket(CircuitKey) bool
44

45
        // AckPacket removes a packet from the mailbox's in-memory replay
46
        // buffer. This will prevent a packet from being delivered after a link
47
        // restarts if the switch has remained online. The returned boolean
48
        // indicates whether or not a packet with the passed incoming circuit
49
        // key was removed.
50
        AckPacket(CircuitKey) bool
51

52
        // FailAdd fails an UpdateAddHTLC that exists within the mailbox,
53
        // removing it from the in-memory replay buffer. This will prevent the
54
        // packet from being delivered after the link restarts if the switch has
55
        // remained online. The generated LinkError will show an
56
        // OutgoingFailureDownstreamHtlcAdd FailureDetail.
57
        FailAdd(ctx context.Context, pkt *htlcPacket)
58

59
        // MessageOutBox returns a channel that any new messages ready for
60
        // delivery will be sent on.
61
        MessageOutBox() chan lnwire.Message
62

63
        // PacketOutBox returns a channel that any new packets ready for
64
        // delivery will be sent on.
65
        PacketOutBox() chan *htlcPacket
66

67
        // ResetMessages clears any pending wire messages from the inbox.
68
        ResetMessages() error
69

70
        // ResetPackets resets the packet head to point at the first element in
        // the list.
71
        ResetPackets() error
72

73
        // SetDustClosure takes in a closure that is used to evaluate whether
74
        // mailbox HTLC's are dust.
75
        SetDustClosure(isDust dustClosure)
76

77
        // SetFeeRate sets the feerate to be used when evaluating dust.
78
        SetFeeRate(feerate chainfee.SatPerKWeight)
79

80
        // DustPackets returns the dust sum for Adds in the mailbox for the
81
        // local and remote commitments.
82
        DustPackets() (lnwire.MilliSatoshi, lnwire.MilliSatoshi)
83

84
        // Start starts the mailbox and any goroutines it needs to operate
85
        // properly.
86
        Start(ctx context.Context)
87

88
        // Stop signals the mailbox and its goroutines for a graceful shutdown.
89
        Stop()
90
}
91

92
type mailBoxConfig struct {
93
        // shortChanID is the short channel id of the channel this mailbox
94
        // belongs to.
95
        shortChanID lnwire.ShortChannelID
96

97
        // forwardPackets sends a variadic number of htlcPackets to the switch to
98
        // be routed. A quit channel should be provided so that the call can
99
        // properly exit during shutdown.
100
        forwardPackets func(context.Context, <-chan struct{},
101
                ...*htlcPacket) error
102

103
        // clock is a time source for the mailbox.
104
        clock clock.Clock
105

106
        // expiry is the interval after which Adds will be cancelled if they
107
        // have not yet been delivered. The computed deadline will expire
108
        // this long after the Adds are added via AddPacket.
109
        expiry time.Duration
110

111
        // failMailboxUpdate is used to fail an expired HTLC and use the
112
        // correct SCID if the underlying channel uses aliases.
113
        failMailboxUpdate func(ctx context.Context, outScid,
114
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
115
}
116

117
// memoryMailBox is an implementation of the MailBox interface backed by purely
118
// in-memory queues.
119
//
120
// TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts.
121
type memoryMailBox struct {
122
        started sync.Once
123
        stopped sync.Once
124

125
        cfg *mailBoxConfig
126

127
        wireMessages *list.List
128
        wireMtx      sync.Mutex
129
        wireCond     *sync.Cond
130

131
        messageOutbox chan lnwire.Message
132
        msgReset      chan chan struct{}
133

134
        // repPkts is a queue for reply packets, e.g. Settles and Fails.
135
        repPkts  *list.List
136
        repIndex map[CircuitKey]*list.Element
137
        repHead  *list.Element
138

139
        // addPkts is a dedicated queue for Adds.
140
        addPkts  *list.List
141
        addIndex map[CircuitKey]*list.Element
142
        addHead  *list.Element
143

144
        pktMtx  sync.Mutex
145
        pktCond *sync.Cond
146

147
        pktOutbox chan *htlcPacket
148
        pktReset  chan chan struct{}
149

150
        wireShutdown chan struct{}
151
        pktShutdown  chan struct{}
152

153
        cancel fn.Option[context.CancelFunc]
154
        quit   chan struct{}
155

156
        // feeRate is set when the link receives or sends out fee updates. It
157
        // is refreshed when AttachMailBox is called in case a fee update did
158
        // not get committed. In some cases it may be out of sync with the
159
        // channel's feerate, but it should eventually get back in sync.
160
        feeRate chainfee.SatPerKWeight
161

162
        // isDust is set when AttachMailBox is called and serves to evaluate
163
        // the outstanding dust in the memoryMailBox given the current set
164
        // feeRate.
165
        isDust dustClosure
166
}
167

168
// newMemoryMailBox creates a new instance of the memoryMailBox.
169
func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
3✔
170
        box := &memoryMailBox{
3✔
171
                cfg:           cfg,
3✔
172
                wireMessages:  list.New(),
3✔
173
                repPkts:       list.New(),
3✔
174
                addPkts:       list.New(),
3✔
175
                messageOutbox: make(chan lnwire.Message),
3✔
176
                pktOutbox:     make(chan *htlcPacket),
3✔
177
                msgReset:      make(chan chan struct{}, 1),
3✔
178
                pktReset:      make(chan chan struct{}, 1),
3✔
179
                repIndex:      make(map[CircuitKey]*list.Element),
3✔
180
                addIndex:      make(map[CircuitKey]*list.Element),
3✔
181
                wireShutdown:  make(chan struct{}),
3✔
182
                pktShutdown:   make(chan struct{}),
3✔
183
                quit:          make(chan struct{}),
3✔
184
        }
3✔
185
        box.wireCond = sync.NewCond(&box.wireMtx)
3✔
186
        box.pktCond = sync.NewCond(&box.pktMtx)
3✔
187

3✔
188
        return box
3✔
189
}
3✔
190

191
// A compile time assertion to ensure that memoryMailBox meets the MailBox
192
// interface.
193
var _ MailBox = (*memoryMailBox)(nil)
194

195
// courierType is an enum that reflects the distinct types of messages a
196
// MailBox can handle. Each type will be placed in an isolated mail box and
197
// will have a dedicated goroutine for delivering the messages.
198
type courierType uint8
199

200
const (
201
        // wireCourier is a type of courier that handles wire messages.
202
        wireCourier courierType = iota
203

204
        // pktCourier is a type of courier that handles htlc packets.
205
        pktCourier
206
)
207

208
// Start starts the mailbox and any goroutines it needs to operate properly.
209
//
210
// NOTE: This method is part of the MailBox interface.
211
func (m *memoryMailBox) Start(ctx context.Context) {
3✔
212
        m.started.Do(func() {
6✔
213
                ctx, cancel := context.WithCancel(ctx)
3✔
214
                m.cancel = fn.Some(cancel)
3✔
215

3✔
216
                go m.wireMailCourier()
3✔
217
                go m.pktMailCourier(ctx)
3✔
218
        })
3✔
219
}
220

221
// ResetMessages blocks until all buffered wire messages are cleared.
222
func (m *memoryMailBox) ResetMessages() error {
3✔
223
        msgDone := make(chan struct{})
3✔
224
        select {
3✔
225
        case m.msgReset <- msgDone:
3✔
226
                return m.signalUntilReset(wireCourier, msgDone)
3✔
227
        case <-m.quit:
×
228
                return ErrMailBoxShuttingDown
×
229
        }
230
}
231

232
// ResetPackets blocks until the head of packets buffer is reset, causing the
233
// packets to be redelivered in order.
234
func (m *memoryMailBox) ResetPackets() error {
3✔
235
        pktDone := make(chan struct{})
3✔
236
        select {
3✔
237
        case m.pktReset <- pktDone:
3✔
238
                return m.signalUntilReset(pktCourier, pktDone)
3✔
239
        case <-m.quit:
×
240
                return ErrMailBoxShuttingDown
×
241
        }
242
}
243

244
// signalUntilReset strobes the condition variable for the specified inbox type
245
// until receiving a response that the mailbox has processed a reset.
246
func (m *memoryMailBox) signalUntilReset(cType courierType,
247
        done chan struct{}) error {
3✔
248

3✔
249
        for {
6✔
250

3✔
251
                switch cType {
3✔
252
                case wireCourier:
3✔
253
                        m.wireCond.Signal()
3✔
254
                case pktCourier:
3✔
255
                        m.pktCond.Signal()
3✔
256
                }
257

258
                select {
3✔
259
                case <-time.After(time.Millisecond):
3✔
260
                        continue
3✔
261
                case <-done:
3✔
262
                        return nil
3✔
263
                case <-m.quit:
×
264
                        return ErrMailBoxShuttingDown
×
265
                }
266
        }
267
}
268

269
// AckPacket removes the packet identified by its incoming circuit key from the
270
// queue of packets to be delivered. The returned boolean indicates whether or
271
// not a packet with the passed incoming circuit key was removed.
272
//
273
// NOTE: It is safe to call this method multiple times for the same circuit key.
274
func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
3✔
275
        m.pktCond.L.Lock()
3✔
276
        defer m.pktCond.L.Unlock()
3✔
277

3✔
278
        if entry, ok := m.repIndex[inKey]; ok {
6✔
279
                // Check whether we are removing the head of the queue. If so,
3✔
280
                // we must advance the head to the next packet before removing.
3✔
281
                // It's possible that the courier has already advanced the
3✔
282
                // repHead, so this check prevents the repHead from getting
3✔
283
                // desynchronized.
3✔
284
                if entry == m.repHead {
3✔
285
                        m.repHead = entry.Next()
×
286
                }
×
287
                m.repPkts.Remove(entry)
3✔
288
                delete(m.repIndex, inKey)
3✔
289

3✔
290
                return true
3✔
291
        }
292

293
        if entry, ok := m.addIndex[inKey]; ok {
6✔
294
                // Check whether we are removing the head of the queue. If so,
3✔
295
                // we must advance the head to the next add before removing.
3✔
296
                // It's possible that the courier has already advanced the
3✔
297
                // addHead, so this check prevents the addHead from getting
3✔
298
                // desynchronized.
3✔
299
                //
3✔
300
                // NOTE: While this event is rare for Settles or Fails, it could
3✔
301
                // be very common for Adds since the mailbox has the ability to
3✔
302
                // cancel Adds before they are delivered. When that occurs, the
3✔
303
                // head of addPkts has only been peeked and we expect to be
3✔
304
                // removing the head of the queue.
3✔
305
                if entry == m.addHead {
5✔
306
                        m.addHead = entry.Next()
2✔
307
                }
2✔
308

309
                m.addPkts.Remove(entry)
3✔
310
                delete(m.addIndex, inKey)
3✔
311

3✔
312
                return true
3✔
313
        }
314

315
        return false
×
316
}
317

318
// HasPacket queries the packets for a circuit key, this is used to drop packets
319
// bound for the switch that already have a queued response.
320
func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
3✔
321
        m.pktCond.L.Lock()
3✔
322
        _, ok := m.repIndex[inKey]
3✔
323
        m.pktCond.L.Unlock()
3✔
324

3✔
325
        return ok
3✔
326
}
3✔
327

328
// Stop signals the mailbox and its goroutines for a graceful shutdown.
329
//
330
// NOTE: This method is part of the MailBox interface.
331
func (m *memoryMailBox) Stop() {
3✔
332
        m.stopped.Do(func() {
6✔
333
                m.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
6✔
334
                close(m.quit)
3✔
335

3✔
336
                m.signalUntilShutdown(wireCourier)
3✔
337
                m.signalUntilShutdown(pktCourier)
3✔
338
        })
339
}
340

341
// signalUntilShutdown strobes the condition variable of the passed courier
342
// type, blocking until the worker has exited.
343
func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
3✔
344
        var (
3✔
345
                cond     *sync.Cond
3✔
346
                shutdown chan struct{}
3✔
347
        )
3✔
348

3✔
349
        switch cType {
3✔
350
        case wireCourier:
3✔
351
                cond = m.wireCond
3✔
352
                shutdown = m.wireShutdown
3✔
353
        case pktCourier:
3✔
354
                cond = m.pktCond
3✔
355
                shutdown = m.pktShutdown
3✔
356
        }
357

358
        for {
6✔
359
                select {
3✔
360
                case <-time.After(time.Millisecond):
3✔
361
                        cond.Signal()
3✔
362
                case <-shutdown:
3✔
363
                        return
3✔
364
                }
365
        }
366
}
367

368
// pktWithExpiry wraps an incoming packet and records the time at which it
369
// should be canceled from the mailbox. This will be used to detect if it gets
370
// stuck in the mailbox and inform when to cancel back.
371
type pktWithExpiry struct {
372
        pkt    *htlcPacket
373
        expiry time.Time
374
}
375

376
// deadline returns the channel produced by the passed clock's TickAfter, armed
// with the duration remaining between the clock's current time and the
// packet's recorded expiry.
//
// NOTE: Within the body, clock refers to the parameter, which shadows the
// imported clock package.
func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
3✔
377
        return clock.TickAfter(p.expiry.Sub(clock.Now()))
3✔
378
}
3✔
379

380
// wireMailCourier is a dedicated goroutine whose job is to reliably deliver
381
// wire messages.
382
func (m *memoryMailBox) wireMailCourier() {
3✔
383
        defer close(m.wireShutdown)
3✔
384

3✔
385
        for {
6✔
386
                // First, we'll check our condition. If our mailbox is empty,
3✔
387
                // then we'll wait until a new item is added.
3✔
388
                m.wireCond.L.Lock()
3✔
389
                for m.wireMessages.Front() == nil {
6✔
390
                        m.wireCond.Wait()
3✔
391

3✔
392
                        select {
3✔
393
                        case msgDone := <-m.msgReset:
3✔
394
                                m.wireMessages.Init()
3✔
395
                                close(msgDone)
3✔
396
                        case <-m.quit:
3✔
397
                                m.wireCond.L.Unlock()
3✔
398
                                return
3✔
399
                        default:
3✔
400
                        }
401
                }
402

403
                // Grab the datum off the front of the queue, shifting the
404
                // slice's reference down one in order to remove the datum from
405
                // the queue.
406
                entry := m.wireMessages.Front()
3✔
407

3✔
408
                //nolint:forcetypeassert
3✔
409
                nextMsg := m.wireMessages.Remove(entry).(lnwire.Message)
3✔
410

3✔
411
                // Now that we're done with the condition, we can unlock it to
3✔
412
                // allow any callers to append to the end of our target queue.
3✔
413
                m.wireCond.L.Unlock()
3✔
414

3✔
415
                // With the next message obtained, we'll now select to attempt
3✔
416
                // to deliver the message. If we receive a kill signal, then
3✔
417
                // we'll bail out.
3✔
418
                select {
3✔
419
                case m.messageOutbox <- nextMsg:
3✔
420
                case msgDone := <-m.msgReset:
×
421
                        m.wireCond.L.Lock()
×
422
                        m.wireMessages.Init()
×
423
                        m.wireCond.L.Unlock()
×
424

×
425
                        close(msgDone)
×
426
                case <-m.quit:
×
427
                        return
×
428
                }
429
        }
430
}
431

432
// pktMailCourier is a dedicated goroutine whose job is to reliably deliver
433
// packet messages.
434
func (m *memoryMailBox) pktMailCourier(ctx context.Context) {
3✔
435
        defer close(m.pktShutdown)
3✔
436

3✔
437
        for {
6✔
438
                // First, we'll check our condition. If our mailbox is empty,
3✔
439
                // then we'll wait until a new item is added.
3✔
440
                m.pktCond.L.Lock()
3✔
441
                for m.repHead == nil && m.addHead == nil {
6✔
442
                        m.pktCond.Wait()
3✔
443

3✔
444
                        select {
3✔
445
                        // Resetting the packet queue means just moving our
446
                        // pointer to the front. This ensures that any un-ACK'd
447
                        // messages are re-delivered upon reconnect.
448
                        case pktDone := <-m.pktReset:
3✔
449
                                m.repHead = m.repPkts.Front()
3✔
450
                                m.addHead = m.addPkts.Front()
3✔
451

3✔
452
                                close(pktDone)
3✔
453

454
                        case <-m.quit:
3✔
455
                                m.pktCond.L.Unlock()
3✔
456
                                return
3✔
457
                        default:
3✔
458
                        }
459
                }
460

461
                var (
3✔
462
                        nextRep   *htlcPacket
3✔
463
                        nextRepEl *list.Element
3✔
464
                        nextAdd   *pktWithExpiry
3✔
465
                        nextAddEl *list.Element
3✔
466
                )
3✔
467
                // For packets, we actually never remove an item until it has
3✔
468
                // been ACK'd by the link. This ensures that if a read packet
3✔
469
                // doesn't make it into a commitment, then it'll be
3✔
470
                // re-delivered once the link comes back online.
3✔
471

3✔
472
                // Peek at the head of the Settle/Fails and Add queues. We peek
3✔
473
                // both even if there is a Settle/Fail present because we need
3✔
474
                // to set a deadline for the next pending Add if it's present.
3✔
475
                // Due to clock monotonicity, we know that the head of the Adds
3✔
476
                // is the next to expire.
3✔
477
                if m.repHead != nil {
6✔
478
                        //nolint:forcetypeassert
3✔
479
                        nextRep = m.repHead.Value.(*htlcPacket)
3✔
480
                        nextRepEl = m.repHead
3✔
481
                }
3✔
482
                if m.addHead != nil {
6✔
483
                        //nolint:forcetypeassert
3✔
484
                        nextAdd = m.addHead.Value.(*pktWithExpiry)
3✔
485
                        nextAddEl = m.addHead
3✔
486
                }
3✔
487

488
                // Now that we're done with the condition, we can unlock it to
489
                // allow any callers to append to the end of our target queue.
490
                m.pktCond.L.Unlock()
3✔
491

3✔
492
                var (
3✔
493
                        pktOutbox chan *htlcPacket
3✔
494
                        addOutbox chan *htlcPacket
3✔
495
                        add       *htlcPacket
3✔
496
                        deadline  <-chan time.Time
3✔
497
                )
3✔
498

3✔
499
                // Prioritize delivery of Settle/Fail packets over Adds. This
3✔
500
                // ensures that we actively clear the commitment of existing
3✔
501
                // HTLCs before trying to add new ones. This can help to improve
3✔
502
                // forwarding performance since the time to sign a commitment is
3✔
503
                // linear in the number of HTLCs manifested on the commitments.
3✔
504
                //
3✔
505
                // NOTE: Both types are eventually delivered over the same
3✔
506
                // channel, but we can control which is delivered by exclusively
3✔
507
                // making one nil and the other non-nil. We know from our loop
3✔
508
                // condition that at least one of nextRep and nextAdd is non-nil.
3✔
509
                if nextRep != nil {
6✔
510
                        pktOutbox = m.pktOutbox
3✔
511
                } else {
6✔
512
                        addOutbox = m.pktOutbox
3✔
513
                }
3✔
514

515
                // If we have a pending Add, we'll also construct the deadline
516
                // so we can fail it back if we are unable to deliver any
517
                // message in time. We also dereference the nextAdd's packet,
518
                // since we will need access to it in the case we are delivering
519
                // it and/or if the deadline expires.
520
                //
521
                // NOTE: It's possible after this point for add to be nil, but
522
                // this can only occur when addOutbox is also nil, hence we
523
                // won't accidentally deliver a nil packet.
524
                if nextAdd != nil {
6✔
525
                        add = nextAdd.pkt
3✔
526
                        deadline = nextAdd.deadline(m.cfg.clock)
3✔
527
                }
3✔
528

529
                select {
3✔
530
                case pktOutbox <- nextRep:
3✔
531
                        m.pktCond.L.Lock()
3✔
532
                        // Only advance the repHead if this Settle or Fail is
3✔
533
                        // still at the head of the queue.
3✔
534
                        if m.repHead != nil && m.repHead == nextRepEl {
6✔
535
                                m.repHead = m.repHead.Next()
3✔
536
                        }
3✔
537
                        m.pktCond.L.Unlock()
3✔
538

539
                case addOutbox <- add:
3✔
540
                        m.pktCond.L.Lock()
3✔
541
                        // Only advance the addHead if this Add is still at the
3✔
542
                        // head of the queue.
3✔
543
                        if m.addHead != nil && m.addHead == nextAddEl {
6✔
544
                                m.addHead = m.addHead.Next()
3✔
545
                        }
3✔
546
                        m.pktCond.L.Unlock()
3✔
547

548
                case <-deadline:
×
549
                        log.Debugf("Expiring add htlc with "+
×
550
                                "keystone=%v", add.keystone())
×
NEW
551
                        m.FailAdd(ctx, add)
×
552

553
                case pktDone := <-m.pktReset:
3✔
554
                        m.pktCond.L.Lock()
3✔
555
                        m.repHead = m.repPkts.Front()
3✔
556
                        m.addHead = m.addPkts.Front()
3✔
557
                        m.pktCond.L.Unlock()
3✔
558

3✔
559
                        close(pktDone)
3✔
560

561
                case <-m.quit:
3✔
562
                        return
3✔
563
                }
564
        }
565
}
566

567
// AddMessage appends a new message to the end of the message queue.
568
//
569
// NOTE: This method is safe for concurrent use and part of the MailBox
570
// interface.
571
func (m *memoryMailBox) AddMessage(msg lnwire.Message) error {
3✔
572
        // First, we'll lock the condition, and add the message to the end of
3✔
573
        // the wire message inbox.
3✔
574
        m.wireCond.L.Lock()
3✔
575
        m.wireMessages.PushBack(msg)
3✔
576
        m.wireCond.L.Unlock()
3✔
577

3✔
578
        // With the message added, we signal to the mailCourier that there are
3✔
579
        // additional messages to deliver.
3✔
580
        m.wireCond.Signal()
3✔
581

3✔
582
        return nil
3✔
583
}
3✔
584

585
// AddPacket appends a new packet to the end of the packet queue.
586
//
587
// NOTE: This method is safe for concurrent use and part of the MailBox
588
// interface.
589
func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
3✔
590
        m.pktCond.L.Lock()
3✔
591
        switch htlc := pkt.htlc.(type) {
3✔
592
        // Split off Settle/Fail packets into the repPkts queue.
593
        case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
3✔
594
                if _, ok := m.repIndex[pkt.inKey()]; ok {
3✔
595
                        m.pktCond.L.Unlock()
×
596
                        return ErrPacketAlreadyExists
×
597
                }
×
598

599
                entry := m.repPkts.PushBack(pkt)
3✔
600
                m.repIndex[pkt.inKey()] = entry
3✔
601
                if m.repHead == nil {
6✔
602
                        m.repHead = entry
3✔
603
                }
3✔
604

605
        // Split off Add packets into the addPkts queue.
606
        case *lnwire.UpdateAddHTLC:
3✔
607
                if _, ok := m.addIndex[pkt.inKey()]; ok {
3✔
608
                        m.pktCond.L.Unlock()
×
609
                        return ErrPacketAlreadyExists
×
610
                }
×
611

612
                entry := m.addPkts.PushBack(&pktWithExpiry{
3✔
613
                        pkt:    pkt,
3✔
614
                        expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
3✔
615
                })
3✔
616
                m.addIndex[pkt.inKey()] = entry
3✔
617
                if m.addHead == nil {
6✔
618
                        m.addHead = entry
3✔
619
                }
3✔
620

621
        default:
×
622
                m.pktCond.L.Unlock()
×
623
                return fmt.Errorf("unknown htlc type: %T", htlc)
×
624
        }
625
        m.pktCond.L.Unlock()
3✔
626

3✔
627
        // With the packet added, we signal to the mailCourier that there are
3✔
628
        // additional packets to consume.
3✔
629
        m.pktCond.Signal()
3✔
630

3✔
631
        return nil
3✔
632
}
633

634
// SetFeeRate sets the memoryMailBox's feerate for use in DustPackets.
635
func (m *memoryMailBox) SetFeeRate(feeRate chainfee.SatPerKWeight) {
3✔
636
        m.pktCond.L.Lock()
3✔
637
        defer m.pktCond.L.Unlock()
3✔
638

3✔
639
        m.feeRate = feeRate
3✔
640
}
3✔
641

642
// SetDustClosure sets the memoryMailBox's dustClosure for use in DustPackets.
643
func (m *memoryMailBox) SetDustClosure(isDust dustClosure) {
3✔
644
        m.pktCond.L.Lock()
3✔
645
        defer m.pktCond.L.Unlock()
3✔
646

3✔
647
        m.isDust = isDust
3✔
648
}
3✔
649

650
// DustPackets returns the dust sum for add packets in the mailbox. The first
651
// return value is the local dust sum and the second is the remote dust sum.
652
// This will keep track of a given dust HTLC from the time it is added via
653
// AddPacket until it is removed via AckPacket.
654
func (m *memoryMailBox) DustPackets() (lnwire.MilliSatoshi,
655
        lnwire.MilliSatoshi) {
3✔
656

3✔
657
        m.pktCond.L.Lock()
3✔
658
        defer m.pktCond.L.Unlock()
3✔
659

3✔
660
        var (
3✔
661
                localDustSum  lnwire.MilliSatoshi
3✔
662
                remoteDustSum lnwire.MilliSatoshi
3✔
663
        )
3✔
664

3✔
665
        // Run through the map of HTLC's and determine the dust sum with calls
3✔
666
        // to the memoryMailBox's isDust closure. Note that all mailbox packets
3✔
667
        // are outgoing so the second argument to isDust will be false.
3✔
668
        for _, e := range m.addIndex {
6✔
669
                addPkt := e.Value.(*pktWithExpiry).pkt
3✔
670

3✔
671
                // Evaluate whether this HTLC is dust on the local commitment.
3✔
672
                if m.isDust(
3✔
673
                        m.feeRate, false, lntypes.Local,
3✔
674
                        addPkt.amount.ToSatoshis(),
3✔
675
                ) {
6✔
676

3✔
677
                        localDustSum += addPkt.amount
3✔
678
                }
3✔
679

680
                // Evaluate whether this HTLC is dust on the remote commitment.
681
                if m.isDust(
3✔
682
                        m.feeRate, false, lntypes.Remote,
3✔
683
                        addPkt.amount.ToSatoshis(),
3✔
684
                ) {
6✔
685

3✔
686
                        remoteDustSum += addPkt.amount
3✔
687
                }
3✔
688
        }
689

690
        return localDustSum, remoteDustSum
3✔
691
}
692

693
// FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it
694
// from the in-memory replay buffer. This will prevent the packet from being
695
// delivered after the link restarts if the switch has remained online. The
696
// generated LinkError will show an OutgoingFailureDownstreamHtlcAdd
697
// FailureDetail.
698
func (m *memoryMailBox) FailAdd(ctx context.Context, pkt *htlcPacket) {
3✔
699
        // First, remove the packet from mailbox. If we didn't find the packet
3✔
700
        // because it has already been acked, we'll exit early to avoid sending
3✔
701
        // a duplicate fail message through the switch.
3✔
702
        if !m.AckPacket(pkt.inKey()) {
3✔
703
                return
×
704
        }
×
705

706
        var (
3✔
707
                localFailure = false
3✔
708
                reason       lnwire.OpaqueReason
3✔
709
        )
3✔
710

3✔
711
        // Create a temporary channel failure which we will send back to our
3✔
712
        // peer if this is a forward, or report to the user if the failed
3✔
713
        // payment was locally initiated.
3✔
714
        failure := m.cfg.failMailboxUpdate(
3✔
715
                ctx, pkt.originalOutgoingChanID, m.cfg.shortChanID,
3✔
716
        )
3✔
717

3✔
718
        // If the payment was locally initiated (which is indicated by a nil
3✔
719
        // obfuscator), we do not need to encrypt it back to the sender.
3✔
720
        if pkt.obfuscator == nil {
6✔
721
                var b bytes.Buffer
3✔
722
                err := lnwire.EncodeFailure(&b, failure, 0)
3✔
723
                if err != nil {
3✔
724
                        log.Errorf("Unable to encode failure: %v", err)
×
725
                        return
×
726
                }
×
727
                reason = lnwire.OpaqueReason(b.Bytes())
3✔
728
                localFailure = true
3✔
729
        } else {
×
730
                // If the packet is part of a forward, (identified by a non-nil
×
731
                // obfuscator) we need to encrypt the error back to the source.
×
732
                var err error
×
733
                reason, err = pkt.obfuscator.EncryptFirstHop(failure)
×
734
                if err != nil {
×
735
                        log.Errorf("Unable to obfuscate error: %v", err)
×
736
                        return
×
737
                }
×
738
        }
739

740
        // Create a link error containing the temporary channel failure and a
741
        // detail which indicates the we failed to add the htlc.
742
        linkError := NewDetailedLinkError(
3✔
743
                failure, OutgoingFailureDownstreamHtlcAdd,
3✔
744
        )
3✔
745

3✔
746
        failPkt := &htlcPacket{
3✔
747
                incomingChanID: pkt.incomingChanID,
3✔
748
                incomingHTLCID: pkt.incomingHTLCID,
3✔
749
                circuit:        pkt.circuit,
3✔
750
                sourceRef:      pkt.sourceRef,
3✔
751
                hasSource:      true,
3✔
752
                localFailure:   localFailure,
3✔
753
                obfuscator:     pkt.obfuscator,
3✔
754
                linkFailure:    linkError,
3✔
755
                htlc: &lnwire.UpdateFailHTLC{
3✔
756
                        Reason: reason,
3✔
757
                },
3✔
758
        }
3✔
759

3✔
760
        if err := m.cfg.forwardPackets(ctx, m.quit, failPkt); err != nil {
3✔
761
                log.Errorf("Unhandled error while reforwarding packets "+
×
762
                        "settle/fail over htlcswitch: %v", err)
×
763
        }
×
764
}
765

766
// MessageOutBox returns a channel that any new messages ready for delivery
767
// will be sent on.
768
//
769
// NOTE: This method is part of the MailBox interface.
770
func (m *memoryMailBox) MessageOutBox() chan lnwire.Message {
3✔
771
        return m.messageOutbox
3✔
772
}
3✔
773

774
// PacketOutBox returns a channel that any new packets ready for delivery will
775
// be sent on.
776
//
777
// NOTE: This method is part of the MailBox interface.
778
func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
3✔
779
        return m.pktOutbox
3✔
780
}
3✔
781

782
// mailOrchestrator is responsible for coordinating the creation and lifecycle
783
// of mailboxes used within the switch. It supports the ability to create
784
// mailboxes, reassign their short channel id's, deliver htlc packets, and
785
// queue packets for mailboxes that have not been created due to a link's late
786
// registration.
787
type mailOrchestrator struct {
788
        mu sync.RWMutex
789

790
        cfg *mailOrchConfig
791

792
        // mailboxes caches exactly one mailbox for all known channels.
793
        mailboxes map[lnwire.ChannelID]MailBox
794

795
        // liveIndex maps a live short chan id to the primary mailbox key.
796
        // An index in liveIndex map is only entered under two conditions:
797
        //   1. A link has a non-zero short channel id at time of AddLink.
798
        //   2. A link receives a non-zero short channel via UpdateShortChanID.
799
        liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID
800

801
        // TODO(conner): add another pair of indexes:
802
        //   chan_id -> short_chan_id
803
        //   short_chan_id -> mailbox
804
        // so that Deliver can lookup mailbox directly once live,
805
        // but still queryable by channel_id.
806

807
        // unclaimedPackets maps a live short chan id to queue of packets if no
808
        // mailbox has been created.
809
        unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
810
}
811

812
type mailOrchConfig struct {
813
        // forwardPackets send a varidic number of htlcPackets to the switch to
814
        // be routed. A quit channel should be provided so that the call can
815
        // properly exit during shutdown.
816
        forwardPackets func(context.Context, <-chan struct{},
817
                ...*htlcPacket) error
818

819
        // clock is a time source for the generated mailboxes.
820
        clock clock.Clock
821

822
        // expiry is the interval after which Adds will be cancelled if they
823
        // have not been yet been delivered. The computed deadline will expiry
824
        // this long after the Adds are added to a mailbox via AddPacket.
825
        expiry time.Duration
826

827
        // failMailboxUpdate is used to fail an expired HTLC and use the
828
        // correct SCID if the underlying channel uses aliases.
829
        failMailboxUpdate func(ctx context.Context, outScid,
830
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
831
}
832

833
// newMailOrchestrator initializes a fresh mailOrchestrator.
834
func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator {
3✔
835
        return &mailOrchestrator{
3✔
836
                cfg:              cfg,
3✔
837
                mailboxes:        make(map[lnwire.ChannelID]MailBox),
3✔
838
                liveIndex:        make(map[lnwire.ShortChannelID]lnwire.ChannelID),
3✔
839
                unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
3✔
840
        }
3✔
841
}
3✔
842

843
// Stop instructs the orchestrator to stop all active mailboxes.
844
func (mo *mailOrchestrator) Stop() {
3✔
845
        for _, mailbox := range mo.mailboxes {
6✔
846
                mailbox.Stop()
3✔
847
        }
3✔
848
}
849

850
// GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
851
// creates and returns a new mailbox if none is found.
852
func (mo *mailOrchestrator) GetOrCreateMailBox(ctx context.Context,
853
        chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
3✔
854

3✔
855
        // First, try lookup the mailbox directly using only the shared mutex.
3✔
856
        mo.mu.RLock()
3✔
857
        mailbox, ok := mo.mailboxes[chanID]
3✔
858
        if ok {
6✔
859
                mo.mu.RUnlock()
3✔
860
                return mailbox
3✔
861
        }
3✔
862
        mo.mu.RUnlock()
3✔
863

3✔
864
        // Otherwise, we will try again with exclusive lock, creating a mailbox
3✔
865
        // if one still has not been created.
3✔
866
        mo.mu.Lock()
3✔
867
        mailbox = mo.exclusiveGetOrCreateMailBox(ctx, chanID, shortChanID)
3✔
868
        mo.mu.Unlock()
3✔
869

3✔
870
        return mailbox
3✔
871
}
872

873
// exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the
874
// given channel id. If none is found, a new one is creates, started, and
875
// recorded.
876
//
877
// NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
878
func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(ctx context.Context,
879
        chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
3✔
880

3✔
881
        mailbox, ok := mo.mailboxes[chanID]
3✔
882
        if !ok {
6✔
883
                mailbox = newMemoryMailBox(&mailBoxConfig{
3✔
884
                        shortChanID:       shortChanID,
3✔
885
                        forwardPackets:    mo.cfg.forwardPackets,
3✔
886
                        clock:             mo.cfg.clock,
3✔
887
                        expiry:            mo.cfg.expiry,
3✔
888
                        failMailboxUpdate: mo.cfg.failMailboxUpdate,
3✔
889
                })
3✔
890
                mailbox.Start(ctx)
3✔
891
                mo.mailboxes[chanID] = mailbox
3✔
892
        }
3✔
893

894
        return mailbox
3✔
895
}
896

897
// BindLiveShortChanID registers that messages bound for a particular short
898
// channel id should be forwarded to the mailbox corresponding to the given
899
// channel id. This method also checks to see if there are any unclaimed
900
// packets for this short_chan_id. If any are found, they are delivered to the
901
// mailbox and removed (marked as claimed).
902
func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox,
903
        cid lnwire.ChannelID, sid lnwire.ShortChannelID) {
3✔
904

3✔
905
        mo.mu.Lock()
3✔
906
        // Update the mapping from short channel id to mailbox's channel id.
3✔
907
        mo.liveIndex[sid] = cid
3✔
908

3✔
909
        // Retrieve any unclaimed packets destined for this mailbox.
3✔
910
        pkts := mo.unclaimedPackets[sid]
3✔
911
        delete(mo.unclaimedPackets, sid)
3✔
912
        mo.mu.Unlock()
3✔
913

3✔
914
        // Deliver the unclaimed packets.
3✔
915
        for _, pkt := range pkts {
6✔
916
                mailbox.AddPacket(pkt)
3✔
917
        }
3✔
918
}
919

920
// Deliver lookups the target mailbox using the live index from short_chan_id
921
// to channel_id. If the mailbox is found, the message is delivered directly.
922
// Otherwise the packet is recorded as unclaimed, and will be delivered to the
923
// mailbox upon the subsequent call to BindLiveShortChanID.
924
func (mo *mailOrchestrator) Deliver(ctx context.Context,
925
        sid lnwire.ShortChannelID, pkt *htlcPacket) error {
3✔
926

3✔
927
        var (
3✔
928
                mailbox MailBox
3✔
929
                found   bool
3✔
930
        )
3✔
931

3✔
932
        // First, try to find the channel id for the target short_chan_id. If
3✔
933
        // the link is live, we will also look up the created mailbox.
3✔
934
        mo.mu.RLock()
3✔
935
        chanID, isLive := mo.liveIndex[sid]
3✔
936
        if isLive {
6✔
937
                mailbox, found = mo.mailboxes[chanID]
3✔
938
        }
3✔
939
        mo.mu.RUnlock()
3✔
940

3✔
941
        // The link is live and target mailbox was found, deliver immediately.
3✔
942
        if isLive && found {
6✔
943
                return mailbox.AddPacket(pkt)
3✔
944
        }
3✔
945

946
        // If we detected that the link has not been made live, we will acquire
947
        // the exclusive lock preemptively in order to queue this packet in the
948
        // list of unclaimed packets.
949
        mo.mu.Lock()
3✔
950

3✔
951
        // Double check to see if the mailbox has been not made live since the
3✔
952
        // release of the shared lock.
3✔
953
        //
3✔
954
        // NOTE: Checking again with the exclusive lock held prevents a race
3✔
955
        // condition where BindLiveShortChanID is interleaved between the
3✔
956
        // release of the shared lock, and acquiring the exclusive lock. The
3✔
957
        // result would be stuck packets, as they wouldn't be redelivered until
3✔
958
        // the next call to BindLiveShortChanID, which is expected to occur
3✔
959
        // infrequently.
3✔
960
        chanID, isLive = mo.liveIndex[sid]
3✔
961
        if isLive {
3✔
962
                // Reaching this point indicates the mailbox is actually live.
×
963
                // We'll try to load the mailbox using the fresh channel id.
×
964
                //
×
965
                // NOTE: This should never create a new mailbox, as the live
×
966
                // index should only be set if the mailbox had been initialized
×
967
                // beforehand.  However, this does ensure that this case is
×
968
                // handled properly in the event that it could happen.
×
NEW
969
                mailbox = mo.exclusiveGetOrCreateMailBox(ctx, chanID, sid)
×
970
                mo.mu.Unlock()
×
971

×
972
                // Deliver the packet to the mailbox if it was found or created.
×
973
                return mailbox.AddPacket(pkt)
×
974
        }
×
975

976
        // Finally, if the channel id is still not found in the live index,
977
        // we'll add this to the list of unclaimed packets. These will be
978
        // delivered upon the next call to BindLiveShortChanID.
979
        mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt)
3✔
980
        mo.mu.Unlock()
3✔
981

3✔
982
        return nil
3✔
983
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc