• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14445630877

14 Apr 2025 12:28PM UTC coverage: 58.622%. First build
14445630877

Pull #9699

github

web-flow
Merge 29a66f31c into 7438f2227
Pull Request #9699: htlcswitch+peer [2/2]: thread context through in preparation for passing to graph DB calls

142 of 208 new or added lines in 14 files covered. (68.27%)

97202 of 165812 relevant lines covered (58.62%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.01
/htlcswitch/mailbox.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "context"
7
        "errors"
8
        "fmt"
9
        "sync"
10
        "time"
11

12
        "github.com/lightningnetwork/lnd/clock"
13
        "github.com/lightningnetwork/lnd/fn/v2"
14
        "github.com/lightningnetwork/lnd/lntypes"
15
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
)
18

19
var (
	// ErrMailBoxShuttingDown is the error handed back to callers whose
	// request was interrupted because the mailbox is shutting down.
	ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")

	// ErrPacketAlreadyExists is the error handed back when a packet cannot
	// be added because the mailbox already holds a packet for the same
	// circuit key.
	ErrPacketAlreadyExists = errors.New("mailbox already has packet")
)
28

29
// MailBox is an interface which represents a concurrent-safe, in-order
30
// delivery queue for messages from the network and also from the main switch.
31
// This struct serves as a buffer between incoming messages, and messages to
32
// the handled by the link. Each of the mutating methods within this interface
33
// should be implemented in a non-blocking manner.
34
type MailBox interface {
35
        // AddMessage appends a new message to the end of the message queue.
36
        AddMessage(msg lnwire.Message) error
37

38
        // AddPacket appends a new message to the end of the packet queue.
39
        AddPacket(pkt *htlcPacket) error
40

41
        // HasPacket queries the packets for a circuit key, this is used to drop
42
        // packets bound for the switch that already have a queued response.
43
        HasPacket(CircuitKey) bool
44

45
        // AckPacket removes a packet from the mailboxes in-memory replay
46
        // buffer. This will prevent a packet from being delivered after a link
47
        // restarts if the switch has remained online. The returned boolean
48
        // indicates whether or not a packet with the passed incoming circuit
49
        // key was removed.
50
        AckPacket(CircuitKey) bool
51

52
        // FailAdd fails an UpdateAddHTLC that exists within the mailbox,
53
        // removing it from the in-memory replay buffer. This will prevent the
54
        // packet from being delivered after the link restarts if the switch has
55
        // remained online. The generated LinkError will show an
56
        // OutgoingFailureDownstreamHtlcAdd FailureDetail.
57
        FailAdd(ctx context.Context, pkt *htlcPacket)
58

59
        // MessageOutBox returns a channel that any new messages ready for
60
        // delivery will be sent on.
61
        MessageOutBox() chan lnwire.Message
62

63
        // PacketOutBox returns a channel that any new packets ready for
64
        // delivery will be sent on.
65
        PacketOutBox() chan *htlcPacket
66

67
        // Clears any pending wire messages from the inbox.
68
        ResetMessages() error
69

70
        // Reset the packet head to point at the first element in the list.
71
        ResetPackets() error
72

73
        // SetDustClosure takes in a closure that is used to evaluate whether
74
        // mailbox HTLC's are dust.
75
        SetDustClosure(isDust dustClosure)
76

77
        // SetFeeRate sets the feerate to be used when evaluating dust.
78
        SetFeeRate(feerate chainfee.SatPerKWeight)
79

80
        // DustPackets returns the dust sum for Adds in the mailbox for the
81
        // local and remote commitments.
82
        DustPackets() (lnwire.MilliSatoshi, lnwire.MilliSatoshi)
83

84
        // Start starts the mailbox and any goroutines it needs to operate
85
        // properly.
86
        Start()
87

88
        // Stop signals the mailbox and its goroutines for a graceful shutdown.
89
        Stop()
90
}
91

92
type mailBoxConfig struct {
93
        // shortChanID is the short channel id of the channel this mailbox
94
        // belongs to.
95
        shortChanID lnwire.ShortChannelID
96

97
        // forwardPackets send a varidic number of htlcPackets to the switch to
98
        // be routed. A quit channel should be provided so that the call can
99
        // properly exit during shutdown.
100
        forwardPackets func(context.Context, <-chan struct{},
101
                ...*htlcPacket) error
102

103
        // clock is a time source for the mailbox.
104
        clock clock.Clock
105

106
        // expiry is the interval after which Adds will be cancelled if they
107
        // have not been yet been delivered. The computed deadline will expiry
108
        // this long after the Adds are added via AddPacket.
109
        expiry time.Duration
110

111
        // failMailboxUpdate is used to fail an expired HTLC and use the
112
        // correct SCID if the underlying channel uses aliases.
113
        failMailboxUpdate func(ctx context.Context, outScid,
114
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
115
}
116

117
// memoryMailBox is an implementation of the MailBox struct backed by purely
118
// in-memory queues.
119
//
120
// TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts.
121
type memoryMailBox struct {
122
        started sync.Once
123
        stopped sync.Once
124

125
        cfg *mailBoxConfig
126

127
        wireMessages *list.List
128
        wireMtx      sync.Mutex
129
        wireCond     *sync.Cond
130

131
        messageOutbox chan lnwire.Message
132
        msgReset      chan chan struct{}
133

134
        // repPkts is a queue for reply packets, e.g. Settles and Fails.
135
        repPkts  *list.List
136
        repIndex map[CircuitKey]*list.Element
137
        repHead  *list.Element
138

139
        // addPkts is a dedicated queue for Adds.
140
        addPkts  *list.List
141
        addIndex map[CircuitKey]*list.Element
142
        addHead  *list.Element
143

144
        pktMtx  sync.Mutex
145
        pktCond *sync.Cond
146

147
        pktOutbox chan *htlcPacket
148
        pktReset  chan chan struct{}
149

150
        wireShutdown chan struct{}
151
        pktShutdown  chan struct{}
152
        quit         chan struct{}
153
        cancel       fn.Option[context.CancelFunc]
154

155
        // feeRate is set when the link receives or sends out fee updates. It
156
        // is refreshed when AttachMailBox is called in case a fee update did
157
        // not get committed. In some cases it may be out of sync with the
158
        // channel's feerate, but it should eventually get back in sync.
159
        feeRate chainfee.SatPerKWeight
160

161
        // isDust is set when AttachMailBox is called and serves to evaluate
162
        // the outstanding dust in the memoryMailBox given the current set
163
        // feeRate.
164
        isDust dustClosure
165
}
166

167
// newMemoryMailBox creates a new instance of the memoryMailBox.
168
func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
3✔
169
        box := &memoryMailBox{
3✔
170
                cfg:           cfg,
3✔
171
                wireMessages:  list.New(),
3✔
172
                repPkts:       list.New(),
3✔
173
                addPkts:       list.New(),
3✔
174
                messageOutbox: make(chan lnwire.Message),
3✔
175
                pktOutbox:     make(chan *htlcPacket),
3✔
176
                msgReset:      make(chan chan struct{}, 1),
3✔
177
                pktReset:      make(chan chan struct{}, 1),
3✔
178
                repIndex:      make(map[CircuitKey]*list.Element),
3✔
179
                addIndex:      make(map[CircuitKey]*list.Element),
3✔
180
                wireShutdown:  make(chan struct{}),
3✔
181
                pktShutdown:   make(chan struct{}),
3✔
182
                quit:          make(chan struct{}),
3✔
183
        }
3✔
184
        box.wireCond = sync.NewCond(&box.wireMtx)
3✔
185
        box.pktCond = sync.NewCond(&box.pktMtx)
3✔
186

3✔
187
        return box
3✔
188
}
3✔
189

190
// A compile time assertion to ensure that memoryMailBox meets the MailBox
191
// interface.
192
var _ MailBox = (*memoryMailBox)(nil)
193

194
// courierType enumerates the distinct kinds of messages a MailBox can handle.
// Each kind is kept in its own isolated mail box and is delivered by its own
// dedicated goroutine.
type courierType uint8

const (
	// wireCourier identifies the courier that delivers wire messages.
	wireCourier courierType = iota

	// pktCourier identifies the courier that delivers htlc packets.
	pktCourier
)
206

207
// Start starts the mailbox and any goroutines it needs to operate properly.
208
//
209
// NOTE: This method is part of the MailBox interface.
210
func (m *memoryMailBox) Start() {
3✔
211
        m.started.Do(func() {
6✔
212
                ctx, cancel := context.WithCancel(context.Background())
3✔
213
                m.cancel = fn.Some(cancel)
3✔
214

3✔
215
                go m.wireMailCourier()
3✔
216
                go m.pktMailCourier(ctx)
3✔
217
        })
3✔
218
}
219

220
// ResetMessages blocks until all buffered wire messages are cleared.
221
func (m *memoryMailBox) ResetMessages() error {
3✔
222
        msgDone := make(chan struct{})
3✔
223
        select {
3✔
224
        case m.msgReset <- msgDone:
3✔
225
                return m.signalUntilReset(wireCourier, msgDone)
3✔
226
        case <-m.quit:
×
227
                return ErrMailBoxShuttingDown
×
228
        }
229
}
230

231
// ResetPackets blocks until the head of packets buffer is reset, causing the
232
// packets to be redelivered in order.
233
func (m *memoryMailBox) ResetPackets() error {
3✔
234
        pktDone := make(chan struct{})
3✔
235
        select {
3✔
236
        case m.pktReset <- pktDone:
3✔
237
                return m.signalUntilReset(pktCourier, pktDone)
3✔
238
        case <-m.quit:
×
239
                return ErrMailBoxShuttingDown
×
240
        }
241
}
242

243
// signalUntilReset strobes the condition variable for the specified inbox type
244
// until receiving a response that the mailbox has processed a reset.
245
func (m *memoryMailBox) signalUntilReset(cType courierType,
246
        done chan struct{}) error {
3✔
247

3✔
248
        for {
6✔
249

3✔
250
                switch cType {
3✔
251
                case wireCourier:
3✔
252
                        m.wireCond.Signal()
3✔
253
                case pktCourier:
3✔
254
                        m.pktCond.Signal()
3✔
255
                }
256

257
                select {
3✔
258
                case <-time.After(time.Millisecond):
3✔
259
                        continue
3✔
260
                case <-done:
3✔
261
                        return nil
3✔
262
                case <-m.quit:
×
263
                        return ErrMailBoxShuttingDown
×
264
                }
265
        }
266
}
267

268
// AckPacket removes the packet identified by it's incoming circuit key from the
269
// queue of packets to be delivered. The returned boolean indicates whether or
270
// not a packet with the passed incoming circuit key was removed.
271
//
272
// NOTE: It is safe to call this method multiple times for the same circuit key.
273
func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
3✔
274
        m.pktCond.L.Lock()
3✔
275
        defer m.pktCond.L.Unlock()
3✔
276

3✔
277
        if entry, ok := m.repIndex[inKey]; ok {
6✔
278
                // Check whether we are removing the head of the queue. If so,
3✔
279
                // we must advance the head to the next packet before removing.
3✔
280
                // It's possible that the courier has already advanced the
3✔
281
                // repHead, so this check prevents the repHead from getting
3✔
282
                // desynchronized.
3✔
283
                if entry == m.repHead {
3✔
284
                        m.repHead = entry.Next()
×
285
                }
×
286
                m.repPkts.Remove(entry)
3✔
287
                delete(m.repIndex, inKey)
3✔
288

3✔
289
                return true
3✔
290
        }
291

292
        if entry, ok := m.addIndex[inKey]; ok {
6✔
293
                // Check whether we are removing the head of the queue. If so,
3✔
294
                // we must advance the head to the next add before removing.
3✔
295
                // It's possible that the courier has already advanced the
3✔
296
                // addHead, so this check prevents the addHead from getting
3✔
297
                // desynchronized.
3✔
298
                //
3✔
299
                // NOTE: While this event is rare for Settles or Fails, it could
3✔
300
                // be very common for Adds since the mailbox has the ability to
3✔
301
                // cancel Adds before they are delivered. When that occurs, the
3✔
302
                // head of addPkts has only been peeked and we expect to be
3✔
303
                // removing the head of the queue.
3✔
304
                if entry == m.addHead {
3✔
305
                        m.addHead = entry.Next()
×
306
                }
×
307

308
                m.addPkts.Remove(entry)
3✔
309
                delete(m.addIndex, inKey)
3✔
310

3✔
311
                return true
3✔
312
        }
313

314
        return false
×
315
}
316

317
// HasPacket queries the packets for a circuit key, this is used to drop packets
318
// bound for the switch that already have a queued response.
319
func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
3✔
320
        m.pktCond.L.Lock()
3✔
321
        _, ok := m.repIndex[inKey]
3✔
322
        m.pktCond.L.Unlock()
3✔
323

3✔
324
        return ok
3✔
325
}
3✔
326

327
// Stop signals the mailbox and its goroutines for a graceful shutdown.
328
//
329
// NOTE: This method is part of the MailBox interface.
330
func (m *memoryMailBox) Stop() {
3✔
331
        m.stopped.Do(func() {
6✔
332
                m.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
6✔
333
                close(m.quit)
3✔
334

3✔
335
                m.signalUntilShutdown(wireCourier)
3✔
336
                m.signalUntilShutdown(pktCourier)
3✔
337
        })
338
}
339

340
// signalUntilShutdown strobes the condition variable of the passed courier
341
// type, blocking until the worker has exited.
342
func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
3✔
343
        var (
3✔
344
                cond     *sync.Cond
3✔
345
                shutdown chan struct{}
3✔
346
        )
3✔
347

3✔
348
        switch cType {
3✔
349
        case wireCourier:
3✔
350
                cond = m.wireCond
3✔
351
                shutdown = m.wireShutdown
3✔
352
        case pktCourier:
3✔
353
                cond = m.pktCond
3✔
354
                shutdown = m.pktShutdown
3✔
355
        }
356

357
        for {
6✔
358
                select {
3✔
359
                case <-time.After(time.Millisecond):
3✔
360
                        cond.Signal()
3✔
361
                case <-shutdown:
3✔
362
                        return
3✔
363
                }
364
        }
365
}
366

367
// pktWithExpiry wraps an incoming packet and records the time at which it it
368
// should be canceled from the mailbox. This will be used to detect if it gets
369
// stuck in the mailbox and inform when to cancel back.
370
type pktWithExpiry struct {
371
        pkt    *htlcPacket
372
        expiry time.Time
373
}
374

375
func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
3✔
376
        return clock.TickAfter(p.expiry.Sub(clock.Now()))
3✔
377
}
3✔
378

379
// wireMailCourier is a dedicated goroutine whose job is to reliably deliver
380
// wire messages.
381
func (m *memoryMailBox) wireMailCourier() {
3✔
382
        defer close(m.wireShutdown)
3✔
383

3✔
384
        for {
6✔
385
                // First, we'll check our condition. If our mailbox is empty,
3✔
386
                // then we'll wait until a new item is added.
3✔
387
                m.wireCond.L.Lock()
3✔
388
                for m.wireMessages.Front() == nil {
6✔
389
                        m.wireCond.Wait()
3✔
390

3✔
391
                        select {
3✔
392
                        case msgDone := <-m.msgReset:
3✔
393
                                m.wireMessages.Init()
3✔
394
                                close(msgDone)
3✔
395
                        case <-m.quit:
3✔
396
                                m.wireCond.L.Unlock()
3✔
397
                                return
3✔
398
                        default:
3✔
399
                        }
400
                }
401

402
                // Grab the datum off the front of the queue, shifting the
403
                // slice's reference down one in order to remove the datum from
404
                // the queue.
405
                entry := m.wireMessages.Front()
3✔
406

3✔
407
                //nolint:forcetypeassert
3✔
408
                nextMsg := m.wireMessages.Remove(entry).(lnwire.Message)
3✔
409

3✔
410
                // Now that we're done with the condition, we can unlock it to
3✔
411
                // allow any callers to append to the end of our target queue.
3✔
412
                m.wireCond.L.Unlock()
3✔
413

3✔
414
                // With the next message obtained, we'll now select to attempt
3✔
415
                // to deliver the message. If we receive a kill signal, then
3✔
416
                // we'll bail out.
3✔
417
                select {
3✔
418
                case m.messageOutbox <- nextMsg:
3✔
419
                case msgDone := <-m.msgReset:
×
420
                        m.wireCond.L.Lock()
×
421
                        m.wireMessages.Init()
×
422
                        m.wireCond.L.Unlock()
×
423

×
424
                        close(msgDone)
×
425
                case <-m.quit:
1✔
426
                        return
1✔
427
                }
428
        }
429
}
430

431
// pktMailCourier is a dedicated goroutine whose job is to reliably deliver
432
// packet messages.
433
func (m *memoryMailBox) pktMailCourier(ctx context.Context) {
3✔
434
        defer close(m.pktShutdown)
3✔
435

3✔
436
        for {
6✔
437
                // First, we'll check our condition. If our mailbox is empty,
3✔
438
                // then we'll wait until a new item is added.
3✔
439
                m.pktCond.L.Lock()
3✔
440
                for m.repHead == nil && m.addHead == nil {
6✔
441
                        m.pktCond.Wait()
3✔
442

3✔
443
                        select {
3✔
444
                        // Resetting the packet queue means just moving our
445
                        // pointer to the front. This ensures that any un-ACK'd
446
                        // messages are re-delivered upon reconnect.
447
                        case pktDone := <-m.pktReset:
3✔
448
                                m.repHead = m.repPkts.Front()
3✔
449
                                m.addHead = m.addPkts.Front()
3✔
450

3✔
451
                                close(pktDone)
3✔
452

453
                        case <-m.quit:
3✔
454
                                m.pktCond.L.Unlock()
3✔
455
                                return
3✔
456
                        default:
3✔
457
                        }
458
                }
459

460
                var (
3✔
461
                        nextRep   *htlcPacket
3✔
462
                        nextRepEl *list.Element
3✔
463
                        nextAdd   *pktWithExpiry
3✔
464
                        nextAddEl *list.Element
3✔
465
                )
3✔
466
                // For packets, we actually never remove an item until it has
3✔
467
                // been ACK'd by the link. This ensures that if a read packet
3✔
468
                // doesn't make it into a commitment, then it'll be
3✔
469
                // re-delivered once the link comes back online.
3✔
470

3✔
471
                // Peek at the head of the Settle/Fails and Add queues. We peak
3✔
472
                // both even if there is a Settle/Fail present because we need
3✔
473
                // to set a deadline for the next pending Add if it's present.
3✔
474
                // Due to clock monotonicity, we know that the head of the Adds
3✔
475
                // is the next to expire.
3✔
476
                if m.repHead != nil {
6✔
477
                        //nolint:forcetypeassert
3✔
478
                        nextRep = m.repHead.Value.(*htlcPacket)
3✔
479
                        nextRepEl = m.repHead
3✔
480
                }
3✔
481
                if m.addHead != nil {
6✔
482
                        //nolint:forcetypeassert
3✔
483
                        nextAdd = m.addHead.Value.(*pktWithExpiry)
3✔
484
                        nextAddEl = m.addHead
3✔
485
                }
3✔
486

487
                // Now that we're done with the condition, we can unlock it to
488
                // allow any callers to append to the end of our target queue.
489
                m.pktCond.L.Unlock()
3✔
490

3✔
491
                var (
3✔
492
                        pktOutbox chan *htlcPacket
3✔
493
                        addOutbox chan *htlcPacket
3✔
494
                        add       *htlcPacket
3✔
495
                        deadline  <-chan time.Time
3✔
496
                )
3✔
497

3✔
498
                // Prioritize delivery of Settle/Fail packets over Adds. This
3✔
499
                // ensures that we actively clear the commitment of existing
3✔
500
                // HTLCs before trying to add new ones. This can help to improve
3✔
501
                // forwarding performance since the time to sign a commitment is
3✔
502
                // linear in the number of HTLCs manifested on the commitments.
3✔
503
                //
3✔
504
                // NOTE: Both types are eventually delivered over the same
3✔
505
                // channel, but we can control which is delivered by exclusively
3✔
506
                // making one nil and the other non-nil. We know from our loop
3✔
507
                // condition that at least one nextRep and nextAdd are non-nil.
3✔
508
                if nextRep != nil {
6✔
509
                        pktOutbox = m.pktOutbox
3✔
510
                } else {
6✔
511
                        addOutbox = m.pktOutbox
3✔
512
                }
3✔
513

514
                // If we have a pending Add, we'll also construct the deadline
515
                // so we can fail it back if we are unable to deliver any
516
                // message in time. We also dereference the nextAdd's packet,
517
                // since we will need access to it in the case we are delivering
518
                // it and/or if the deadline expires.
519
                //
520
                // NOTE: It's possible after this point for add to be nil, but
521
                // this can only occur when addOutbox is also nil, hence we
522
                // won't accidentally deliver a nil packet.
523
                if nextAdd != nil {
6✔
524
                        add = nextAdd.pkt
3✔
525
                        deadline = nextAdd.deadline(m.cfg.clock)
3✔
526
                }
3✔
527

528
                select {
3✔
529
                case pktOutbox <- nextRep:
3✔
530
                        m.pktCond.L.Lock()
3✔
531
                        // Only advance the repHead if this Settle or Fail is
3✔
532
                        // still at the head of the queue.
3✔
533
                        if m.repHead != nil && m.repHead == nextRepEl {
6✔
534
                                m.repHead = m.repHead.Next()
3✔
535
                        }
3✔
536
                        m.pktCond.L.Unlock()
3✔
537

538
                case addOutbox <- add:
3✔
539
                        m.pktCond.L.Lock()
3✔
540
                        // Only advance the addHead if this Add is still at the
3✔
541
                        // head of the queue.
3✔
542
                        if m.addHead != nil && m.addHead == nextAddEl {
6✔
543
                                m.addHead = m.addHead.Next()
3✔
544
                        }
3✔
545
                        m.pktCond.L.Unlock()
3✔
546

547
                case <-deadline:
×
548
                        log.Debugf("Expiring add htlc with "+
×
549
                                "keystone=%v", add.keystone())
×
NEW
550
                        m.FailAdd(ctx, add)
×
551

552
                case pktDone := <-m.pktReset:
3✔
553
                        m.pktCond.L.Lock()
3✔
554
                        m.repHead = m.repPkts.Front()
3✔
555
                        m.addHead = m.addPkts.Front()
3✔
556
                        m.pktCond.L.Unlock()
3✔
557

3✔
558
                        close(pktDone)
3✔
559

560
                case <-m.quit:
3✔
561
                        return
3✔
562
                }
563
        }
564
}
565

566
// AddMessage appends a new message to the end of the message queue.
567
//
568
// NOTE: This method is safe for concrete use and part of the MailBox
569
// interface.
570
func (m *memoryMailBox) AddMessage(msg lnwire.Message) error {
3✔
571
        // First, we'll lock the condition, and add the message to the end of
3✔
572
        // the wire message inbox.
3✔
573
        m.wireCond.L.Lock()
3✔
574
        m.wireMessages.PushBack(msg)
3✔
575
        m.wireCond.L.Unlock()
3✔
576

3✔
577
        // With the message added, we signal to the mailCourier that there are
3✔
578
        // additional messages to deliver.
3✔
579
        m.wireCond.Signal()
3✔
580

3✔
581
        return nil
3✔
582
}
3✔
583

584
// AddPacket appends a new message to the end of the packet queue.
585
//
586
// NOTE: This method is safe for concrete use and part of the MailBox
587
// interface.
588
func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
3✔
589
        m.pktCond.L.Lock()
3✔
590
        switch htlc := pkt.htlc.(type) {
3✔
591
        // Split off Settle/Fail packets into the repPkts queue.
592
        case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
3✔
593
                if _, ok := m.repIndex[pkt.inKey()]; ok {
3✔
594
                        m.pktCond.L.Unlock()
×
595
                        return ErrPacketAlreadyExists
×
596
                }
×
597

598
                entry := m.repPkts.PushBack(pkt)
3✔
599
                m.repIndex[pkt.inKey()] = entry
3✔
600
                if m.repHead == nil {
6✔
601
                        m.repHead = entry
3✔
602
                }
3✔
603

604
        // Split off Add packets into the addPkts queue.
605
        case *lnwire.UpdateAddHTLC:
3✔
606
                if _, ok := m.addIndex[pkt.inKey()]; ok {
3✔
607
                        m.pktCond.L.Unlock()
×
608
                        return ErrPacketAlreadyExists
×
609
                }
×
610

611
                entry := m.addPkts.PushBack(&pktWithExpiry{
3✔
612
                        pkt:    pkt,
3✔
613
                        expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
3✔
614
                })
3✔
615
                m.addIndex[pkt.inKey()] = entry
3✔
616
                if m.addHead == nil {
6✔
617
                        m.addHead = entry
3✔
618
                }
3✔
619

620
        default:
×
621
                m.pktCond.L.Unlock()
×
622
                return fmt.Errorf("unknown htlc type: %T", htlc)
×
623
        }
624
        m.pktCond.L.Unlock()
3✔
625

3✔
626
        // With the packet added, we signal to the mailCourier that there are
3✔
627
        // additional packets to consume.
3✔
628
        m.pktCond.Signal()
3✔
629

3✔
630
        return nil
3✔
631
}
632

633
// SetFeeRate sets the memoryMailBox's feerate for use in DustPackets.
634
func (m *memoryMailBox) SetFeeRate(feeRate chainfee.SatPerKWeight) {
3✔
635
        m.pktCond.L.Lock()
3✔
636
        defer m.pktCond.L.Unlock()
3✔
637

3✔
638
        m.feeRate = feeRate
3✔
639
}
3✔
640

641
// SetDustClosure sets the memoryMailBox's dustClosure for use in DustPackets.
642
func (m *memoryMailBox) SetDustClosure(isDust dustClosure) {
3✔
643
        m.pktCond.L.Lock()
3✔
644
        defer m.pktCond.L.Unlock()
3✔
645

3✔
646
        m.isDust = isDust
3✔
647
}
3✔
648

649
// DustPackets returns the dust sum for add packets in the mailbox. The first
650
// return value is the local dust sum and the second is the remote dust sum.
651
// This will keep track of a given dust HTLC from the time it is added via
652
// AddPacket until it is removed via AckPacket.
653
func (m *memoryMailBox) DustPackets() (lnwire.MilliSatoshi,
654
        lnwire.MilliSatoshi) {
3✔
655

3✔
656
        m.pktCond.L.Lock()
3✔
657
        defer m.pktCond.L.Unlock()
3✔
658

3✔
659
        var (
3✔
660
                localDustSum  lnwire.MilliSatoshi
3✔
661
                remoteDustSum lnwire.MilliSatoshi
3✔
662
        )
3✔
663

3✔
664
        // Run through the map of HTLC's and determine the dust sum with calls
3✔
665
        // to the memoryMailBox's isDust closure. Note that all mailbox packets
3✔
666
        // are outgoing so the second argument to isDust will be false.
3✔
667
        for _, e := range m.addIndex {
6✔
668
                addPkt := e.Value.(*pktWithExpiry).pkt
3✔
669

3✔
670
                // Evaluate whether this HTLC is dust on the local commitment.
3✔
671
                if m.isDust(
3✔
672
                        m.feeRate, false, lntypes.Local,
3✔
673
                        addPkt.amount.ToSatoshis(),
3✔
674
                ) {
6✔
675

3✔
676
                        localDustSum += addPkt.amount
3✔
677
                }
3✔
678

679
                // Evaluate whether this HTLC is dust on the remote commitment.
680
                if m.isDust(
3✔
681
                        m.feeRate, false, lntypes.Remote,
3✔
682
                        addPkt.amount.ToSatoshis(),
3✔
683
                ) {
6✔
684

3✔
685
                        remoteDustSum += addPkt.amount
3✔
686
                }
3✔
687
        }
688

689
        return localDustSum, remoteDustSum
3✔
690
}
691

692
// FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it
693
// from the in-memory replay buffer. This will prevent the packet from being
694
// delivered after the link restarts if the switch has remained online. The
695
// generated LinkError will show an OutgoingFailureDownstreamHtlcAdd
696
// FailureDetail.
697
func (m *memoryMailBox) FailAdd(ctx context.Context, pkt *htlcPacket) {
3✔
698
        // First, remove the packet from mailbox. If we didn't find the packet
3✔
699
        // because it has already been acked, we'll exit early to avoid sending
3✔
700
        // a duplicate fail message through the switch.
3✔
701
        if !m.AckPacket(pkt.inKey()) {
3✔
702
                return
×
703
        }
×
704

705
        var (
3✔
706
                localFailure = false
3✔
707
                reason       lnwire.OpaqueReason
3✔
708
        )
3✔
709

3✔
710
        // Create a temporary channel failure which we will send back to our
3✔
711
        // peer if this is a forward, or report to the user if the failed
3✔
712
        // payment was locally initiated.
3✔
713
        failure := m.cfg.failMailboxUpdate(
3✔
714
                ctx, pkt.originalOutgoingChanID, m.cfg.shortChanID,
3✔
715
        )
3✔
716

3✔
717
        // If the payment was locally initiated (which is indicated by a nil
3✔
718
        // obfuscator), we do not need to encrypt it back to the sender.
3✔
719
        if pkt.obfuscator == nil {
6✔
720
                var b bytes.Buffer
3✔
721
                err := lnwire.EncodeFailure(&b, failure, 0)
3✔
722
                if err != nil {
3✔
723
                        log.Errorf("Unable to encode failure: %v", err)
×
724
                        return
×
725
                }
×
726
                reason = lnwire.OpaqueReason(b.Bytes())
3✔
727
                localFailure = true
3✔
728
        } else {
×
729
                // If the packet is part of a forward, (identified by a non-nil
×
730
                // obfuscator) we need to encrypt the error back to the source.
×
731
                var err error
×
732
                reason, err = pkt.obfuscator.EncryptFirstHop(failure)
×
733
                if err != nil {
×
734
                        log.Errorf("Unable to obfuscate error: %v", err)
×
735
                        return
×
736
                }
×
737
        }
738

739
        // Create a link error containing the temporary channel failure and a
740
        // detail which indicates the we failed to add the htlc.
741
        linkError := NewDetailedLinkError(
3✔
742
                failure, OutgoingFailureDownstreamHtlcAdd,
3✔
743
        )
3✔
744

3✔
745
        failPkt := &htlcPacket{
3✔
746
                incomingChanID: pkt.incomingChanID,
3✔
747
                incomingHTLCID: pkt.incomingHTLCID,
3✔
748
                circuit:        pkt.circuit,
3✔
749
                sourceRef:      pkt.sourceRef,
3✔
750
                hasSource:      true,
3✔
751
                localFailure:   localFailure,
3✔
752
                obfuscator:     pkt.obfuscator,
3✔
753
                linkFailure:    linkError,
3✔
754
                htlc: &lnwire.UpdateFailHTLC{
3✔
755
                        Reason: reason,
3✔
756
                },
3✔
757
        }
3✔
758

3✔
759
        if err := m.cfg.forwardPackets(ctx, m.quit, failPkt); err != nil {
3✔
760
                log.Errorf("Unhandled error while reforwarding packets "+
×
761
                        "settle/fail over htlcswitch: %v", err)
×
762
        }
×
763
}
764

765
// MessageOutBox returns a channel that any new messages ready for delivery
766
// will be sent on.
767
//
768
// NOTE: This method is part of the MailBox interface.
769
func (m *memoryMailBox) MessageOutBox() chan lnwire.Message {
3✔
770
        return m.messageOutbox
3✔
771
}
3✔
772

773
// PacketOutBox returns a channel that any new packets ready for delivery will
774
// be sent on.
775
//
776
// NOTE: This method is part of the MailBox interface.
777
func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
3✔
778
        return m.pktOutbox
3✔
779
}
3✔
780

781
// mailOrchestrator is responsible for coordinating the creation and lifecycle
782
// of mailboxes used within the switch. It supports the ability to create
783
// mailboxes, reassign their short channel id's, deliver htlc packets, and
784
// queue packets for mailboxes that have not been created due to a link's late
785
// registration.
786
type mailOrchestrator struct {
787
        mu sync.RWMutex
788

789
        cfg *mailOrchConfig
790

791
        // mailboxes caches exactly one mailbox for all known channels.
792
        mailboxes map[lnwire.ChannelID]MailBox
793

794
        // liveIndex maps a live short chan id to the primary mailbox key.
795
        // An index in liveIndex map is only entered under two conditions:
796
        //   1. A link has a non-zero short channel id at time of AddLink.
797
        //   2. A link receives a non-zero short channel via UpdateShortChanID.
798
        liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID
799

800
        // TODO(conner): add another pair of indexes:
801
        //   chan_id -> short_chan_id
802
        //   short_chan_id -> mailbox
803
        // so that Deliver can lookup mailbox directly once live,
804
        // but still queryable by channel_id.
805

806
        // unclaimedPackets maps a live short chan id to queue of packets if no
807
        // mailbox has been created.
808
        unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
809
}
810

811
type mailOrchConfig struct {
812
        // forwardPackets send a varidic number of htlcPackets to the switch to
813
        // be routed. A quit channel should be provided so that the call can
814
        // properly exit during shutdown.
815
        forwardPackets func(context.Context, <-chan struct{},
816
                ...*htlcPacket) error
817

818
        // clock is a time source for the generated mailboxes.
819
        clock clock.Clock
820

821
        // expiry is the interval after which Adds will be cancelled if they
822
        // have not been yet been delivered. The computed deadline will expiry
823
        // this long after the Adds are added to a mailbox via AddPacket.
824
        expiry time.Duration
825

826
        // failMailboxUpdate is used to fail an expired HTLC and use the
827
        // correct SCID if the underlying channel uses aliases.
828
        failMailboxUpdate func(ctx context.Context, outScid,
829
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
830
}
831

832
// newMailOrchestrator initializes a fresh mailOrchestrator.
833
func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator {
3✔
834
        return &mailOrchestrator{
3✔
835
                cfg:              cfg,
3✔
836
                mailboxes:        make(map[lnwire.ChannelID]MailBox),
3✔
837
                liveIndex:        make(map[lnwire.ShortChannelID]lnwire.ChannelID),
3✔
838
                unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
3✔
839
        }
3✔
840
}
3✔
841

842
// Stop instructs the orchestrator to stop all active mailboxes.
843
func (mo *mailOrchestrator) Stop() {
3✔
844
        for _, mailbox := range mo.mailboxes {
6✔
845
                mailbox.Stop()
3✔
846
        }
3✔
847
}
848

849
// GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
850
// creates and returns a new mailbox if none is found.
851
func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID,
852
        shortChanID lnwire.ShortChannelID) MailBox {
3✔
853

3✔
854
        // First, try lookup the mailbox directly using only the shared mutex.
3✔
855
        mo.mu.RLock()
3✔
856
        mailbox, ok := mo.mailboxes[chanID]
3✔
857
        if ok {
6✔
858
                mo.mu.RUnlock()
3✔
859
                return mailbox
3✔
860
        }
3✔
861
        mo.mu.RUnlock()
3✔
862

3✔
863
        // Otherwise, we will try again with exclusive lock, creating a mailbox
3✔
864
        // if one still has not been created.
3✔
865
        mo.mu.Lock()
3✔
866
        mailbox = mo.exclusiveGetOrCreateMailBox(chanID, shortChanID)
3✔
867
        mo.mu.Unlock()
3✔
868

3✔
869
        return mailbox
3✔
870
}
871

872
// exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the
873
// given channel id. If none is found, a new one is creates, started, and
874
// recorded.
875
//
876
// NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
877
func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
878
        chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
3✔
879

3✔
880
        mailbox, ok := mo.mailboxes[chanID]
3✔
881
        if !ok {
6✔
882
                mailbox = newMemoryMailBox(&mailBoxConfig{
3✔
883
                        shortChanID:       shortChanID,
3✔
884
                        forwardPackets:    mo.cfg.forwardPackets,
3✔
885
                        clock:             mo.cfg.clock,
3✔
886
                        expiry:            mo.cfg.expiry,
3✔
887
                        failMailboxUpdate: mo.cfg.failMailboxUpdate,
3✔
888
                })
3✔
889
                mailbox.Start()
3✔
890
                mo.mailboxes[chanID] = mailbox
3✔
891
        }
3✔
892

893
        return mailbox
3✔
894
}
895

896
// BindLiveShortChanID registers that messages bound for a particular short
897
// channel id should be forwarded to the mailbox corresponding to the given
898
// channel id. This method also checks to see if there are any unclaimed
899
// packets for this short_chan_id. If any are found, they are delivered to the
900
// mailbox and removed (marked as claimed).
901
func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox,
902
        cid lnwire.ChannelID, sid lnwire.ShortChannelID) {
3✔
903

3✔
904
        mo.mu.Lock()
3✔
905
        // Update the mapping from short channel id to mailbox's channel id.
3✔
906
        mo.liveIndex[sid] = cid
3✔
907

3✔
908
        // Retrieve any unclaimed packets destined for this mailbox.
3✔
909
        pkts := mo.unclaimedPackets[sid]
3✔
910
        delete(mo.unclaimedPackets, sid)
3✔
911
        mo.mu.Unlock()
3✔
912

3✔
913
        // Deliver the unclaimed packets.
3✔
914
        for _, pkt := range pkts {
6✔
915
                mailbox.AddPacket(pkt)
3✔
916
        }
3✔
917
}
918

919
// Deliver lookups the target mailbox using the live index from short_chan_id
920
// to channel_id. If the mailbox is found, the message is delivered directly.
921
// Otherwise the packet is recorded as unclaimed, and will be delivered to the
922
// mailbox upon the subsequent call to BindLiveShortChanID.
923
func (mo *mailOrchestrator) Deliver(
924
        sid lnwire.ShortChannelID, pkt *htlcPacket) error {
3✔
925

3✔
926
        var (
3✔
927
                mailbox MailBox
3✔
928
                found   bool
3✔
929
        )
3✔
930

3✔
931
        // First, try to find the channel id for the target short_chan_id. If
3✔
932
        // the link is live, we will also look up the created mailbox.
3✔
933
        mo.mu.RLock()
3✔
934
        chanID, isLive := mo.liveIndex[sid]
3✔
935
        if isLive {
6✔
936
                mailbox, found = mo.mailboxes[chanID]
3✔
937
        }
3✔
938
        mo.mu.RUnlock()
3✔
939

3✔
940
        // The link is live and target mailbox was found, deliver immediately.
3✔
941
        if isLive && found {
6✔
942
                return mailbox.AddPacket(pkt)
3✔
943
        }
3✔
944

945
        // If we detected that the link has not been made live, we will acquire
946
        // the exclusive lock preemptively in order to queue this packet in the
947
        // list of unclaimed packets.
948
        mo.mu.Lock()
3✔
949

3✔
950
        // Double check to see if the mailbox has been not made live since the
3✔
951
        // release of the shared lock.
3✔
952
        //
3✔
953
        // NOTE: Checking again with the exclusive lock held prevents a race
3✔
954
        // condition where BindLiveShortChanID is interleaved between the
3✔
955
        // release of the shared lock, and acquiring the exclusive lock. The
3✔
956
        // result would be stuck packets, as they wouldn't be redelivered until
3✔
957
        // the next call to BindLiveShortChanID, which is expected to occur
3✔
958
        // infrequently.
3✔
959
        chanID, isLive = mo.liveIndex[sid]
3✔
960
        if isLive {
3✔
961
                // Reaching this point indicates the mailbox is actually live.
×
962
                // We'll try to load the mailbox using the fresh channel id.
×
963
                //
×
964
                // NOTE: This should never create a new mailbox, as the live
×
965
                // index should only be set if the mailbox had been initialized
×
966
                // beforehand.  However, this does ensure that this case is
×
967
                // handled properly in the event that it could happen.
×
968
                mailbox = mo.exclusiveGetOrCreateMailBox(chanID, sid)
×
969
                mo.mu.Unlock()
×
970

×
971
                // Deliver the packet to the mailbox if it was found or created.
×
972
                return mailbox.AddPacket(pkt)
×
973
        }
×
974

975
        // Finally, if the channel id is still not found in the live index,
976
        // we'll add this to the list of unclaimed packets. These will be
977
        // delivered upon the next call to BindLiveShortChanID.
978
        mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt)
3✔
979
        mo.mu.Unlock()
3✔
980

3✔
981
        return nil
3✔
982
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc