• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13589597166

28 Feb 2025 01:55PM UTC coverage: 68.638% (+9.8%) from 58.841%
13589597166

Pull #9521

github

web-flow
Merge 7761e3752 into dc0ba7227
Pull Request #9521: unit: remove GOACC, use Go 1.20 native coverage functionality

129969 of 189353 relevant lines covered (68.64%)

23692.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.55
/htlcswitch/mailbox.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "errors"
7
        "fmt"
8
        "sync"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
)
16

17
var (
18
        // ErrMailBoxShuttingDown is returned when the mailbox is interrupted by
19
        // a shutdown request.
20
        ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")
21

22
        // ErrPacketAlreadyExists signals that an attempt to add a packet failed
23
        // because it already exists in the mailbox.
24
        ErrPacketAlreadyExists = errors.New("mailbox already has packet")
25
)
26

27
// MailBox is an interface which represents a concurrent-safe, in-order
28
// delivery queue for messages from the network and also from the main switch.
29
// This struct serves as a buffer between incoming messages, and messages to
30
// be handled by the link. Each of the mutating methods within this interface
31
// should be implemented in a non-blocking manner.
32
type MailBox interface {
33
        // AddMessage appends a new message to the end of the message queue.
34
        AddMessage(msg lnwire.Message) error
35

36
        // AddPacket appends a new packet to the end of the packet queue.
37
        AddPacket(pkt *htlcPacket) error
38

39
        // HasPacket queries the packets for a circuit key, this is used to drop
40
        // packets bound for the switch that already have a queued response.
41
        HasPacket(CircuitKey) bool
42

43
        // AckPacket removes a packet from the mailboxes in-memory replay
44
        // buffer. This will prevent a packet from being delivered after a link
45
        // restarts if the switch has remained online. The returned boolean
46
        // indicates whether or not a packet with the passed incoming circuit
47
        // key was removed.
48
        AckPacket(CircuitKey) bool
49

50
        // FailAdd fails an UpdateAddHTLC that exists within the mailbox,
51
        // removing it from the in-memory replay buffer. This will prevent the
52
        // packet from being delivered after the link restarts if the switch has
53
        // remained online. The generated LinkError will show an
54
        // OutgoingFailureDownstreamHtlcAdd FailureDetail.
55
        FailAdd(pkt *htlcPacket)
56

57
        // MessageOutBox returns a channel that any new messages ready for
58
        // delivery will be sent on.
59
        MessageOutBox() chan lnwire.Message
60

61
        // PacketOutBox returns a channel that any new packets ready for
62
        // delivery will be sent on.
63
        PacketOutBox() chan *htlcPacket
64

65
        // Clears any pending wire messages from the inbox.
66
        ResetMessages() error
67

68
        // Reset the packet head to point at the first element in the list.
69
        ResetPackets() error
70

71
        // SetDustClosure takes in a closure that is used to evaluate whether
72
        // mailbox HTLC's are dust.
73
        SetDustClosure(isDust dustClosure)
74

75
        // SetFeeRate sets the feerate to be used when evaluating dust.
76
        SetFeeRate(feerate chainfee.SatPerKWeight)
77

78
        // DustPackets returns the dust sum for Adds in the mailbox for the
79
        // local and remote commitments.
80
        DustPackets() (lnwire.MilliSatoshi, lnwire.MilliSatoshi)
81

82
        // Start starts the mailbox and any goroutines it needs to operate
83
        // properly.
84
        Start()
85

86
        // Stop signals the mailbox and its goroutines for a graceful shutdown.
87
        Stop()
88
}
89

90
type mailBoxConfig struct {
91
        // shortChanID is the short channel id of the channel this mailbox
92
        // belongs to.
93
        shortChanID lnwire.ShortChannelID
94

95
        // forwardPackets sends a variadic number of htlcPackets to the switch to
96
        // be routed. A quit channel should be provided so that the call can
97
        // properly exit during shutdown.
98
        forwardPackets func(<-chan struct{}, ...*htlcPacket) error
99

100
        // clock is a time source for the mailbox.
101
        clock clock.Clock
102

103
        // expiry is the interval after which Adds will be cancelled if they
104
        // have not yet been delivered. The computed deadline will expire
105
        // this long after the Adds are added via AddPacket.
106
        expiry time.Duration
107

108
        // failMailboxUpdate is used to fail an expired HTLC and use the
109
        // correct SCID if the underlying channel uses aliases.
110
        failMailboxUpdate func(outScid,
111
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
112
}
113

114
// memoryMailBox is an implementation of the MailBox interface backed by purely
115
// in-memory queues.
116
//
117
// TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts.
118
type memoryMailBox struct {
119
        started sync.Once
120
        stopped sync.Once
121

122
        cfg *mailBoxConfig
123

124
        wireMessages *list.List
125
        wireMtx      sync.Mutex
126
        wireCond     *sync.Cond
127

128
        messageOutbox chan lnwire.Message
129
        msgReset      chan chan struct{}
130

131
        // repPkts is a queue for reply packets, e.g. Settles and Fails.
132
        repPkts  *list.List
133
        repIndex map[CircuitKey]*list.Element
134
        repHead  *list.Element
135

136
        // addPkts is a dedicated queue for Adds.
137
        addPkts  *list.List
138
        addIndex map[CircuitKey]*list.Element
139
        addHead  *list.Element
140

141
        pktMtx  sync.Mutex
142
        pktCond *sync.Cond
143

144
        pktOutbox chan *htlcPacket
145
        pktReset  chan chan struct{}
146

147
        wireShutdown chan struct{}
148
        pktShutdown  chan struct{}
149
        quit         chan struct{}
150

151
        // feeRate is set when the link receives or sends out fee updates. It
152
        // is refreshed when AttachMailBox is called in case a fee update did
153
        // not get committed. In some cases it may be out of sync with the
154
        // channel's feerate, but it should eventually get back in sync.
155
        feeRate chainfee.SatPerKWeight
156

157
        // isDust is set when AttachMailBox is called and serves to evaluate
158
        // the outstanding dust in the memoryMailBox given the current set
159
        // feeRate.
160
        isDust dustClosure
161
}
162

163
// newMemoryMailBox creates a new instance of the memoryMailBox.
164
func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
336✔
165
        box := &memoryMailBox{
336✔
166
                cfg:           cfg,
336✔
167
                wireMessages:  list.New(),
336✔
168
                repPkts:       list.New(),
336✔
169
                addPkts:       list.New(),
336✔
170
                messageOutbox: make(chan lnwire.Message),
336✔
171
                pktOutbox:     make(chan *htlcPacket),
336✔
172
                msgReset:      make(chan chan struct{}, 1),
336✔
173
                pktReset:      make(chan chan struct{}, 1),
336✔
174
                repIndex:      make(map[CircuitKey]*list.Element),
336✔
175
                addIndex:      make(map[CircuitKey]*list.Element),
336✔
176
                wireShutdown:  make(chan struct{}),
336✔
177
                pktShutdown:   make(chan struct{}),
336✔
178
                quit:          make(chan struct{}),
336✔
179
        }
336✔
180
        box.wireCond = sync.NewCond(&box.wireMtx)
336✔
181
        box.pktCond = sync.NewCond(&box.pktMtx)
336✔
182

336✔
183
        return box
336✔
184
}
336✔
185

186
// A compile time assertion to ensure that memoryMailBox meets the MailBox
187
// interface.
188
var _ MailBox = (*memoryMailBox)(nil)
189

190
// courierType is an enum that reflects the distinct types of messages a
191
// MailBox can handle. Each type will be placed in an isolated mail box and
192
// will have a dedicated goroutine for delivering the messages.
193
type courierType uint8
194

195
const (
196
        // wireCourier is a type of courier that handles wire messages.
197
        wireCourier courierType = iota
198

199
        // pktCourier is a type of courier that handles htlc packets.
200
        pktCourier
201
)
202

203
// Start starts the mailbox and any goroutines it needs to operate properly.
204
//
205
// NOTE: This method is part of the MailBox interface.
206
func (m *memoryMailBox) Start() {
337✔
207
        m.started.Do(func() {
673✔
208
                go m.wireMailCourier()
336✔
209
                go m.pktMailCourier()
336✔
210
        })
336✔
211
}
212

213
// ResetMessages blocks until all buffered wire messages are cleared.
214
func (m *memoryMailBox) ResetMessages() error {
340✔
215
        msgDone := make(chan struct{})
340✔
216
        select {
340✔
217
        case m.msgReset <- msgDone:
340✔
218
                return m.signalUntilReset(wireCourier, msgDone)
340✔
219
        case <-m.quit:
×
220
                return ErrMailBoxShuttingDown
×
221
        }
222
}
223

224
// ResetPackets blocks until the head of packets buffer is reset, causing the
225
// packets to be redelivered in order.
226
func (m *memoryMailBox) ResetPackets() error {
544✔
227
        pktDone := make(chan struct{})
544✔
228
        select {
544✔
229
        case m.pktReset <- pktDone:
543✔
230
                return m.signalUntilReset(pktCourier, pktDone)
543✔
231
        case <-m.quit:
1✔
232
                return ErrMailBoxShuttingDown
1✔
233
        }
234
}
235

236
// signalUntilReset strobes the condition variable for the specified inbox type
237
// until receiving a response that the mailbox has processed a reset.
238
func (m *memoryMailBox) signalUntilReset(cType courierType,
239
        done chan struct{}) error {
880✔
240

880✔
241
        for {
2,082✔
242

1,202✔
243
                switch cType {
1,202✔
244
                case wireCourier:
659✔
245
                        m.wireCond.Signal()
659✔
246
                case pktCourier:
546✔
247
                        m.pktCond.Signal()
546✔
248
                }
249

250
                select {
1,202✔
251
                case <-time.After(time.Millisecond):
325✔
252
                        continue
325✔
253
                case <-done:
879✔
254
                        return nil
879✔
255
                case <-m.quit:
1✔
256
                        return ErrMailBoxShuttingDown
1✔
257
                }
258
        }
259
}
260

261
// AckPacket removes the packet identified by it's incoming circuit key from the
262
// queue of packets to be delivered. The returned boolean indicates whether or
263
// not a packet with the passed incoming circuit key was removed.
264
//
265
// NOTE: It is safe to call this method multiple times for the same circuit key.
266
func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
599✔
267
        m.pktCond.L.Lock()
599✔
268
        defer m.pktCond.L.Unlock()
599✔
269

599✔
270
        if entry, ok := m.repIndex[inKey]; ok {
660✔
271
                // Check whether we are removing the head of the queue. If so,
61✔
272
                // we must advance the head to the next packet before removing.
61✔
273
                // It's possible that the courier has already advanced the
61✔
274
                // repHead, so this check prevents the repHead from getting
61✔
275
                // desynchronized.
61✔
276
                if entry == m.repHead {
67✔
277
                        m.repHead = entry.Next()
6✔
278
                }
6✔
279
                m.repPkts.Remove(entry)
61✔
280
                delete(m.repIndex, inKey)
61✔
281

61✔
282
                return true
61✔
283
        }
284

285
        if entry, ok := m.addIndex[inKey]; ok {
1,073✔
286
                // Check whether we are removing the head of the queue. If so,
532✔
287
                // we must advance the head to the next add before removing.
532✔
288
                // It's possible that the courier has already advanced the
532✔
289
                // addHead, so this check prevents the addHead from getting
532✔
290
                // desynchronized.
532✔
291
                //
532✔
292
                // NOTE: While this event is rare for Settles or Fails, it could
532✔
293
                // be very common for Adds since the mailbox has the ability to
532✔
294
                // cancel Adds before they are delivered. When that occurs, the
532✔
295
                // head of addPkts has only been peeked and we expect to be
532✔
296
                // removing the head of the queue.
532✔
297
                if entry == m.addHead {
572✔
298
                        m.addHead = entry.Next()
40✔
299
                }
40✔
300

301
                m.addPkts.Remove(entry)
532✔
302
                delete(m.addIndex, inKey)
532✔
303

532✔
304
                return true
532✔
305
        }
306

307
        return false
9✔
308
}
309

310
// HasPacket queries the packets for a circuit key, this is used to drop packets
311
// bound for the switch that already have a queued response.
312
func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
579✔
313
        m.pktCond.L.Lock()
579✔
314
        _, ok := m.repIndex[inKey]
579✔
315
        m.pktCond.L.Unlock()
579✔
316

579✔
317
        return ok
579✔
318
}
579✔
319

320
// Stop signals the mailbox and its goroutines for a graceful shutdown.
321
//
322
// NOTE: This method is part of the MailBox interface.
323
func (m *memoryMailBox) Stop() {
313✔
324
        m.stopped.Do(func() {
625✔
325
                close(m.quit)
312✔
326

312✔
327
                m.signalUntilShutdown(wireCourier)
312✔
328
                m.signalUntilShutdown(pktCourier)
312✔
329
        })
312✔
330
}
331

332
// signalUntilShutdown strobes the condition variable of the passed courier
333
// type, blocking until the worker has exited.
334
func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
621✔
335
        var (
621✔
336
                cond     *sync.Cond
621✔
337
                shutdown chan struct{}
621✔
338
        )
621✔
339

621✔
340
        switch cType {
621✔
341
        case wireCourier:
312✔
342
                cond = m.wireCond
312✔
343
                shutdown = m.wireShutdown
312✔
344
        case pktCourier:
312✔
345
                cond = m.pktCond
312✔
346
                shutdown = m.pktShutdown
312✔
347
        }
348

349
        for {
1,850✔
350
                select {
1,229✔
351
                case <-time.After(time.Millisecond):
611✔
352
                        cond.Signal()
611✔
353
                case <-shutdown:
621✔
354
                        return
621✔
355
                }
356
        }
357
}
358

359
// pktWithExpiry wraps an incoming packet and records the time at which it
360
// should be canceled from the mailbox. This will be used to detect if it gets
361
// stuck in the mailbox and inform when to cancel back.
362
type pktWithExpiry struct {
363
        pkt    *htlcPacket
364
        expiry time.Time
365
}
366

367
func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
607✔
368
        return clock.TickAfter(p.expiry.Sub(clock.Now()))
607✔
369
}
607✔
370

371
// wireMailCourier is a dedicated goroutine whose job is to reliably deliver
372
// wire messages.
373
func (m *memoryMailBox) wireMailCourier() {
336✔
374
        defer close(m.wireShutdown)
336✔
375

336✔
376
        for {
4,037✔
377
                // First, we'll check our condition. If our mailbox is empty,
3,701✔
378
                // then we'll wait until a new item is added.
3,701✔
379
                m.wireCond.L.Lock()
3,701✔
380
                for m.wireMessages.Front() == nil {
7,536✔
381
                        m.wireCond.Wait()
3,835✔
382

3,835✔
383
                        select {
3,835✔
384
                        case msgDone := <-m.msgReset:
339✔
385
                                m.wireMessages.Init()
339✔
386
                                close(msgDone)
339✔
387
                        case <-m.quit:
312✔
388
                                m.wireCond.L.Unlock()
312✔
389
                                return
312✔
390
                        default:
3,166✔
391
                        }
392
                }
393

394
                // Grab the datum off the front of the queue, shifting the
395
                // slice's reference down one in order to remove the datum from
396
                // the queue.
397
                entry := m.wireMessages.Front()
3,368✔
398

3,368✔
399
                //nolint:forcetypeassert
3,368✔
400
                nextMsg := m.wireMessages.Remove(entry).(lnwire.Message)
3,368✔
401

3,368✔
402
                // Now that we're done with the condition, we can unlock it to
3,368✔
403
                // allow any callers to append to the end of our target queue.
3,368✔
404
                m.wireCond.L.Unlock()
3,368✔
405

3,368✔
406
                // With the next message obtained, we'll now select to attempt
3,368✔
407
                // to deliver the message. If we receive a kill signal, then
3,368✔
408
                // we'll bail out.
3,368✔
409
                select {
3,368✔
410
                case m.messageOutbox <- nextMsg:
3,368✔
411
                case msgDone := <-m.msgReset:
×
412
                        m.wireCond.L.Lock()
×
413
                        m.wireMessages.Init()
×
414
                        m.wireCond.L.Unlock()
×
415

×
416
                        close(msgDone)
×
417
                case <-m.quit:
×
418
                        return
×
419
                }
420
        }
421
}
422

423
// pktMailCourier is a dedicated goroutine whose job is to reliably deliver
424
// packet messages.
425
func (m *memoryMailBox) pktMailCourier() {
336✔
426
        defer close(m.pktShutdown)
336✔
427

336✔
428
        for {
1,356✔
429
                // First, we'll check our condition. If our mailbox is empty,
1,020✔
430
                // then we'll wait until a new item is added.
1,020✔
431
                m.pktCond.L.Lock()
1,020✔
432
                for m.repHead == nil && m.addHead == nil {
2,375✔
433
                        m.pktCond.Wait()
1,355✔
434

1,355✔
435
                        select {
1,355✔
436
                        // Resetting the packet queue means just moving our
437
                        // pointer to the front. This ensures that any un-ACK'd
438
                        // messages are re-delivered upon reconnect.
439
                        case pktDone := <-m.pktReset:
538✔
440
                                m.repHead = m.repPkts.Front()
538✔
441
                                m.addHead = m.addPkts.Front()
538✔
442

538✔
443
                                close(pktDone)
538✔
444

445
                        case <-m.quit:
306✔
446
                                m.pktCond.L.Unlock()
306✔
447
                                return
306✔
448
                        default:
493✔
449
                        }
450
                }
451

452
                var (
693✔
453
                        nextRep   *htlcPacket
693✔
454
                        nextRepEl *list.Element
693✔
455
                        nextAdd   *pktWithExpiry
693✔
456
                        nextAddEl *list.Element
693✔
457
                )
693✔
458
                // For packets, we actually never remove an item until it has
693✔
459
                // been ACK'd by the link. This ensures that if a read packet
693✔
460
                // doesn't make it into a commitment, then it'll be
693✔
461
                // re-delivered once the link comes back online.
693✔
462

693✔
463
                // Peek at the head of the Settle/Fails and Add queues. We peak
693✔
464
                // both even if there is a Settle/Fail present because we need
693✔
465
                // to set a deadline for the next pending Add if it's present.
693✔
466
                // Due to clock monotonicity, we know that the head of the Adds
693✔
467
                // is the next to expire.
693✔
468
                if m.repHead != nil {
786✔
469
                        //nolint:forcetypeassert
93✔
470
                        nextRep = m.repHead.Value.(*htlcPacket)
93✔
471
                        nextRepEl = m.repHead
93✔
472
                }
93✔
473
                if m.addHead != nil {
1,300✔
474
                        //nolint:forcetypeassert
607✔
475
                        nextAdd = m.addHead.Value.(*pktWithExpiry)
607✔
476
                        nextAddEl = m.addHead
607✔
477
                }
607✔
478

479
                // Now that we're done with the condition, we can unlock it to
480
                // allow any callers to append to the end of our target queue.
481
                m.pktCond.L.Unlock()
693✔
482

693✔
483
                var (
693✔
484
                        pktOutbox chan *htlcPacket
693✔
485
                        addOutbox chan *htlcPacket
693✔
486
                        add       *htlcPacket
693✔
487
                        deadline  <-chan time.Time
693✔
488
                )
693✔
489

693✔
490
                // Prioritize delivery of Settle/Fail packets over Adds. This
693✔
491
                // ensures that we actively clear the commitment of existing
693✔
492
                // HTLCs before trying to add new ones. This can help to improve
693✔
493
                // forwarding performance since the time to sign a commitment is
693✔
494
                // linear in the number of HTLCs manifested on the commitments.
693✔
495
                //
693✔
496
                // NOTE: Both types are eventually delivered over the same
693✔
497
                // channel, but we can control which is delivered by exclusively
693✔
498
                // making one nil and the other non-nil. We know from our loop
693✔
499
                // condition that at least one nextRep and nextAdd are non-nil.
693✔
500
                if nextRep != nil {
786✔
501
                        pktOutbox = m.pktOutbox
93✔
502
                } else {
696✔
503
                        addOutbox = m.pktOutbox
603✔
504
                }
603✔
505

506
                // If we have a pending Add, we'll also construct the deadline
507
                // so we can fail it back if we are unable to deliver any
508
                // message in time. We also dereference the nextAdd's packet,
509
                // since we will need access to it in the case we are delivering
510
                // it and/or if the deadline expires.
511
                //
512
                // NOTE: It's possible after this point for add to be nil, but
513
                // this can only occur when addOutbox is also nil, hence we
514
                // won't accidentally deliver a nil packet.
515
                if nextAdd != nil {
1,300✔
516
                        add = nextAdd.pkt
607✔
517
                        deadline = nextAdd.deadline(m.cfg.clock)
607✔
518
                }
607✔
519

520
                select {
693✔
521
                case pktOutbox <- nextRep:
91✔
522
                        m.pktCond.L.Lock()
91✔
523
                        // Only advance the repHead if this Settle or Fail is
91✔
524
                        // still at the head of the queue.
91✔
525
                        if m.repHead != nil && m.repHead == nextRepEl {
176✔
526
                                m.repHead = m.repHead.Next()
85✔
527
                        }
85✔
528
                        m.pktCond.L.Unlock()
91✔
529

530
                case addOutbox <- add:
558✔
531
                        m.pktCond.L.Lock()
558✔
532
                        // Only advance the addHead if this Add is still at the
558✔
533
                        // head of the queue.
558✔
534
                        if m.addHead != nil && m.addHead == nextAddEl {
1,113✔
535
                                m.addHead = m.addHead.Next()
555✔
536
                        }
555✔
537
                        m.pktCond.L.Unlock()
558✔
538

539
                case <-deadline:
36✔
540
                        log.Debugf("Expiring add htlc with "+
36✔
541
                                "keystone=%v", add.keystone())
36✔
542
                        m.FailAdd(add)
36✔
543

544
                case pktDone := <-m.pktReset:
8✔
545
                        m.pktCond.L.Lock()
8✔
546
                        m.repHead = m.repPkts.Front()
8✔
547
                        m.addHead = m.addPkts.Front()
8✔
548
                        m.pktCond.L.Unlock()
8✔
549

8✔
550
                        close(pktDone)
8✔
551

552
                case <-m.quit:
9✔
553
                        return
9✔
554
                }
555
        }
556
}
557

558
// AddMessage appends a new message to the end of the message queue.
559
//
560
// NOTE: This method is safe for concrete use and part of the MailBox
561
// interface.
562
func (m *memoryMailBox) AddMessage(msg lnwire.Message) error {
3,368✔
563
        // First, we'll lock the condition, and add the message to the end of
3,368✔
564
        // the wire message inbox.
3,368✔
565
        m.wireCond.L.Lock()
3,368✔
566
        m.wireMessages.PushBack(msg)
3,368✔
567
        m.wireCond.L.Unlock()
3,368✔
568

3,368✔
569
        // With the message added, we signal to the mailCourier that there are
3,368✔
570
        // additional messages to deliver.
3,368✔
571
        m.wireCond.Signal()
3,368✔
572

3,368✔
573
        return nil
3,368✔
574
}
3,368✔
575

576
// AddPacket appends a new message to the end of the packet queue.
577
//
578
// NOTE: This method is safe for concrete use and part of the MailBox
579
// interface.
580
func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
2,099✔
581
        m.pktCond.L.Lock()
2,099✔
582
        switch htlc := pkt.htlc.(type) {
2,099✔
583
        // Split off Settle/Fail packets into the repPkts queue.
584
        case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
95✔
585
                if _, ok := m.repIndex[pkt.inKey()]; ok {
97✔
586
                        m.pktCond.L.Unlock()
2✔
587
                        return ErrPacketAlreadyExists
2✔
588
                }
2✔
589

590
                entry := m.repPkts.PushBack(pkt)
93✔
591
                m.repIndex[pkt.inKey()] = entry
93✔
592
                if m.repHead == nil {
183✔
593
                        m.repHead = entry
90✔
594
                }
90✔
595

596
        // Split off Add packets into the addPkts queue.
597
        case *lnwire.UpdateAddHTLC:
2,007✔
598
                if _, ok := m.addIndex[pkt.inKey()]; ok {
2,008✔
599
                        m.pktCond.L.Unlock()
1✔
600
                        return ErrPacketAlreadyExists
1✔
601
                }
1✔
602

603
                entry := m.addPkts.PushBack(&pktWithExpiry{
2,006✔
604
                        pkt:    pkt,
2,006✔
605
                        expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
2,006✔
606
                })
2,006✔
607
                m.addIndex[pkt.inKey()] = entry
2,006✔
608
                if m.addHead == nil {
2,427✔
609
                        m.addHead = entry
421✔
610
                }
421✔
611

612
        default:
×
613
                m.pktCond.L.Unlock()
×
614
                return fmt.Errorf("unknown htlc type: %T", htlc)
×
615
        }
616
        m.pktCond.L.Unlock()
2,096✔
617

2,096✔
618
        // With the packet added, we signal to the mailCourier that there are
2,096✔
619
        // additional packets to consume.
2,096✔
620
        m.pktCond.Signal()
2,096✔
621

2,096✔
622
        return nil
2,096✔
623
}
624

625
// SetFeeRate sets the memoryMailBox's feerate for use in DustPackets.
626
func (m *memoryMailBox) SetFeeRate(feeRate chainfee.SatPerKWeight) {
224✔
627
        m.pktCond.L.Lock()
224✔
628
        defer m.pktCond.L.Unlock()
224✔
629

224✔
630
        m.feeRate = feeRate
224✔
631
}
224✔
632

633
// SetDustClosure sets the memoryMailBox's dustClosure for use in DustPackets.
634
func (m *memoryMailBox) SetDustClosure(isDust dustClosure) {
340✔
635
        m.pktCond.L.Lock()
340✔
636
        defer m.pktCond.L.Unlock()
340✔
637

340✔
638
        m.isDust = isDust
340✔
639
}
340✔
640

641
// DustPackets returns the dust sum for add packets in the mailbox. The first
642
// return value is the local dust sum and the second is the remote dust sum.
643
// This will keep track of a given dust HTLC from the time it is added via
644
// AddPacket until it is removed via AckPacket.
645
func (m *memoryMailBox) DustPackets() (lnwire.MilliSatoshi,
646
        lnwire.MilliSatoshi) {
416✔
647

416✔
648
        m.pktCond.L.Lock()
416✔
649
        defer m.pktCond.L.Unlock()
416✔
650

416✔
651
        var (
416✔
652
                localDustSum  lnwire.MilliSatoshi
416✔
653
                remoteDustSum lnwire.MilliSatoshi
416✔
654
        )
416✔
655

416✔
656
        // Run through the map of HTLC's and determine the dust sum with calls
416✔
657
        // to the memoryMailBox's isDust closure. Note that all mailbox packets
416✔
658
        // are outgoing so the second argument to isDust will be false.
416✔
659
        for _, e := range m.addIndex {
4,001✔
660
                addPkt := e.Value.(*pktWithExpiry).pkt
3,585✔
661

3,585✔
662
                // Evaluate whether this HTLC is dust on the local commitment.
3,585✔
663
                if m.isDust(
3,585✔
664
                        m.feeRate, false, lntypes.Local,
3,585✔
665
                        addPkt.amount.ToSatoshis(),
3,585✔
666
                ) {
7,164✔
667

3,579✔
668
                        localDustSum += addPkt.amount
3,579✔
669
                }
3,579✔
670

671
                // Evaluate whether this HTLC is dust on the remote commitment.
672
                if m.isDust(
3,585✔
673
                        m.feeRate, false, lntypes.Remote,
3,585✔
674
                        addPkt.amount.ToSatoshis(),
3,585✔
675
                ) {
7,170✔
676

3,585✔
677
                        remoteDustSum += addPkt.amount
3,585✔
678
                }
3,585✔
679
        }
680

681
        return localDustSum, remoteDustSum
416✔
682
}
683

684
// FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it
685
// from the in-memory replay buffer. This will prevent the packet from being
686
// delivered after the link restarts if the switch has remained online. The
687
// generated LinkError will show an OutgoingFailureDownstreamHtlcAdd
688
// FailureDetail.
689
func (m *memoryMailBox) FailAdd(pkt *htlcPacket) {
54✔
690
        // First, remove the packet from mailbox. If we didn't find the packet
54✔
691
        // because it has already been acked, we'll exit early to avoid sending
54✔
692
        // a duplicate fail message through the switch.
54✔
693
        if !m.AckPacket(pkt.inKey()) {
59✔
694
                return
5✔
695
        }
5✔
696

697
        var (
49✔
698
                localFailure = false
49✔
699
                reason       lnwire.OpaqueReason
49✔
700
        )
49✔
701

49✔
702
        // Create a temporary channel failure which we will send back to our
49✔
703
        // peer if this is a forward, or report to the user if the failed
49✔
704
        // payment was locally initiated.
49✔
705
        failure := m.cfg.failMailboxUpdate(
49✔
706
                pkt.originalOutgoingChanID, m.cfg.shortChanID,
49✔
707
        )
49✔
708

49✔
709
        // If the payment was locally initiated (which is indicated by a nil
49✔
710
        // obfuscator), we do not need to encrypt it back to the sender.
49✔
711
        if pkt.obfuscator == nil {
91✔
712
                var b bytes.Buffer
42✔
713
                err := lnwire.EncodeFailure(&b, failure, 0)
42✔
714
                if err != nil {
42✔
715
                        log.Errorf("Unable to encode failure: %v", err)
×
716
                        return
×
717
                }
×
718
                reason = lnwire.OpaqueReason(b.Bytes())
42✔
719
                localFailure = true
42✔
720
        } else {
7✔
721
                // If the packet is part of a forward, (identified by a non-nil
7✔
722
                // obfuscator) we need to encrypt the error back to the source.
7✔
723
                var err error
7✔
724
                reason, err = pkt.obfuscator.EncryptFirstHop(failure)
7✔
725
                if err != nil {
7✔
726
                        log.Errorf("Unable to obfuscate error: %v", err)
×
727
                        return
×
728
                }
×
729
        }
730

731
        // Create a link error containing the temporary channel failure and a
732
        // detail which indicates the we failed to add the htlc.
733
        linkError := NewDetailedLinkError(
49✔
734
                failure, OutgoingFailureDownstreamHtlcAdd,
49✔
735
        )
49✔
736

49✔
737
        failPkt := &htlcPacket{
49✔
738
                incomingChanID: pkt.incomingChanID,
49✔
739
                incomingHTLCID: pkt.incomingHTLCID,
49✔
740
                circuit:        pkt.circuit,
49✔
741
                sourceRef:      pkt.sourceRef,
49✔
742
                hasSource:      true,
49✔
743
                localFailure:   localFailure,
49✔
744
                obfuscator:     pkt.obfuscator,
49✔
745
                linkFailure:    linkError,
49✔
746
                htlc: &lnwire.UpdateFailHTLC{
49✔
747
                        Reason: reason,
49✔
748
                },
49✔
749
        }
49✔
750

49✔
751
        if err := m.cfg.forwardPackets(m.quit, failPkt); err != nil {
49✔
752
                log.Errorf("Unhandled error while reforwarding packets "+
×
753
                        "settle/fail over htlcswitch: %v", err)
×
754
        }
×
755
}
756

757
// MessageOutBox returns a channel that any new messages ready for delivery
758
// will be sent on.
759
//
760
// NOTE: This method is part of the MailBox interface.
761
func (m *memoryMailBox) MessageOutBox() chan lnwire.Message {
236✔
762
        return m.messageOutbox
236✔
763
}
236✔
764

765
// PacketOutBox returns a channel that any new packets ready for delivery will
766
// be sent on.
767
//
768
// NOTE: This method is part of the MailBox interface.
769
func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
387✔
770
        return m.pktOutbox
387✔
771
}
387✔
772

773
// mailOrchestrator is responsible for coordinating the creation and lifecycle
774
// of mailboxes used within the switch. It supports the ability to create
775
// mailboxes, reassign their short channel id's, deliver htlc packets, and
776
// queue packets for mailboxes that have not been created due to a link's late
777
// registration.
778
type mailOrchestrator struct {
779
        mu sync.RWMutex
780

781
        cfg *mailOrchConfig
782

783
        // mailboxes caches exactly one mailbox for all known channels.
784
        mailboxes map[lnwire.ChannelID]MailBox
785

786
        // liveIndex maps a live short chan id to the primary mailbox key.
787
        // An index in liveIndex map is only entered under two conditions:
788
        //   1. A link has a non-zero short channel id at time of AddLink.
789
        //   2. A link receives a non-zero short channel via UpdateShortChanID.
790
        liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID
791

792
        // TODO(conner): add another pair of indexes:
793
        //   chan_id -> short_chan_id
794
        //   short_chan_id -> mailbox
795
        // so that Deliver can lookup mailbox directly once live,
796
        // but still queryable by channel_id.
797

798
        // unclaimedPackets maps a live short chan id to queue of packets if no
799
        // mailbox has been created.
800
        unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
801
}
802

803
type mailOrchConfig struct {
804
        // forwardPackets send a varidic number of htlcPackets to the switch to
805
        // be routed. A quit channel should be provided so that the call can
806
        // properly exit during shutdown.
807
        forwardPackets func(<-chan struct{}, ...*htlcPacket) error
808

809
        // clock is a time source for the generated mailboxes.
810
        clock clock.Clock
811

812
        // expiry is the interval after which Adds will be cancelled if they
813
        // have not been yet been delivered. The computed deadline will expiry
814
        // this long after the Adds are added to a mailbox via AddPacket.
815
        expiry time.Duration
816

817
        // failMailboxUpdate is used to fail an expired HTLC and use the
818
        // correct SCID if the underlying channel uses aliases.
819
        failMailboxUpdate func(outScid,
820
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
821
}
822

823
// newMailOrchestrator initializes a fresh mailOrchestrator.
824
func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator {
345✔
825
        return &mailOrchestrator{
345✔
826
                cfg:              cfg,
345✔
827
                mailboxes:        make(map[lnwire.ChannelID]MailBox),
345✔
828
                liveIndex:        make(map[lnwire.ShortChannelID]lnwire.ChannelID),
345✔
829
                unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
345✔
830
        }
345✔
831
}
345✔
832

833
// Stop instructs the orchestrator to stop all active mailboxes.
834
func (mo *mailOrchestrator) Stop() {
313✔
835
        for _, mailbox := range mo.mailboxes {
617✔
836
                mailbox.Stop()
304✔
837
        }
304✔
838
}
839

840
// GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
841
// creates and returns a new mailbox if none is found.
842
func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID,
843
        shortChanID lnwire.ShortChannelID) MailBox {
743✔
844

743✔
845
        // First, try lookup the mailbox directly using only the shared mutex.
743✔
846
        mo.mu.RLock()
743✔
847
        mailbox, ok := mo.mailboxes[chanID]
743✔
848
        if ok {
1,161✔
849
                mo.mu.RUnlock()
418✔
850
                return mailbox
418✔
851
        }
418✔
852
        mo.mu.RUnlock()
328✔
853

328✔
854
        // Otherwise, we will try again with exclusive lock, creating a mailbox
328✔
855
        // if one still has not been created.
328✔
856
        mo.mu.Lock()
328✔
857
        mailbox = mo.exclusiveGetOrCreateMailBox(chanID, shortChanID)
328✔
858
        mo.mu.Unlock()
328✔
859

328✔
860
        return mailbox
328✔
861
}
862

863
// exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the
864
// given channel id. If none is found, a new one is creates, started, and
865
// recorded.
866
//
867
// NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
868
func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
869
        chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
328✔
870

328✔
871
        mailbox, ok := mo.mailboxes[chanID]
328✔
872
        if !ok {
656✔
873
                mailbox = newMemoryMailBox(&mailBoxConfig{
328✔
874
                        shortChanID:       shortChanID,
328✔
875
                        forwardPackets:    mo.cfg.forwardPackets,
328✔
876
                        clock:             mo.cfg.clock,
328✔
877
                        expiry:            mo.cfg.expiry,
328✔
878
                        failMailboxUpdate: mo.cfg.failMailboxUpdate,
328✔
879
                })
328✔
880
                mailbox.Start()
328✔
881
                mo.mailboxes[chanID] = mailbox
328✔
882
        }
328✔
883

884
        return mailbox
328✔
885
}
886

887
// BindLiveShortChanID registers that messages bound for a particular short
888
// channel id should be forwarded to the mailbox corresponding to the given
889
// channel id. This method also checks to see if there are any unclaimed
890
// packets for this short_chan_id. If any are found, they are delivered to the
891
// mailbox and removed (marked as claimed).
892
func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox,
893
        cid lnwire.ChannelID, sid lnwire.ShortChannelID) {
339✔
894

339✔
895
        mo.mu.Lock()
339✔
896
        // Update the mapping from short channel id to mailbox's channel id.
339✔
897
        mo.liveIndex[sid] = cid
339✔
898

339✔
899
        // Retrieve any unclaimed packets destined for this mailbox.
339✔
900
        pkts := mo.unclaimedPackets[sid]
339✔
901
        delete(mo.unclaimedPackets, sid)
339✔
902
        mo.mu.Unlock()
339✔
903

339✔
904
        // Deliver the unclaimed packets.
339✔
905
        for _, pkt := range pkts {
347✔
906
                mailbox.AddPacket(pkt)
8✔
907
        }
8✔
908
}
909

910
// Deliver lookups the target mailbox using the live index from short_chan_id
911
// to channel_id. If the mailbox is found, the message is delivered directly.
912
// Otherwise the packet is recorded as unclaimed, and will be delivered to the
913
// mailbox upon the subsequent call to BindLiveShortChanID.
914
func (mo *mailOrchestrator) Deliver(
915
        sid lnwire.ShortChannelID, pkt *htlcPacket) error {
92✔
916

92✔
917
        var (
92✔
918
                mailbox MailBox
92✔
919
                found   bool
92✔
920
        )
92✔
921

92✔
922
        // First, try to find the channel id for the target short_chan_id. If
92✔
923
        // the link is live, we will also look up the created mailbox.
92✔
924
        mo.mu.RLock()
92✔
925
        chanID, isLive := mo.liveIndex[sid]
92✔
926
        if isLive {
179✔
927
                mailbox, found = mo.mailboxes[chanID]
87✔
928
        }
87✔
929
        mo.mu.RUnlock()
92✔
930

92✔
931
        // The link is live and target mailbox was found, deliver immediately.
92✔
932
        if isLive && found {
179✔
933
                return mailbox.AddPacket(pkt)
87✔
934
        }
87✔
935

936
        // If we detected that the link has not been made live, we will acquire
937
        // the exclusive lock preemptively in order to queue this packet in the
938
        // list of unclaimed packets.
939
        mo.mu.Lock()
8✔
940

8✔
941
        // Double check to see if the mailbox has been not made live since the
8✔
942
        // release of the shared lock.
8✔
943
        //
8✔
944
        // NOTE: Checking again with the exclusive lock held prevents a race
8✔
945
        // condition where BindLiveShortChanID is interleaved between the
8✔
946
        // release of the shared lock, and acquiring the exclusive lock. The
8✔
947
        // result would be stuck packets, as they wouldn't be redelivered until
8✔
948
        // the next call to BindLiveShortChanID, which is expected to occur
8✔
949
        // infrequently.
8✔
950
        chanID, isLive = mo.liveIndex[sid]
8✔
951
        if isLive {
8✔
952
                // Reaching this point indicates the mailbox is actually live.
×
953
                // We'll try to load the mailbox using the fresh channel id.
×
954
                //
×
955
                // NOTE: This should never create a new mailbox, as the live
×
956
                // index should only be set if the mailbox had been initialized
×
957
                // beforehand.  However, this does ensure that this case is
×
958
                // handled properly in the event that it could happen.
×
959
                mailbox = mo.exclusiveGetOrCreateMailBox(chanID, sid)
×
960
                mo.mu.Unlock()
×
961

×
962
                // Deliver the packet to the mailbox if it was found or created.
×
963
                return mailbox.AddPacket(pkt)
×
964
        }
×
965

966
        // Finally, if the channel id is still not found in the live index,
967
        // we'll add this to the list of unclaimed packets. These will be
968
        // delivered upon the next call to BindLiveShortChanID.
969
        mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt)
8✔
970
        mo.mu.Unlock()
8✔
971

8✔
972
        return nil
8✔
973
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc