• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16673052227

01 Aug 2025 10:44AM UTC coverage: 67.016% (-0.03%) from 67.047%
16673052227

Pull #9888

github

web-flow
Merge 1dd8765d7 into 37523b6cb
Pull Request #9888: Attributable failures

325 of 384 new or added lines in 16 files covered. (84.64%)

131 existing lines in 24 files now uncovered.

135611 of 202355 relevant lines covered (67.02%)

21613.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.9
/htlcswitch/mailbox.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "errors"
7
        "fmt"
8
        "sync"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
)
16

17
var (
18
        // ErrMailBoxShuttingDown is returned when the mailbox is interrupted by
19
        // a shutdown request.
20
        ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")
21

22
        // ErrPacketAlreadyExists signals that an attempt to add a packet failed
23
        // because it already exists in the mailbox.
24
        ErrPacketAlreadyExists = errors.New("mailbox already has packet")
25
)
26

27
// MailBox is an interface which represents a concurrent-safe, in-order
28
// delivery queue for messages from the network and also from the main switch.
29
// This struct serves as a buffer between incoming messages, and messages to
30
// the handled by the link. Each of the mutating methods within this interface
31
// should be implemented in a non-blocking manner.
32
type MailBox interface {
33
        // AddMessage appends a new message to the end of the message queue.
34
        AddMessage(msg lnwire.Message) error
35

36
        // AddPacket appends a new message to the end of the packet queue.
37
        AddPacket(pkt *htlcPacket) error
38

39
        // HasPacket queries the packets for a circuit key, this is used to drop
40
        // packets bound for the switch that already have a queued response.
41
        HasPacket(CircuitKey) bool
42

43
        // AckPacket removes a packet from the mailboxes in-memory replay
44
        // buffer. This will prevent a packet from being delivered after a link
45
        // restarts if the switch has remained online. The returned boolean
46
        // indicates whether or not a packet with the passed incoming circuit
47
        // key was removed.
48
        AckPacket(CircuitKey) bool
49

50
        // FailAdd fails an UpdateAddHTLC that exists within the mailbox,
51
        // removing it from the in-memory replay buffer. This will prevent the
52
        // packet from being delivered after the link restarts if the switch has
53
        // remained online. The generated LinkError will show an
54
        // OutgoingFailureDownstreamHtlcAdd FailureDetail.
55
        FailAdd(pkt *htlcPacket)
56

57
        // MessageOutBox returns a channel that any new messages ready for
58
        // delivery will be sent on.
59
        MessageOutBox() chan lnwire.Message
60

61
        // PacketOutBox returns a channel that any new packets ready for
62
        // delivery will be sent on.
63
        PacketOutBox() chan *htlcPacket
64

65
        // Clears any pending wire messages from the inbox.
66
        ResetMessages() error
67

68
        // Reset the packet head to point at the first element in the list.
69
        ResetPackets() error
70

71
        // SetDustClosure takes in a closure that is used to evaluate whether
72
        // mailbox HTLC's are dust.
73
        SetDustClosure(isDust dustClosure)
74

75
        // SetFeeRate sets the feerate to be used when evaluating dust.
76
        SetFeeRate(feerate chainfee.SatPerKWeight)
77

78
        // DustPackets returns the dust sum for Adds in the mailbox for the
79
        // local and remote commitments.
80
        DustPackets() (lnwire.MilliSatoshi, lnwire.MilliSatoshi)
81

82
        // Start starts the mailbox and any goroutines it needs to operate
83
        // properly.
84
        Start()
85

86
        // Stop signals the mailbox and its goroutines for a graceful shutdown.
87
        Stop()
88
}
89

90
type mailBoxConfig struct {
91
        // shortChanID is the short channel id of the channel this mailbox
92
        // belongs to.
93
        shortChanID lnwire.ShortChannelID
94

95
        // forwardPackets send a varidic number of htlcPackets to the switch to
96
        // be routed. A quit channel should be provided so that the call can
97
        // properly exit during shutdown.
98
        forwardPackets func(<-chan struct{}, ...*htlcPacket) error
99

100
        // clock is a time source for the mailbox.
101
        clock clock.Clock
102

103
        // expiry is the interval after which Adds will be cancelled if they
104
        // have not been yet been delivered. The computed deadline will expiry
105
        // this long after the Adds are added via AddPacket.
106
        expiry time.Duration
107

108
        // failMailboxUpdate is used to fail an expired HTLC and use the
109
        // correct SCID if the underlying channel uses aliases.
110
        failMailboxUpdate func(outScid,
111
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
112
}
113

114
// memoryMailBox is an implementation of the MailBox struct backed by purely
115
// in-memory queues.
116
//
117
// TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts.
118
type memoryMailBox struct {
119
        started sync.Once
120
        stopped sync.Once
121

122
        cfg *mailBoxConfig
123

124
        wireMessages *list.List
125
        wireMtx      sync.Mutex
126
        wireCond     *sync.Cond
127

128
        messageOutbox chan lnwire.Message
129
        msgReset      chan chan struct{}
130

131
        // repPkts is a queue for reply packets, e.g. Settles and Fails.
132
        repPkts  *list.List
133
        repIndex map[CircuitKey]*list.Element
134
        repHead  *list.Element
135

136
        // addPkts is a dedicated queue for Adds.
137
        addPkts  *list.List
138
        addIndex map[CircuitKey]*list.Element
139
        addHead  *list.Element
140

141
        pktMtx  sync.Mutex
142
        pktCond *sync.Cond
143

144
        pktOutbox chan *htlcPacket
145
        pktReset  chan chan struct{}
146

147
        wireShutdown chan struct{}
148
        pktShutdown  chan struct{}
149
        quit         chan struct{}
150

151
        // feeRate is set when the link receives or sends out fee updates. It
152
        // is refreshed when AttachMailBox is called in case a fee update did
153
        // not get committed. In some cases it may be out of sync with the
154
        // channel's feerate, but it should eventually get back in sync.
155
        feeRate chainfee.SatPerKWeight
156

157
        // isDust is set when AttachMailBox is called and serves to evaluate
158
        // the outstanding dust in the memoryMailBox given the current set
159
        // feeRate.
160
        isDust dustClosure
161
}
162

163
// newMemoryMailBox creates a new instance of the memoryMailBox.
164
func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
336✔
165
        box := &memoryMailBox{
336✔
166
                cfg:           cfg,
336✔
167
                wireMessages:  list.New(),
336✔
168
                repPkts:       list.New(),
336✔
169
                addPkts:       list.New(),
336✔
170
                messageOutbox: make(chan lnwire.Message),
336✔
171
                pktOutbox:     make(chan *htlcPacket),
336✔
172
                msgReset:      make(chan chan struct{}, 1),
336✔
173
                pktReset:      make(chan chan struct{}, 1),
336✔
174
                repIndex:      make(map[CircuitKey]*list.Element),
336✔
175
                addIndex:      make(map[CircuitKey]*list.Element),
336✔
176
                wireShutdown:  make(chan struct{}),
336✔
177
                pktShutdown:   make(chan struct{}),
336✔
178
                quit:          make(chan struct{}),
336✔
179
        }
336✔
180
        box.wireCond = sync.NewCond(&box.wireMtx)
336✔
181
        box.pktCond = sync.NewCond(&box.pktMtx)
336✔
182

336✔
183
        return box
336✔
184
}
336✔
185

186
// A compile time assertion to ensure that memoryMailBox meets the MailBox
187
// interface.
188
var _ MailBox = (*memoryMailBox)(nil)
189

190
// courierType is an enum that reflects the distinct types of messages a
191
// MailBox can handle. Each type will be placed in an isolated mail box and
192
// will have a dedicated goroutine for delivering the messages.
193
type courierType uint8
194

195
const (
196
        // wireCourier is a type of courier that handles wire messages.
197
        wireCourier courierType = iota
198

199
        // pktCourier is a type of courier that handles htlc packets.
200
        pktCourier
201
)
202

203
// Start starts the mailbox and any goroutines it needs to operate properly.
204
//
205
// NOTE: This method is part of the MailBox interface.
206
func (m *memoryMailBox) Start() {
337✔
207
        m.started.Do(func() {
673✔
208
                go m.wireMailCourier()
336✔
209
                go m.pktMailCourier()
336✔
210
        })
336✔
211
}
212

213
// ResetMessages blocks until all buffered wire messages are cleared.
214
func (m *memoryMailBox) ResetMessages() error {
340✔
215
        msgDone := make(chan struct{})
340✔
216
        select {
340✔
217
        case m.msgReset <- msgDone:
340✔
218
                return m.signalUntilReset(wireCourier, msgDone)
340✔
219
        case <-m.quit:
×
220
                return ErrMailBoxShuttingDown
×
221
        }
222
}
223

224
// ResetPackets blocks until the head of packets buffer is reset, causing the
225
// packets to be redelivered in order.
226
func (m *memoryMailBox) ResetPackets() error {
544✔
227
        pktDone := make(chan struct{})
544✔
228
        select {
544✔
229
        case m.pktReset <- pktDone:
544✔
230
                return m.signalUntilReset(pktCourier, pktDone)
544✔
231
        case <-m.quit:
×
232
                return ErrMailBoxShuttingDown
×
233
        }
234
}
235

236
// signalUntilReset strobes the condition variable for the specified inbox type
237
// until receiving a response that the mailbox has processed a reset.
238
func (m *memoryMailBox) signalUntilReset(cType courierType,
239
        done chan struct{}) error {
881✔
240

881✔
241
        for {
2,078✔
242

1,197✔
243
                switch cType {
1,197✔
244
                case wireCourier:
654✔
245
                        m.wireCond.Signal()
654✔
246
                case pktCourier:
546✔
247
                        m.pktCond.Signal()
546✔
248
                }
249

250
                select {
1,197✔
251
                case <-time.After(time.Millisecond):
319✔
252
                        continue
319✔
253
                case <-done:
879✔
254
                        return nil
879✔
255
                case <-m.quit:
2✔
256
                        return ErrMailBoxShuttingDown
2✔
257
                }
258
        }
259
}
260

261
// AckPacket removes the packet identified by it's incoming circuit key from the
262
// queue of packets to be delivered. The returned boolean indicates whether or
263
// not a packet with the passed incoming circuit key was removed.
264
//
265
// NOTE: It is safe to call this method multiple times for the same circuit key.
266
func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
592✔
267
        m.pktCond.L.Lock()
592✔
268
        defer m.pktCond.L.Unlock()
592✔
269

592✔
270
        if entry, ok := m.repIndex[inKey]; ok {
652✔
271
                // Check whether we are removing the head of the queue. If so,
60✔
272
                // we must advance the head to the next packet before removing.
60✔
273
                // It's possible that the courier has already advanced the
60✔
274
                // repHead, so this check prevents the repHead from getting
60✔
275
                // desynchronized.
60✔
276
                if entry == m.repHead {
66✔
277
                        m.repHead = entry.Next()
6✔
278
                }
6✔
279
                m.repPkts.Remove(entry)
60✔
280
                delete(m.repIndex, inKey)
60✔
281

60✔
282
                return true
60✔
283
        }
284

285
        if entry, ok := m.addIndex[inKey]; ok {
1,061✔
286
                // Check whether we are removing the head of the queue. If so,
526✔
287
                // we must advance the head to the next add before removing.
526✔
288
                // It's possible that the courier has already advanced the
526✔
289
                // addHead, so this check prevents the addHead from getting
526✔
290
                // desynchronized.
526✔
291
                //
526✔
292
                // NOTE: While this event is rare for Settles or Fails, it could
526✔
293
                // be very common for Adds since the mailbox has the ability to
526✔
294
                // cancel Adds before they are delivered. When that occurs, the
526✔
295
                // head of addPkts has only been peeked and we expect to be
526✔
296
                // removing the head of the queue.
526✔
297
                if entry == m.addHead {
564✔
298
                        m.addHead = entry.Next()
38✔
299
                }
38✔
300

301
                m.addPkts.Remove(entry)
526✔
302
                delete(m.addIndex, inKey)
526✔
303

526✔
304
                return true
526✔
305
        }
306

307
        return false
9✔
308
}
309

310
// HasPacket queries the packets for a circuit key, this is used to drop packets
311
// bound for the switch that already have a queued response.
312
func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
577✔
313
        m.pktCond.L.Lock()
577✔
314
        _, ok := m.repIndex[inKey]
577✔
315
        m.pktCond.L.Unlock()
577✔
316

577✔
317
        return ok
577✔
318
}
577✔
319

320
// Stop signals the mailbox and its goroutines for a graceful shutdown.
321
//
322
// NOTE: This method is part of the MailBox interface.
323
func (m *memoryMailBox) Stop() {
313✔
324
        m.stopped.Do(func() {
625✔
325
                close(m.quit)
312✔
326

312✔
327
                m.signalUntilShutdown(wireCourier)
312✔
328
                m.signalUntilShutdown(pktCourier)
312✔
329
        })
312✔
330
}
331

332
// signalUntilShutdown strobes the condition variable of the passed courier
333
// type, blocking until the worker has exited.
334
func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
621✔
335
        var (
621✔
336
                cond     *sync.Cond
621✔
337
                shutdown chan struct{}
621✔
338
        )
621✔
339

621✔
340
        switch cType {
621✔
341
        case wireCourier:
312✔
342
                cond = m.wireCond
312✔
343
                shutdown = m.wireShutdown
312✔
344
        case pktCourier:
312✔
345
                cond = m.pktCond
312✔
346
                shutdown = m.pktShutdown
312✔
347
        }
348

349
        for {
1,850✔
350
                select {
1,229✔
351
                case <-time.After(time.Millisecond):
611✔
352
                        cond.Signal()
611✔
353
                case <-shutdown:
621✔
354
                        return
621✔
355
                }
356
        }
357
}
358

359
// pktWithExpiry wraps an incoming packet and records the time at which it it
360
// should be canceled from the mailbox. This will be used to detect if it gets
361
// stuck in the mailbox and inform when to cancel back.
362
type pktWithExpiry struct {
363
        pkt    *htlcPacket
364
        expiry time.Time
365
}
366

367
func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
603✔
368
        return clock.TickAfter(p.expiry.Sub(clock.Now()))
603✔
369
}
603✔
370

371
// wireMailCourier is a dedicated goroutine whose job is to reliably deliver
372
// wire messages.
373
func (m *memoryMailBox) wireMailCourier() {
336✔
374
        defer close(m.wireShutdown)
336✔
375

336✔
376
        for {
3,976✔
377
                // First, we'll check our condition. If our mailbox is empty,
3,640✔
378
                // then we'll wait until a new item is added.
3,640✔
379
                m.wireCond.L.Lock()
3,640✔
380
                for m.wireMessages.Front() == nil {
7,434✔
381
                        m.wireCond.Wait()
3,794✔
382

3,794✔
383
                        select {
3,794✔
384
                        case msgDone := <-m.msgReset:
339✔
385
                                m.wireMessages.Init()
339✔
386
                                close(msgDone)
339✔
387
                        case <-m.quit:
312✔
388
                                m.wireCond.L.Unlock()
312✔
389
                                return
312✔
390
                        default:
3,125✔
391
                        }
392
                }
393

394
                // Grab the datum off the front of the queue, shifting the
395
                // slice's reference down one in order to remove the datum from
396
                // the queue.
397
                entry := m.wireMessages.Front()
3,307✔
398

3,307✔
399
                //nolint:forcetypeassert
3,307✔
400
                nextMsg := m.wireMessages.Remove(entry).(lnwire.Message)
3,307✔
401

3,307✔
402
                // Now that we're done with the condition, we can unlock it to
3,307✔
403
                // allow any callers to append to the end of our target queue.
3,307✔
404
                m.wireCond.L.Unlock()
3,307✔
405

3,307✔
406
                // With the next message obtained, we'll now select to attempt
3,307✔
407
                // to deliver the message. If we receive a kill signal, then
3,307✔
408
                // we'll bail out.
3,307✔
409
                select {
3,307✔
410
                case m.messageOutbox <- nextMsg:
3,307✔
411
                case msgDone := <-m.msgReset:
×
412
                        m.wireCond.L.Lock()
×
413
                        m.wireMessages.Init()
×
414
                        m.wireCond.L.Unlock()
×
415

×
416
                        close(msgDone)
×
UNCOV
417
                case <-m.quit:
×
UNCOV
418
                        return
×
419
                }
420
        }
421
}
422

423
// pktMailCourier is a dedicated goroutine whose job is to reliably deliver
424
// packet messages.
425
func (m *memoryMailBox) pktMailCourier() {
336✔
426
        defer close(m.pktShutdown)
336✔
427

336✔
428
        for {
1,350✔
429
                // First, we'll check our condition. If our mailbox is empty,
1,014✔
430
                // then we'll wait until a new item is added.
1,014✔
431
                m.pktCond.L.Lock()
1,014✔
432
                for m.repHead == nil && m.addHead == nil {
2,376✔
433
                        m.pktCond.Wait()
1,362✔
434

1,362✔
435
                        select {
1,362✔
436
                        // Resetting the packet queue means just moving our
437
                        // pointer to the front. This ensures that any un-ACK'd
438
                        // messages are re-delivered upon reconnect.
439
                        case pktDone := <-m.pktReset:
537✔
440
                                m.repHead = m.repPkts.Front()
537✔
441
                                m.addHead = m.addPkts.Front()
537✔
442

537✔
443
                                close(pktDone)
537✔
444

445
                        case <-m.quit:
305✔
446
                                m.pktCond.L.Unlock()
305✔
447
                                return
305✔
448
                        default:
502✔
449
                        }
450
                }
451

452
                var (
688✔
453
                        nextRep   *htlcPacket
688✔
454
                        nextRepEl *list.Element
688✔
455
                        nextAdd   *pktWithExpiry
688✔
456
                        nextAddEl *list.Element
688✔
457
                )
688✔
458
                // For packets, we actually never remove an item until it has
688✔
459
                // been ACK'd by the link. This ensures that if a read packet
688✔
460
                // doesn't make it into a commitment, then it'll be
688✔
461
                // re-delivered once the link comes back online.
688✔
462

688✔
463
                // Peek at the head of the Settle/Fails and Add queues. We peak
688✔
464
                // both even if there is a Settle/Fail present because we need
688✔
465
                // to set a deadline for the next pending Add if it's present.
688✔
466
                // Due to clock monotonicity, we know that the head of the Adds
688✔
467
                // is the next to expire.
688✔
468
                if m.repHead != nil {
780✔
469
                        //nolint:forcetypeassert
92✔
470
                        nextRep = m.repHead.Value.(*htlcPacket)
92✔
471
                        nextRepEl = m.repHead
92✔
472
                }
92✔
473
                if m.addHead != nil {
1,291✔
474
                        //nolint:forcetypeassert
603✔
475
                        nextAdd = m.addHead.Value.(*pktWithExpiry)
603✔
476
                        nextAddEl = m.addHead
603✔
477
                }
603✔
478

479
                // Now that we're done with the condition, we can unlock it to
480
                // allow any callers to append to the end of our target queue.
481
                m.pktCond.L.Unlock()
688✔
482

688✔
483
                var (
688✔
484
                        pktOutbox chan *htlcPacket
688✔
485
                        addOutbox chan *htlcPacket
688✔
486
                        add       *htlcPacket
688✔
487
                        deadline  <-chan time.Time
688✔
488
                )
688✔
489

688✔
490
                // Prioritize delivery of Settle/Fail packets over Adds. This
688✔
491
                // ensures that we actively clear the commitment of existing
688✔
492
                // HTLCs before trying to add new ones. This can help to improve
688✔
493
                // forwarding performance since the time to sign a commitment is
688✔
494
                // linear in the number of HTLCs manifested on the commitments.
688✔
495
                //
688✔
496
                // NOTE: Both types are eventually delivered over the same
688✔
497
                // channel, but we can control which is delivered by exclusively
688✔
498
                // making one nil and the other non-nil. We know from our loop
688✔
499
                // condition that at least one nextRep and nextAdd are non-nil.
688✔
500
                if nextRep != nil {
780✔
501
                        pktOutbox = m.pktOutbox
92✔
502
                } else {
691✔
503
                        addOutbox = m.pktOutbox
599✔
504
                }
599✔
505

506
                // If we have a pending Add, we'll also construct the deadline
507
                // so we can fail it back if we are unable to deliver any
508
                // message in time. We also dereference the nextAdd's packet,
509
                // since we will need access to it in the case we are delivering
510
                // it and/or if the deadline expires.
511
                //
512
                // NOTE: It's possible after this point for add to be nil, but
513
                // this can only occur when addOutbox is also nil, hence we
514
                // won't accidentally deliver a nil packet.
515
                if nextAdd != nil {
1,291✔
516
                        add = nextAdd.pkt
603✔
517
                        deadline = nextAdd.deadline(m.cfg.clock)
603✔
518
                }
603✔
519

520
                select {
688✔
521
                case pktOutbox <- nextRep:
90✔
522
                        m.pktCond.L.Lock()
90✔
523
                        // Only advance the repHead if this Settle or Fail is
90✔
524
                        // still at the head of the queue.
90✔
525
                        if m.repHead != nil && m.repHead == nextRepEl {
174✔
526
                                m.repHead = m.repHead.Next()
84✔
527
                        }
84✔
528
                        m.pktCond.L.Unlock()
90✔
529

530
                case addOutbox <- add:
552✔
531
                        m.pktCond.L.Lock()
552✔
532
                        // Only advance the addHead if this Add is still at the
552✔
533
                        // head of the queue.
552✔
534
                        if m.addHead != nil && m.addHead == nextAddEl {
1,102✔
535
                                m.addHead = m.addHead.Next()
550✔
536
                        }
550✔
537
                        m.pktCond.L.Unlock()
552✔
538

539
                case <-deadline:
36✔
540
                        log.Debugf("Expiring add htlc with "+
36✔
541
                                "keystone=%v", add.keystone())
36✔
542
                        m.FailAdd(add)
36✔
543

544
                case pktDone := <-m.pktReset:
9✔
545
                        m.pktCond.L.Lock()
9✔
546
                        m.repHead = m.repPkts.Front()
9✔
547
                        m.addHead = m.addPkts.Front()
9✔
548
                        m.pktCond.L.Unlock()
9✔
549

9✔
550
                        close(pktDone)
9✔
551

552
                case <-m.quit:
10✔
553
                        return
10✔
554
                }
555
        }
556
}
557

558
// AddMessage appends a new message to the end of the message queue.
559
//
560
// NOTE: This method is safe for concrete use and part of the MailBox
561
// interface.
562
func (m *memoryMailBox) AddMessage(msg lnwire.Message) error {
3,307✔
563
        // First, we'll lock the condition, and add the message to the end of
3,307✔
564
        // the wire message inbox.
3,307✔
565
        m.wireCond.L.Lock()
3,307✔
566
        m.wireMessages.PushBack(msg)
3,307✔
567
        m.wireCond.L.Unlock()
3,307✔
568

3,307✔
569
        // With the message added, we signal to the mailCourier that there are
3,307✔
570
        // additional messages to deliver.
3,307✔
571
        m.wireCond.Signal()
3,307✔
572

3,307✔
573
        return nil
3,307✔
574
}
3,307✔
575

576
// AddPacket appends a new message to the end of the packet queue.
577
//
578
// NOTE: This method is safe for concrete use and part of the MailBox
579
// interface.
580
func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
2,093✔
581
        m.pktCond.L.Lock()
2,093✔
582
        switch htlc := pkt.htlc.(type) {
2,093✔
583
        // Split off Settle/Fail packets into the repPkts queue.
584
        case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
94✔
585
                if _, ok := m.repIndex[pkt.inKey()]; ok {
96✔
586
                        m.pktCond.L.Unlock()
2✔
587
                        return ErrPacketAlreadyExists
2✔
588
                }
2✔
589

590
                entry := m.repPkts.PushBack(pkt)
92✔
591
                m.repIndex[pkt.inKey()] = entry
92✔
592
                if m.repHead == nil {
181✔
593
                        m.repHead = entry
89✔
594
                }
89✔
595

596
        // Split off Add packets into the addPkts queue.
597
        case *lnwire.UpdateAddHTLC:
2,002✔
598
                if _, ok := m.addIndex[pkt.inKey()]; ok {
2,003✔
599
                        m.pktCond.L.Unlock()
1✔
600
                        return ErrPacketAlreadyExists
1✔
601
                }
1✔
602

603
                entry := m.addPkts.PushBack(&pktWithExpiry{
2,001✔
604
                        pkt:    pkt,
2,001✔
605
                        expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
2,001✔
606
                })
2,001✔
607
                m.addIndex[pkt.inKey()] = entry
2,001✔
608
                if m.addHead == nil {
2,432✔
609
                        m.addHead = entry
431✔
610
                }
431✔
611

612
        default:
×
613
                m.pktCond.L.Unlock()
×
614
                return fmt.Errorf("unknown htlc type: %T", htlc)
×
615
        }
616
        m.pktCond.L.Unlock()
2,090✔
617

2,090✔
618
        // With the packet added, we signal to the mailCourier that there are
2,090✔
619
        // additional packets to consume.
2,090✔
620
        m.pktCond.Signal()
2,090✔
621

2,090✔
622
        return nil
2,090✔
623
}
624

625
// SetFeeRate sets the memoryMailBox's feerate for use in DustPackets.
626
func (m *memoryMailBox) SetFeeRate(feeRate chainfee.SatPerKWeight) {
224✔
627
        m.pktCond.L.Lock()
224✔
628
        defer m.pktCond.L.Unlock()
224✔
629

224✔
630
        m.feeRate = feeRate
224✔
631
}
224✔
632

633
// SetDustClosure sets the memoryMailBox's dustClosure for use in DustPackets.
634
func (m *memoryMailBox) SetDustClosure(isDust dustClosure) {
340✔
635
        m.pktCond.L.Lock()
340✔
636
        defer m.pktCond.L.Unlock()
340✔
637

340✔
638
        m.isDust = isDust
340✔
639
}
340✔
640

641
// DustPackets returns the dust sum for add packets in the mailbox. The first
642
// return value is the local dust sum and the second is the remote dust sum.
643
// This will keep track of a given dust HTLC from the time it is added via
644
// AddPacket until it is removed via AckPacket.
645
func (m *memoryMailBox) DustPackets() (lnwire.MilliSatoshi,
646
        lnwire.MilliSatoshi) {
408✔
647

408✔
648
        m.pktCond.L.Lock()
408✔
649
        defer m.pktCond.L.Unlock()
408✔
650

408✔
651
        var (
408✔
652
                localDustSum  lnwire.MilliSatoshi
408✔
653
                remoteDustSum lnwire.MilliSatoshi
408✔
654
        )
408✔
655

408✔
656
        // Run through the map of HTLC's and determine the dust sum with calls
408✔
657
        // to the memoryMailBox's isDust closure. Note that all mailbox packets
408✔
658
        // are outgoing so the second argument to isDust will be false.
408✔
659
        for _, e := range m.addIndex {
3,985✔
660
                addPkt := e.Value.(*pktWithExpiry).pkt
3,577✔
661

3,577✔
662
                // Evaluate whether this HTLC is dust on the local commitment.
3,577✔
663
                if m.isDust(
3,577✔
664
                        m.feeRate, false, lntypes.Local,
3,577✔
665
                        addPkt.amount.ToSatoshis(),
3,577✔
666
                ) {
7,148✔
667

3,571✔
668
                        localDustSum += addPkt.amount
3,571✔
669
                }
3,571✔
670

671
                // Evaluate whether this HTLC is dust on the remote commitment.
672
                if m.isDust(
3,577✔
673
                        m.feeRate, false, lntypes.Remote,
3,577✔
674
                        addPkt.amount.ToSatoshis(),
3,577✔
675
                ) {
7,154✔
676

3,577✔
677
                        remoteDustSum += addPkt.amount
3,577✔
678
                }
3,577✔
679
        }
680

681
        return localDustSum, remoteDustSum
408✔
682
}
683

684
// FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it
685
// from the in-memory replay buffer. This will prevent the packet from being
686
// delivered after the link restarts if the switch has remained online. The
687
// generated LinkError will show an OutgoingFailureDownstreamHtlcAdd
688
// FailureDetail.
689
func (m *memoryMailBox) FailAdd(pkt *htlcPacket) {
50✔
690
        // First, remove the packet from mailbox. If we didn't find the packet
50✔
691
        // because it has already been acked, we'll exit early to avoid sending
50✔
692
        // a duplicate fail message through the switch.
50✔
693
        if !m.AckPacket(pkt.inKey()) {
55✔
694
                return
5✔
695
        }
5✔
696

697
        var (
45✔
698
                localFailure = false
45✔
699
                reason       lnwire.OpaqueReason
45✔
700
                attrData     []byte
45✔
701
        )
45✔
702

45✔
703
        // Create a temporary channel failure which we will send back to our
45✔
704
        // peer if this is a forward, or report to the user if the failed
45✔
705
        // payment was locally initiated.
45✔
706
        failure := m.cfg.failMailboxUpdate(
45✔
707
                pkt.originalOutgoingChanID, m.cfg.shortChanID,
45✔
708
        )
45✔
709

45✔
710
        // If the payment was locally initiated (which is indicated by a nil
45✔
711
        // obfuscator), we do not need to encrypt it back to the sender.
45✔
712
        if pkt.obfuscator == nil {
84✔
713
                var b bytes.Buffer
39✔
714
                err := lnwire.EncodeFailure(&b, failure, 0)
39✔
715
                if err != nil {
39✔
716
                        log.Errorf("Unable to encode failure: %v", err)
×
717
                        return
×
718
                }
×
719
                reason = lnwire.OpaqueReason(b.Bytes())
39✔
720
                localFailure = true
39✔
721
        } else {
6✔
722
                // If the packet is part of a forward, (identified by a non-nil
6✔
723
                // obfuscator) we need to encrypt the error back to the source.
6✔
724
                var err error
6✔
725
                reason, attrData, err = pkt.obfuscator.EncryptFirstHop(failure)
6✔
726
                if err != nil {
6✔
727
                        log.Errorf("Unable to obfuscate error: %v", err)
×
728
                        return
×
729
                }
×
730
        }
731

732
        extraData, err := lnwire.AttrDataToExtraData(attrData)
45✔
733
        if err != nil {
45✔
NEW
734
                log.Errorf("Failed to convert attr data: %w", err)
×
NEW
735
        }
×
736

737
        // Create a link error containing the temporary channel failure and a
738
        // detail which indicates the we failed to add the htlc.
739
        linkError := NewDetailedLinkError(
45✔
740
                failure, OutgoingFailureDownstreamHtlcAdd,
45✔
741
        )
45✔
742

45✔
743
        failPkt := &htlcPacket{
45✔
744
                incomingChanID: pkt.incomingChanID,
45✔
745
                incomingHTLCID: pkt.incomingHTLCID,
45✔
746
                circuit:        pkt.circuit,
45✔
747
                sourceRef:      pkt.sourceRef,
45✔
748
                hasSource:      true,
45✔
749
                localFailure:   localFailure,
45✔
750
                obfuscator:     pkt.obfuscator,
45✔
751
                linkFailure:    linkError,
45✔
752
                htlc: &lnwire.UpdateFailHTLC{
45✔
753
                        Reason:    reason,
45✔
754
                        ExtraData: extraData,
45✔
755
                },
45✔
756
        }
45✔
757

45✔
758
        if err := m.cfg.forwardPackets(m.quit, failPkt); err != nil {
45✔
759
                log.Errorf("Unhandled error while reforwarding packets "+
×
760
                        "settle/fail over htlcswitch: %v", err)
×
761
        }
×
762
}
763

764
// MessageOutBox returns a channel that any new messages ready for delivery
765
// will be sent on.
766
//
767
// NOTE: This method is part of the MailBox interface.
768
func (m *memoryMailBox) MessageOutBox() chan lnwire.Message {
236✔
769
        return m.messageOutbox
236✔
770
}
236✔
771

772
// PacketOutBox returns a channel that any new packets ready for delivery will
773
// be sent on.
774
//
775
// NOTE: This method is part of the MailBox interface.
776
func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
387✔
777
        return m.pktOutbox
387✔
778
}
387✔
779

780
// mailOrchestrator is responsible for coordinating the creation and lifecycle
781
// of mailboxes used within the switch. It supports the ability to create
782
// mailboxes, reassign their short channel id's, deliver htlc packets, and
783
// queue packets for mailboxes that have not been created due to a link's late
784
// registration.
785
type mailOrchestrator struct {
786
        mu sync.RWMutex
787

788
        cfg *mailOrchConfig
789

790
        // mailboxes caches exactly one mailbox for all known channels.
791
        mailboxes map[lnwire.ChannelID]MailBox
792

793
        // liveIndex maps a live short chan id to the primary mailbox key.
794
        // An index in liveIndex map is only entered under two conditions:
795
        //   1. A link has a non-zero short channel id at time of AddLink.
796
        //   2. A link receives a non-zero short channel via UpdateShortChanID.
797
        liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID
798

799
        // TODO(conner): add another pair of indexes:
800
        //   chan_id -> short_chan_id
801
        //   short_chan_id -> mailbox
802
        // so that Deliver can lookup mailbox directly once live,
803
        // but still queryable by channel_id.
804

805
        // unclaimedPackets maps a live short chan id to queue of packets if no
806
        // mailbox has been created.
807
        unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
808
}
809

810
type mailOrchConfig struct {
811
        // forwardPackets send a varidic number of htlcPackets to the switch to
812
        // be routed. A quit channel should be provided so that the call can
813
        // properly exit during shutdown.
814
        forwardPackets func(<-chan struct{}, ...*htlcPacket) error
815

816
        // clock is a time source for the generated mailboxes.
817
        clock clock.Clock
818

819
        // expiry is the interval after which Adds will be cancelled if they
820
        // have not been yet been delivered. The computed deadline will expiry
821
        // this long after the Adds are added to a mailbox via AddPacket.
822
        expiry time.Duration
823

824
        // failMailboxUpdate is used to fail an expired HTLC and use the
825
        // correct SCID if the underlying channel uses aliases.
826
        failMailboxUpdate func(outScid,
827
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
828
}
829

830
// newMailOrchestrator initializes a fresh mailOrchestrator.
831
func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator {
345✔
832
        return &mailOrchestrator{
345✔
833
                cfg:              cfg,
345✔
834
                mailboxes:        make(map[lnwire.ChannelID]MailBox),
345✔
835
                liveIndex:        make(map[lnwire.ShortChannelID]lnwire.ChannelID),
345✔
836
                unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
345✔
837
        }
345✔
838
}
345✔
839

840
// Stop instructs the orchestrator to stop all active mailboxes.
841
func (mo *mailOrchestrator) Stop() {
313✔
842
        for _, mailbox := range mo.mailboxes {
617✔
843
                mailbox.Stop()
304✔
844
        }
304✔
845
}
846

847
// GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
848
// creates and returns a new mailbox if none is found.
849
func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID,
850
        shortChanID lnwire.ShortChannelID) MailBox {
736✔
851

736✔
852
        // First, try lookup the mailbox directly using only the shared mutex.
736✔
853
        mo.mu.RLock()
736✔
854
        mailbox, ok := mo.mailboxes[chanID]
736✔
855
        if ok {
1,147✔
856
                mo.mu.RUnlock()
411✔
857
                return mailbox
411✔
858
        }
411✔
859
        mo.mu.RUnlock()
328✔
860

328✔
861
        // Otherwise, we will try again with exclusive lock, creating a mailbox
328✔
862
        // if one still has not been created.
328✔
863
        mo.mu.Lock()
328✔
864
        mailbox = mo.exclusiveGetOrCreateMailBox(chanID, shortChanID)
328✔
865
        mo.mu.Unlock()
328✔
866

328✔
867
        return mailbox
328✔
868
}
869

870
// exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the
871
// given channel id. If none is found, a new one is creates, started, and
872
// recorded.
873
//
874
// NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
875
func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
876
        chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
328✔
877

328✔
878
        mailbox, ok := mo.mailboxes[chanID]
328✔
879
        if !ok {
656✔
880
                mailbox = newMemoryMailBox(&mailBoxConfig{
328✔
881
                        shortChanID:       shortChanID,
328✔
882
                        forwardPackets:    mo.cfg.forwardPackets,
328✔
883
                        clock:             mo.cfg.clock,
328✔
884
                        expiry:            mo.cfg.expiry,
328✔
885
                        failMailboxUpdate: mo.cfg.failMailboxUpdate,
328✔
886
                })
328✔
887
                mailbox.Start()
328✔
888
                mo.mailboxes[chanID] = mailbox
328✔
889
        }
328✔
890

891
        return mailbox
328✔
892
}
893

894
// BindLiveShortChanID registers that messages bound for a particular short
895
// channel id should be forwarded to the mailbox corresponding to the given
896
// channel id. This method also checks to see if there are any unclaimed
897
// packets for this short_chan_id. If any are found, they are delivered to the
898
// mailbox and removed (marked as claimed).
899
func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox,
900
        cid lnwire.ChannelID, sid lnwire.ShortChannelID) {
339✔
901

339✔
902
        mo.mu.Lock()
339✔
903
        // Update the mapping from short channel id to mailbox's channel id.
339✔
904
        mo.liveIndex[sid] = cid
339✔
905

339✔
906
        // Retrieve any unclaimed packets destined for this mailbox.
339✔
907
        pkts := mo.unclaimedPackets[sid]
339✔
908
        delete(mo.unclaimedPackets, sid)
339✔
909
        mo.mu.Unlock()
339✔
910

339✔
911
        // Deliver the unclaimed packets.
339✔
912
        for _, pkt := range pkts {
347✔
913
                mailbox.AddPacket(pkt)
8✔
914
        }
8✔
915
}
916

917
// Deliver lookups the target mailbox using the live index from short_chan_id
918
// to channel_id. If the mailbox is found, the message is delivered directly.
919
// Otherwise the packet is recorded as unclaimed, and will be delivered to the
920
// mailbox upon the subsequent call to BindLiveShortChanID.
921
func (mo *mailOrchestrator) Deliver(
922
        sid lnwire.ShortChannelID, pkt *htlcPacket) error {
91✔
923

91✔
924
        var (
91✔
925
                mailbox MailBox
91✔
926
                found   bool
91✔
927
        )
91✔
928

91✔
929
        // First, try to find the channel id for the target short_chan_id. If
91✔
930
        // the link is live, we will also look up the created mailbox.
91✔
931
        mo.mu.RLock()
91✔
932
        chanID, isLive := mo.liveIndex[sid]
91✔
933
        if isLive {
177✔
934
                mailbox, found = mo.mailboxes[chanID]
86✔
935
        }
86✔
936
        mo.mu.RUnlock()
91✔
937

91✔
938
        // The link is live and target mailbox was found, deliver immediately.
91✔
939
        if isLive && found {
177✔
940
                return mailbox.AddPacket(pkt)
86✔
941
        }
86✔
942

943
        // If we detected that the link has not been made live, we will acquire
944
        // the exclusive lock preemptively in order to queue this packet in the
945
        // list of unclaimed packets.
946
        mo.mu.Lock()
8✔
947

8✔
948
        // Double check to see if the mailbox has been not made live since the
8✔
949
        // release of the shared lock.
8✔
950
        //
8✔
951
        // NOTE: Checking again with the exclusive lock held prevents a race
8✔
952
        // condition where BindLiveShortChanID is interleaved between the
8✔
953
        // release of the shared lock, and acquiring the exclusive lock. The
8✔
954
        // result would be stuck packets, as they wouldn't be redelivered until
8✔
955
        // the next call to BindLiveShortChanID, which is expected to occur
8✔
956
        // infrequently.
8✔
957
        chanID, isLive = mo.liveIndex[sid]
8✔
958
        if isLive {
8✔
959
                // Reaching this point indicates the mailbox is actually live.
×
960
                // We'll try to load the mailbox using the fresh channel id.
×
961
                //
×
962
                // NOTE: This should never create a new mailbox, as the live
×
963
                // index should only be set if the mailbox had been initialized
×
964
                // beforehand.  However, this does ensure that this case is
×
965
                // handled properly in the event that it could happen.
×
966
                mailbox = mo.exclusiveGetOrCreateMailBox(chanID, sid)
×
967
                mo.mu.Unlock()
×
968

×
969
                // Deliver the packet to the mailbox if it was found or created.
×
970
                return mailbox.AddPacket(pkt)
×
971
        }
×
972

973
        // Finally, if the channel id is still not found in the live index,
974
        // we'll add this to the list of unclaimed packets. These will be
975
        // delivered upon the next call to BindLiveShortChanID.
976
        mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt)
8✔
977
        mo.mu.Unlock()
8✔
978

8✔
979
        return nil
8✔
980
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc