• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13436790699

20 Feb 2025 01:40PM UTC coverage: 58.78% (-0.01%) from 58.794%
13436790699

Pull #9534

github

ellemouton
graph: refactor Builder network message handling

The exposed AddNode, AddEdge and UpdateEdge methods of the Builder are
currently synchronous since even though they pass messages to the
network handler which spins off the handling in a goroutine, the public
methods still wait for a response from the handling before returning.
The only part that is actually done asynchronously is the topology
notifications.

We previously tried to simplify things in [this
commit](https://github.com/lightningnetwork/lnd/pull/9476/commits/d757b3bcf)
but we soon realised that there was a reason for sending the messages to
the central/synchronous network handler first: it was to ensure
consistency for topology clients: ie, the ordering between when there is
a new topology client or if it is cancelled needs to be consistent and
handled synchronously with new network updates. So for example, if a new
update comes in right after a topology client cancels its subscription,
then it should _not_ be notified. Similariy for new subscriptions. So
this commit was reverted soon after.

We can, however, still simplify things as is done in this commit by
noting that _only topology subscriptions and notifications_ need to be
handled separately. The actual network updates do not need to. So that
is what is done here.

This refactor will make moving the topology subscription logic to a new
subsystem later on much easier.
Pull Request #9534: graph: refactor Builder network message handling

38 of 44 new or added lines in 1 file covered. (86.36%)

55 existing lines in 11 files now uncovered.

136048 of 231453 relevant lines covered (58.78%)

19264.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.19
/htlcswitch/mailbox.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "errors"
7
        "fmt"
8
        "sync"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
)
16

17
var (
18
        // ErrMailBoxShuttingDown is returned when the mailbox is interrupted by
19
        // a shutdown request.
20
        ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")
21

22
        // ErrPacketAlreadyExists signals that an attempt to add a packet failed
23
        // because it already exists in the mailbox.
24
        ErrPacketAlreadyExists = errors.New("mailbox already has packet")
25
)
26

27
// MailBox is an interface which represents a concurrent-safe, in-order
28
// delivery queue for messages from the network and also from the main switch.
29
// This struct serves as a buffer between incoming messages, and messages to
30
// the handled by the link. Each of the mutating methods within this interface
31
// should be implemented in a non-blocking manner.
32
type MailBox interface {
33
        // AddMessage appends a new message to the end of the message queue.
34
        AddMessage(msg lnwire.Message) error
35

36
        // AddPacket appends a new message to the end of the packet queue.
37
        AddPacket(pkt *htlcPacket) error
38

39
        // HasPacket queries the packets for a circuit key, this is used to drop
40
        // packets bound for the switch that already have a queued response.
41
        HasPacket(CircuitKey) bool
42

43
        // AckPacket removes a packet from the mailboxes in-memory replay
44
        // buffer. This will prevent a packet from being delivered after a link
45
        // restarts if the switch has remained online. The returned boolean
46
        // indicates whether or not a packet with the passed incoming circuit
47
        // key was removed.
48
        AckPacket(CircuitKey) bool
49

50
        // FailAdd fails an UpdateAddHTLC that exists within the mailbox,
51
        // removing it from the in-memory replay buffer. This will prevent the
52
        // packet from being delivered after the link restarts if the switch has
53
        // remained online. The generated LinkError will show an
54
        // OutgoingFailureDownstreamHtlcAdd FailureDetail.
55
        FailAdd(pkt *htlcPacket)
56

57
        // MessageOutBox returns a channel that any new messages ready for
58
        // delivery will be sent on.
59
        MessageOutBox() chan lnwire.Message
60

61
        // PacketOutBox returns a channel that any new packets ready for
62
        // delivery will be sent on.
63
        PacketOutBox() chan *htlcPacket
64

65
        // Clears any pending wire messages from the inbox.
66
        ResetMessages() error
67

68
        // Reset the packet head to point at the first element in the list.
69
        ResetPackets() error
70

71
        // SetDustClosure takes in a closure that is used to evaluate whether
72
        // mailbox HTLC's are dust.
73
        SetDustClosure(isDust dustClosure)
74

75
        // SetFeeRate sets the feerate to be used when evaluating dust.
76
        SetFeeRate(feerate chainfee.SatPerKWeight)
77

78
        // DustPackets returns the dust sum for Adds in the mailbox for the
79
        // local and remote commitments.
80
        DustPackets() (lnwire.MilliSatoshi, lnwire.MilliSatoshi)
81

82
        // Start starts the mailbox and any goroutines it needs to operate
83
        // properly.
84
        Start()
85

86
        // Stop signals the mailbox and its goroutines for a graceful shutdown.
87
        Stop()
88
}
89

90
type mailBoxConfig struct {
91
        // shortChanID is the short channel id of the channel this mailbox
92
        // belongs to.
93
        shortChanID lnwire.ShortChannelID
94

95
        // forwardPackets send a varidic number of htlcPackets to the switch to
96
        // be routed. A quit channel should be provided so that the call can
97
        // properly exit during shutdown.
98
        forwardPackets func(<-chan struct{}, ...*htlcPacket) error
99

100
        // clock is a time source for the mailbox.
101
        clock clock.Clock
102

103
        // expiry is the interval after which Adds will be cancelled if they
104
        // have not been yet been delivered. The computed deadline will expiry
105
        // this long after the Adds are added via AddPacket.
106
        expiry time.Duration
107

108
        // failMailboxUpdate is used to fail an expired HTLC and use the
109
        // correct SCID if the underlying channel uses aliases.
110
        failMailboxUpdate func(outScid,
111
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
112
}
113

114
// memoryMailBox is an implementation of the MailBox struct backed by purely
115
// in-memory queues.
116
//
117
// TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts.
118
type memoryMailBox struct {
119
        started sync.Once
120
        stopped sync.Once
121

122
        cfg *mailBoxConfig
123

124
        wireMessages *list.List
125
        wireMtx      sync.Mutex
126
        wireCond     *sync.Cond
127

128
        messageOutbox chan lnwire.Message
129
        msgReset      chan chan struct{}
130

131
        // repPkts is a queue for reply packets, e.g. Settles and Fails.
132
        repPkts  *list.List
133
        repIndex map[CircuitKey]*list.Element
134
        repHead  *list.Element
135

136
        // addPkts is a dedicated queue for Adds.
137
        addPkts  *list.List
138
        addIndex map[CircuitKey]*list.Element
139
        addHead  *list.Element
140

141
        pktMtx  sync.Mutex
142
        pktCond *sync.Cond
143

144
        pktOutbox chan *htlcPacket
145
        pktReset  chan chan struct{}
146

147
        wireShutdown chan struct{}
148
        pktShutdown  chan struct{}
149
        quit         chan struct{}
150

151
        // feeRate is set when the link receives or sends out fee updates. It
152
        // is refreshed when AttachMailBox is called in case a fee update did
153
        // not get committed. In some cases it may be out of sync with the
154
        // channel's feerate, but it should eventually get back in sync.
155
        feeRate chainfee.SatPerKWeight
156

157
        // isDust is set when AttachMailBox is called and serves to evaluate
158
        // the outstanding dust in the memoryMailBox given the current set
159
        // feeRate.
160
        isDust dustClosure
161
}
162

163
// newMemoryMailBox creates a new instance of the memoryMailBox.
164
func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
336✔
165
        box := &memoryMailBox{
336✔
166
                cfg:           cfg,
336✔
167
                wireMessages:  list.New(),
336✔
168
                repPkts:       list.New(),
336✔
169
                addPkts:       list.New(),
336✔
170
                messageOutbox: make(chan lnwire.Message),
336✔
171
                pktOutbox:     make(chan *htlcPacket),
336✔
172
                msgReset:      make(chan chan struct{}, 1),
336✔
173
                pktReset:      make(chan chan struct{}, 1),
336✔
174
                repIndex:      make(map[CircuitKey]*list.Element),
336✔
175
                addIndex:      make(map[CircuitKey]*list.Element),
336✔
176
                wireShutdown:  make(chan struct{}),
336✔
177
                pktShutdown:   make(chan struct{}),
336✔
178
                quit:          make(chan struct{}),
336✔
179
        }
336✔
180
        box.wireCond = sync.NewCond(&box.wireMtx)
336✔
181
        box.pktCond = sync.NewCond(&box.pktMtx)
336✔
182

336✔
183
        return box
336✔
184
}
336✔
185

186
// A compile time assertion to ensure that memoryMailBox meets the MailBox
187
// interface.
188
var _ MailBox = (*memoryMailBox)(nil)
189

190
// courierType is an enum that reflects the distinct types of messages a
191
// MailBox can handle. Each type will be placed in an isolated mail box and
192
// will have a dedicated goroutine for delivering the messages.
193
type courierType uint8
194

195
const (
196
        // wireCourier is a type of courier that handles wire messages.
197
        wireCourier courierType = iota
198

199
        // pktCourier is a type of courier that handles htlc packets.
200
        pktCourier
201
)
202

203
// Start starts the mailbox and any goroutines it needs to operate properly.
204
//
205
// NOTE: This method is part of the MailBox interface.
206
func (m *memoryMailBox) Start() {
337✔
207
        m.started.Do(func() {
673✔
208
                go m.wireMailCourier()
336✔
209
                go m.pktMailCourier()
336✔
210
        })
336✔
211
}
212

213
// ResetMessages blocks until all buffered wire messages are cleared.
214
func (m *memoryMailBox) ResetMessages() error {
340✔
215
        msgDone := make(chan struct{})
340✔
216
        select {
340✔
217
        case m.msgReset <- msgDone:
340✔
218
                return m.signalUntilReset(wireCourier, msgDone)
340✔
UNCOV
219
        case <-m.quit:
×
UNCOV
220
                return ErrMailBoxShuttingDown
×
221
        }
222
}
223

224
// ResetPackets blocks until the head of packets buffer is reset, causing the
225
// packets to be redelivered in order.
226
func (m *memoryMailBox) ResetPackets() error {
544✔
227
        pktDone := make(chan struct{})
544✔
228
        select {
544✔
229
        case m.pktReset <- pktDone:
544✔
230
                return m.signalUntilReset(pktCourier, pktDone)
544✔
UNCOV
231
        case <-m.quit:
×
UNCOV
232
                return ErrMailBoxShuttingDown
×
233
        }
234
}
235

236
// signalUntilReset strobes the condition variable for the specified inbox type
237
// until receiving a response that the mailbox has processed a reset.
238
func (m *memoryMailBox) signalUntilReset(cType courierType,
239
        done chan struct{}) error {
881✔
240

881✔
241
        for {
2,078✔
242

1,197✔
243
                switch cType {
1,197✔
244
                case wireCourier:
656✔
245
                        m.wireCond.Signal()
656✔
246
                case pktCourier:
544✔
247
                        m.pktCond.Signal()
544✔
248
                }
249

250
                select {
1,197✔
251
                case <-time.After(time.Millisecond):
319✔
252
                        continue
319✔
253
                case <-done:
879✔
254
                        return nil
879✔
255
                case <-m.quit:
2✔
256
                        return ErrMailBoxShuttingDown
2✔
257
                }
258
        }
259
}
260

261
// AckPacket removes the packet identified by it's incoming circuit key from the
262
// queue of packets to be delivered. The returned boolean indicates whether or
263
// not a packet with the passed incoming circuit key was removed.
264
//
265
// NOTE: It is safe to call this method multiple times for the same circuit key.
266
func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
599✔
267
        m.pktCond.L.Lock()
599✔
268
        defer m.pktCond.L.Unlock()
599✔
269

599✔
270
        if entry, ok := m.repIndex[inKey]; ok {
660✔
271
                // Check whether we are removing the head of the queue. If so,
61✔
272
                // we must advance the head to the next packet before removing.
61✔
273
                // It's possible that the courier has already advanced the
61✔
274
                // repHead, so this check prevents the repHead from getting
61✔
275
                // desynchronized.
61✔
276
                if entry == m.repHead {
67✔
277
                        m.repHead = entry.Next()
6✔
278
                }
6✔
279
                m.repPkts.Remove(entry)
61✔
280
                delete(m.repIndex, inKey)
61✔
281

61✔
282
                return true
61✔
283
        }
284

285
        if entry, ok := m.addIndex[inKey]; ok {
1,073✔
286
                // Check whether we are removing the head of the queue. If so,
532✔
287
                // we must advance the head to the next add before removing.
532✔
288
                // It's possible that the courier has already advanced the
532✔
289
                // addHead, so this check prevents the addHead from getting
532✔
290
                // desynchronized.
532✔
291
                //
532✔
292
                // NOTE: While this event is rare for Settles or Fails, it could
532✔
293
                // be very common for Adds since the mailbox has the ability to
532✔
294
                // cancel Adds before they are delivered. When that occurs, the
532✔
295
                // head of addPkts has only been peeked and we expect to be
532✔
296
                // removing the head of the queue.
532✔
297
                if entry == m.addHead {
571✔
298
                        m.addHead = entry.Next()
39✔
299
                }
39✔
300

301
                m.addPkts.Remove(entry)
532✔
302
                delete(m.addIndex, inKey)
532✔
303

532✔
304
                return true
532✔
305
        }
306

307
        return false
9✔
308
}
309

310
// HasPacket queries the packets for a circuit key, this is used to drop packets
311
// bound for the switch that already have a queued response.
312
func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
578✔
313
        m.pktCond.L.Lock()
578✔
314
        _, ok := m.repIndex[inKey]
578✔
315
        m.pktCond.L.Unlock()
578✔
316

578✔
317
        return ok
578✔
318
}
578✔
319

320
// Stop signals the mailbox and its goroutines for a graceful shutdown.
321
//
322
// NOTE: This method is part of the MailBox interface.
323
func (m *memoryMailBox) Stop() {
313✔
324
        m.stopped.Do(func() {
625✔
325
                close(m.quit)
312✔
326

312✔
327
                m.signalUntilShutdown(wireCourier)
312✔
328
                m.signalUntilShutdown(pktCourier)
312✔
329
        })
312✔
330
}
331

332
// signalUntilShutdown strobes the condition variable of the passed courier
333
// type, blocking until the worker has exited.
334
func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
621✔
335
        var (
621✔
336
                cond     *sync.Cond
621✔
337
                shutdown chan struct{}
621✔
338
        )
621✔
339

621✔
340
        switch cType {
621✔
341
        case wireCourier:
312✔
342
                cond = m.wireCond
312✔
343
                shutdown = m.wireShutdown
312✔
344
        case pktCourier:
312✔
345
                cond = m.pktCond
312✔
346
                shutdown = m.pktShutdown
312✔
347
        }
348

349
        for {
1,850✔
350
                select {
1,229✔
351
                case <-time.After(time.Millisecond):
611✔
352
                        cond.Signal()
611✔
353
                case <-shutdown:
621✔
354
                        return
621✔
355
                }
356
        }
357
}
358

359
// pktWithExpiry wraps an incoming packet and records the time at which it it
360
// should be canceled from the mailbox. This will be used to detect if it gets
361
// stuck in the mailbox and inform when to cancel back.
362
type pktWithExpiry struct {
363
        pkt    *htlcPacket
364
        expiry time.Time
365
}
366

367
func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
606✔
368
        return clock.TickAfter(p.expiry.Sub(clock.Now()))
606✔
369
}
606✔
370

371
// wireMailCourier is a dedicated goroutine whose job is to reliably deliver
372
// wire messages.
373
func (m *memoryMailBox) wireMailCourier() {
336✔
374
        defer close(m.wireShutdown)
336✔
375

336✔
376
        for {
3,989✔
377
                // First, we'll check our condition. If our mailbox is empty,
3,653✔
378
                // then we'll wait until a new item is added.
3,653✔
379
                m.wireCond.L.Lock()
3,653✔
380
                for m.wireMessages.Front() == nil {
7,475✔
381
                        m.wireCond.Wait()
3,822✔
382

3,822✔
383
                        select {
3,822✔
384
                        case msgDone := <-m.msgReset:
339✔
385
                                m.wireMessages.Init()
339✔
386
                                close(msgDone)
339✔
387
                        case <-m.quit:
312✔
388
                                m.wireCond.L.Unlock()
312✔
389
                                return
312✔
390
                        default:
3,153✔
391
                        }
392
                }
393

394
                // Grab the datum off the front of the queue, shifting the
395
                // slice's reference down one in order to remove the datum from
396
                // the queue.
397
                entry := m.wireMessages.Front()
3,320✔
398

3,320✔
399
                //nolint:forcetypeassert
3,320✔
400
                nextMsg := m.wireMessages.Remove(entry).(lnwire.Message)
3,320✔
401

3,320✔
402
                // Now that we're done with the condition, we can unlock it to
3,320✔
403
                // allow any callers to append to the end of our target queue.
3,320✔
404
                m.wireCond.L.Unlock()
3,320✔
405

3,320✔
406
                // With the next message obtained, we'll now select to attempt
3,320✔
407
                // to deliver the message. If we receive a kill signal, then
3,320✔
408
                // we'll bail out.
3,320✔
409
                select {
3,320✔
410
                case m.messageOutbox <- nextMsg:
3,320✔
411
                case msgDone := <-m.msgReset:
×
412
                        m.wireCond.L.Lock()
×
413
                        m.wireMessages.Init()
×
414
                        m.wireCond.L.Unlock()
×
415

×
416
                        close(msgDone)
×
417
                case <-m.quit:
×
418
                        return
×
419
                }
420
        }
421
}
422

423
// pktMailCourier is a dedicated goroutine whose job is to reliably deliver
424
// packet messages.
425
func (m *memoryMailBox) pktMailCourier() {
336✔
426
        defer close(m.pktShutdown)
336✔
427

336✔
428
        for {
1,356✔
429
                // First, we'll check our condition. If our mailbox is empty,
1,020✔
430
                // then we'll wait until a new item is added.
1,020✔
431
                m.pktCond.L.Lock()
1,020✔
432
                for m.repHead == nil && m.addHead == nil {
2,405✔
433
                        m.pktCond.Wait()
1,385✔
434

1,385✔
435
                        select {
1,385✔
436
                        // Resetting the packet queue means just moving our
437
                        // pointer to the front. This ensures that any un-ACK'd
438
                        // messages are re-delivered upon reconnect.
439
                        case pktDone := <-m.pktReset:
538✔
440
                                m.repHead = m.repPkts.Front()
538✔
441
                                m.addHead = m.addPkts.Front()
538✔
442

538✔
443
                                close(pktDone)
538✔
444

445
                        case <-m.quit:
307✔
446
                                m.pktCond.L.Unlock()
307✔
447
                                return
307✔
448
                        default:
522✔
449
                        }
450
                }
451

452
                var (
692✔
453
                        nextRep   *htlcPacket
692✔
454
                        nextRepEl *list.Element
692✔
455
                        nextAdd   *pktWithExpiry
692✔
456
                        nextAddEl *list.Element
692✔
457
                )
692✔
458
                // For packets, we actually never remove an item until it has
692✔
459
                // been ACK'd by the link. This ensures that if a read packet
692✔
460
                // doesn't make it into a commitment, then it'll be
692✔
461
                // re-delivered once the link comes back online.
692✔
462

692✔
463
                // Peek at the head of the Settle/Fails and Add queues. We peak
692✔
464
                // both even if there is a Settle/Fail present because we need
692✔
465
                // to set a deadline for the next pending Add if it's present.
692✔
466
                // Due to clock monotonicity, we know that the head of the Adds
692✔
467
                // is the next to expire.
692✔
468
                if m.repHead != nil {
785✔
469
                        //nolint:forcetypeassert
93✔
470
                        nextRep = m.repHead.Value.(*htlcPacket)
93✔
471
                        nextRepEl = m.repHead
93✔
472
                }
93✔
473
                if m.addHead != nil {
1,298✔
474
                        //nolint:forcetypeassert
606✔
475
                        nextAdd = m.addHead.Value.(*pktWithExpiry)
606✔
476
                        nextAddEl = m.addHead
606✔
477
                }
606✔
478

479
                // Now that we're done with the condition, we can unlock it to
480
                // allow any callers to append to the end of our target queue.
481
                m.pktCond.L.Unlock()
692✔
482

692✔
483
                var (
692✔
484
                        pktOutbox chan *htlcPacket
692✔
485
                        addOutbox chan *htlcPacket
692✔
486
                        add       *htlcPacket
692✔
487
                        deadline  <-chan time.Time
692✔
488
                )
692✔
489

692✔
490
                // Prioritize delivery of Settle/Fail packets over Adds. This
692✔
491
                // ensures that we actively clear the commitment of existing
692✔
492
                // HTLCs before trying to add new ones. This can help to improve
692✔
493
                // forwarding performance since the time to sign a commitment is
692✔
494
                // linear in the number of HTLCs manifested on the commitments.
692✔
495
                //
692✔
496
                // NOTE: Both types are eventually delivered over the same
692✔
497
                // channel, but we can control which is delivered by exclusively
692✔
498
                // making one nil and the other non-nil. We know from our loop
692✔
499
                // condition that at least one nextRep and nextAdd are non-nil.
692✔
500
                if nextRep != nil {
785✔
501
                        pktOutbox = m.pktOutbox
93✔
502
                } else {
695✔
503
                        addOutbox = m.pktOutbox
602✔
504
                }
602✔
505

506
                // If we have a pending Add, we'll also construct the deadline
507
                // so we can fail it back if we are unable to deliver any
508
                // message in time. We also dereference the nextAdd's packet,
509
                // since we will need access to it in the case we are delivering
510
                // it and/or if the deadline expires.
511
                //
512
                // NOTE: It's possible after this point for add to be nil, but
513
                // this can only occur when addOutbox is also nil, hence we
514
                // won't accidentally deliver a nil packet.
515
                if nextAdd != nil {
1,298✔
516
                        add = nextAdd.pkt
606✔
517
                        deadline = nextAdd.deadline(m.cfg.clock)
606✔
518
                }
606✔
519

520
                select {
692✔
521
                case pktOutbox <- nextRep:
91✔
522
                        m.pktCond.L.Lock()
91✔
523
                        // Only advance the repHead if this Settle or Fail is
91✔
524
                        // still at the head of the queue.
91✔
525
                        if m.repHead != nil && m.repHead == nextRepEl {
176✔
526
                                m.repHead = m.repHead.Next()
85✔
527
                        }
85✔
528
                        m.pktCond.L.Unlock()
91✔
529

530
                case addOutbox <- add:
558✔
531
                        m.pktCond.L.Lock()
558✔
532
                        // Only advance the addHead if this Add is still at the
558✔
533
                        // head of the queue.
558✔
534
                        if m.addHead != nil && m.addHead == nextAddEl {
1,113✔
535
                                m.addHead = m.addHead.Next()
555✔
536
                        }
555✔
537
                        m.pktCond.L.Unlock()
558✔
538

539
                case <-deadline:
36✔
540
                        log.Debugf("Expiring add htlc with "+
36✔
541
                                "keystone=%v", add.keystone())
36✔
542
                        m.FailAdd(add)
36✔
543

544
                case pktDone := <-m.pktReset:
8✔
545
                        m.pktCond.L.Lock()
8✔
546
                        m.repHead = m.repPkts.Front()
8✔
547
                        m.addHead = m.addPkts.Front()
8✔
548
                        m.pktCond.L.Unlock()
8✔
549

8✔
550
                        close(pktDone)
8✔
551

552
                case <-m.quit:
8✔
553
                        return
8✔
554
                }
555
        }
556
}
557

558
// AddMessage appends a new message to the end of the message queue.
559
//
560
// NOTE: This method is safe for concrete use and part of the MailBox
561
// interface.
562
func (m *memoryMailBox) AddMessage(msg lnwire.Message) error {
3,320✔
563
        // First, we'll lock the condition, and add the message to the end of
3,320✔
564
        // the wire message inbox.
3,320✔
565
        m.wireCond.L.Lock()
3,320✔
566
        m.wireMessages.PushBack(msg)
3,320✔
567
        m.wireCond.L.Unlock()
3,320✔
568

3,320✔
569
        // With the message added, we signal to the mailCourier that there are
3,320✔
570
        // additional messages to deliver.
3,320✔
571
        m.wireCond.Signal()
3,320✔
572

3,320✔
573
        return nil
3,320✔
574
}
3,320✔
575

576
// AddPacket appends a new message to the end of the packet queue.
577
//
578
// NOTE: This method is safe for concrete use and part of the MailBox
579
// interface.
580
func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
2,099✔
581
        m.pktCond.L.Lock()
2,099✔
582
        switch htlc := pkt.htlc.(type) {
2,099✔
583
        // Split off Settle/Fail packets into the repPkts queue.
584
        case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
95✔
585
                if _, ok := m.repIndex[pkt.inKey()]; ok {
97✔
586
                        m.pktCond.L.Unlock()
2✔
587
                        return ErrPacketAlreadyExists
2✔
588
                }
2✔
589

590
                entry := m.repPkts.PushBack(pkt)
93✔
591
                m.repIndex[pkt.inKey()] = entry
93✔
592
                if m.repHead == nil {
183✔
593
                        m.repHead = entry
90✔
594
                }
90✔
595

596
        // Split off Add packets into the addPkts queue.
597
        case *lnwire.UpdateAddHTLC:
2,007✔
598
                if _, ok := m.addIndex[pkt.inKey()]; ok {
2,008✔
599
                        m.pktCond.L.Unlock()
1✔
600
                        return ErrPacketAlreadyExists
1✔
601
                }
1✔
602

603
                entry := m.addPkts.PushBack(&pktWithExpiry{
2,006✔
604
                        pkt:    pkt,
2,006✔
605
                        expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
2,006✔
606
                })
2,006✔
607
                m.addIndex[pkt.inKey()] = entry
2,006✔
608
                if m.addHead == nil {
2,456✔
609
                        m.addHead = entry
450✔
610
                }
450✔
611

612
        default:
×
613
                m.pktCond.L.Unlock()
×
614
                return fmt.Errorf("unknown htlc type: %T", htlc)
×
615
        }
616
        m.pktCond.L.Unlock()
2,096✔
617

2,096✔
618
        // With the packet added, we signal to the mailCourier that there are
2,096✔
619
        // additional packets to consume.
2,096✔
620
        m.pktCond.Signal()
2,096✔
621

2,096✔
622
        return nil
2,096✔
623
}
624

625
// SetFeeRate sets the memoryMailBox's feerate for use in DustPackets.
626
func (m *memoryMailBox) SetFeeRate(feeRate chainfee.SatPerKWeight) {
224✔
627
        m.pktCond.L.Lock()
224✔
628
        defer m.pktCond.L.Unlock()
224✔
629

224✔
630
        m.feeRate = feeRate
224✔
631
}
224✔
632

633
// SetDustClosure sets the memoryMailBox's dustClosure for use in DustPackets.
634
func (m *memoryMailBox) SetDustClosure(isDust dustClosure) {
340✔
635
        m.pktCond.L.Lock()
340✔
636
        defer m.pktCond.L.Unlock()
340✔
637

340✔
638
        m.isDust = isDust
340✔
639
}
340✔
640

641
// DustPackets returns the dust sum for add packets in the mailbox. The first
642
// return value is the local dust sum and the second is the remote dust sum.
643
// This will keep track of a given dust HTLC from the time it is added via
644
// AddPacket until it is removed via AckPacket.
645
func (m *memoryMailBox) DustPackets() (lnwire.MilliSatoshi,
646
        lnwire.MilliSatoshi) {
416✔
647

416✔
648
        m.pktCond.L.Lock()
416✔
649
        defer m.pktCond.L.Unlock()
416✔
650

416✔
651
        var (
416✔
652
                localDustSum  lnwire.MilliSatoshi
416✔
653
                remoteDustSum lnwire.MilliSatoshi
416✔
654
        )
416✔
655

416✔
656
        // Run through the map of HTLC's and determine the dust sum with calls
416✔
657
        // to the memoryMailBox's isDust closure. Note that all mailbox packets
416✔
658
        // are outgoing so the second argument to isDust will be false.
416✔
659
        for _, e := range m.addIndex {
3,797✔
660
                addPkt := e.Value.(*pktWithExpiry).pkt
3,381✔
661

3,381✔
662
                // Evaluate whether this HTLC is dust on the local commitment.
3,381✔
663
                if m.isDust(
3,381✔
664
                        m.feeRate, false, lntypes.Local,
3,381✔
665
                        addPkt.amount.ToSatoshis(),
3,381✔
666
                ) {
6,756✔
667

3,375✔
668
                        localDustSum += addPkt.amount
3,375✔
669
                }
3,375✔
670

671
                // Evaluate whether this HTLC is dust on the remote commitment.
672
                if m.isDust(
3,381✔
673
                        m.feeRate, false, lntypes.Remote,
3,381✔
674
                        addPkt.amount.ToSatoshis(),
3,381✔
675
                ) {
6,762✔
676

3,381✔
677
                        remoteDustSum += addPkt.amount
3,381✔
678
                }
3,381✔
679
        }
680

681
        return localDustSum, remoteDustSum
416✔
682
}
683

684
// FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it
685
// from the in-memory replay buffer. This will prevent the packet from being
686
// delivered after the link restarts if the switch has remained online. The
687
// generated LinkError will show an OutgoingFailureDownstreamHtlcAdd
688
// FailureDetail.
689
func (m *memoryMailBox) FailAdd(pkt *htlcPacket) {
54✔
690
        // First, remove the packet from mailbox. If we didn't find the packet
54✔
691
        // because it has already been acked, we'll exit early to avoid sending
54✔
692
        // a duplicate fail message through the switch.
54✔
693
        if !m.AckPacket(pkt.inKey()) {
59✔
694
                return
5✔
695
        }
5✔
696

697
        var (
49✔
698
                localFailure = false
49✔
699
                reason       lnwire.OpaqueReason
49✔
700
        )
49✔
701

49✔
702
        // Create a temporary channel failure which we will send back to our
49✔
703
        // peer if this is a forward, or report to the user if the failed
49✔
704
        // payment was locally initiated.
49✔
705
        failure := m.cfg.failMailboxUpdate(
49✔
706
                pkt.originalOutgoingChanID, m.cfg.shortChanID,
49✔
707
        )
49✔
708

49✔
709
        // If the payment was locally initiated (which is indicated by a nil
49✔
710
        // obfuscator), we do not need to encrypt it back to the sender.
49✔
711
        if pkt.obfuscator == nil {
91✔
712
                var b bytes.Buffer
42✔
713
                err := lnwire.EncodeFailure(&b, failure, 0)
42✔
714
                if err != nil {
42✔
715
                        log.Errorf("Unable to encode failure: %v", err)
×
716
                        return
×
717
                }
×
718
                reason = lnwire.OpaqueReason(b.Bytes())
42✔
719
                localFailure = true
42✔
720
        } else {
7✔
721
                // If the packet is part of a forward, (identified by a non-nil
7✔
722
                // obfuscator) we need to encrypt the error back to the source.
7✔
723
                var err error
7✔
724
                reason, err = pkt.obfuscator.EncryptFirstHop(failure)
7✔
725
                if err != nil {
7✔
726
                        log.Errorf("Unable to obfuscate error: %v", err)
×
727
                        return
×
728
                }
×
729
        }
730

731
        // Create a link error containing the temporary channel failure and a
732
        // detail which indicates the we failed to add the htlc.
733
        linkError := NewDetailedLinkError(
49✔
734
                failure, OutgoingFailureDownstreamHtlcAdd,
49✔
735
        )
49✔
736

49✔
737
        failPkt := &htlcPacket{
49✔
738
                incomingChanID: pkt.incomingChanID,
49✔
739
                incomingHTLCID: pkt.incomingHTLCID,
49✔
740
                circuit:        pkt.circuit,
49✔
741
                sourceRef:      pkt.sourceRef,
49✔
742
                hasSource:      true,
49✔
743
                localFailure:   localFailure,
49✔
744
                obfuscator:     pkt.obfuscator,
49✔
745
                linkFailure:    linkError,
49✔
746
                htlc: &lnwire.UpdateFailHTLC{
49✔
747
                        Reason: reason,
49✔
748
                },
49✔
749
        }
49✔
750

49✔
751
        if err := m.cfg.forwardPackets(m.quit, failPkt); err != nil {
49✔
752
                log.Errorf("Unhandled error while reforwarding packets "+
×
753
                        "settle/fail over htlcswitch: %v", err)
×
754
        }
×
755
}
756

757
// MessageOutBox returns a channel that any new messages ready for delivery
758
// will be sent on.
759
//
760
// NOTE: This method is part of the MailBox interface.
761
func (m *memoryMailBox) MessageOutBox() chan lnwire.Message {
236✔
762
        return m.messageOutbox
236✔
763
}
236✔
764

765
// PacketOutBox returns a channel that any new packets ready for delivery will
766
// be sent on.
767
//
768
// NOTE: This method is part of the MailBox interface.
769
func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
387✔
770
        return m.pktOutbox
387✔
771
}
387✔
772

773
// mailOrchestrator is responsible for coordinating the creation and lifecycle
774
// of mailboxes used within the switch. It supports the ability to create
775
// mailboxes, reassign their short channel id's, deliver htlc packets, and
776
// queue packets for mailboxes that have not been created due to a link's late
777
// registration.
778
type mailOrchestrator struct {
779
        mu sync.RWMutex
780

781
        cfg *mailOrchConfig
782

783
        // mailboxes caches exactly one mailbox for all known channels.
784
        mailboxes map[lnwire.ChannelID]MailBox
785

786
        // liveIndex maps a live short chan id to the primary mailbox key.
787
        // An index in liveIndex map is only entered under two conditions:
788
        //   1. A link has a non-zero short channel id at time of AddLink.
789
        //   2. A link receives a non-zero short channel via UpdateShortChanID.
790
        liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID
791

792
        // TODO(conner): add another pair of indexes:
793
        //   chan_id -> short_chan_id
794
        //   short_chan_id -> mailbox
795
        // so that Deliver can lookup mailbox directly once live,
796
        // but still queryable by channel_id.
797

798
        // unclaimedPackets maps a live short chan id to queue of packets if no
799
        // mailbox has been created.
800
        unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
801
}
802

803
type mailOrchConfig struct {
804
        // forwardPackets send a varidic number of htlcPackets to the switch to
805
        // be routed. A quit channel should be provided so that the call can
806
        // properly exit during shutdown.
807
        forwardPackets func(<-chan struct{}, ...*htlcPacket) error
808

809
        // clock is a time source for the generated mailboxes.
810
        clock clock.Clock
811

812
        // expiry is the interval after which Adds will be cancelled if they
813
        // have not been yet been delivered. The computed deadline will expiry
814
        // this long after the Adds are added to a mailbox via AddPacket.
815
        expiry time.Duration
816

817
        // failMailboxUpdate is used to fail an expired HTLC and use the
818
        // correct SCID if the underlying channel uses aliases.
819
        failMailboxUpdate func(outScid,
820
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
821
}
822

823
// newMailOrchestrator initializes a fresh mailOrchestrator.
824
func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator {
345✔
825
        return &mailOrchestrator{
345✔
826
                cfg:              cfg,
345✔
827
                mailboxes:        make(map[lnwire.ChannelID]MailBox),
345✔
828
                liveIndex:        make(map[lnwire.ShortChannelID]lnwire.ChannelID),
345✔
829
                unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
345✔
830
        }
345✔
831
}
345✔
832

833
// Stop instructs the orchestrator to stop all active mailboxes.
834
func (mo *mailOrchestrator) Stop() {
313✔
835
        for _, mailbox := range mo.mailboxes {
617✔
836
                mailbox.Stop()
304✔
837
        }
304✔
838
}
839

840
// GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
841
// creates and returns a new mailbox if none is found.
842
func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID,
843
        shortChanID lnwire.ShortChannelID) MailBox {
743✔
844

743✔
845
        // First, try lookup the mailbox directly using only the shared mutex.
743✔
846
        mo.mu.RLock()
743✔
847
        mailbox, ok := mo.mailboxes[chanID]
743✔
848
        if ok {
1,161✔
849
                mo.mu.RUnlock()
418✔
850
                return mailbox
418✔
851
        }
418✔
852
        mo.mu.RUnlock()
328✔
853

328✔
854
        // Otherwise, we will try again with exclusive lock, creating a mailbox
328✔
855
        // if one still has not been created.
328✔
856
        mo.mu.Lock()
328✔
857
        mailbox = mo.exclusiveGetOrCreateMailBox(chanID, shortChanID)
328✔
858
        mo.mu.Unlock()
328✔
859

328✔
860
        return mailbox
328✔
861
}
862

863
// exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the
864
// given channel id. If none is found, a new one is creates, started, and
865
// recorded.
866
//
867
// NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
868
func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
869
        chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
328✔
870

328✔
871
        mailbox, ok := mo.mailboxes[chanID]
328✔
872
        if !ok {
656✔
873
                mailbox = newMemoryMailBox(&mailBoxConfig{
328✔
874
                        shortChanID:       shortChanID,
328✔
875
                        forwardPackets:    mo.cfg.forwardPackets,
328✔
876
                        clock:             mo.cfg.clock,
328✔
877
                        expiry:            mo.cfg.expiry,
328✔
878
                        failMailboxUpdate: mo.cfg.failMailboxUpdate,
328✔
879
                })
328✔
880
                mailbox.Start()
328✔
881
                mo.mailboxes[chanID] = mailbox
328✔
882
        }
328✔
883

884
        return mailbox
328✔
885
}
886

887
// BindLiveShortChanID registers that messages bound for a particular short
888
// channel id should be forwarded to the mailbox corresponding to the given
889
// channel id. This method also checks to see if there are any unclaimed
890
// packets for this short_chan_id. If any are found, they are delivered to the
891
// mailbox and removed (marked as claimed).
892
func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox,
893
        cid lnwire.ChannelID, sid lnwire.ShortChannelID) {
339✔
894

339✔
895
        mo.mu.Lock()
339✔
896
        // Update the mapping from short channel id to mailbox's channel id.
339✔
897
        mo.liveIndex[sid] = cid
339✔
898

339✔
899
        // Retrieve any unclaimed packets destined for this mailbox.
339✔
900
        pkts := mo.unclaimedPackets[sid]
339✔
901
        delete(mo.unclaimedPackets, sid)
339✔
902
        mo.mu.Unlock()
339✔
903

339✔
904
        // Deliver the unclaimed packets.
339✔
905
        for _, pkt := range pkts {
347✔
906
                mailbox.AddPacket(pkt)
8✔
907
        }
8✔
908
}
909

910
// Deliver lookups the target mailbox using the live index from short_chan_id
911
// to channel_id. If the mailbox is found, the message is delivered directly.
912
// Otherwise the packet is recorded as unclaimed, and will be delivered to the
913
// mailbox upon the subsequent call to BindLiveShortChanID.
914
func (mo *mailOrchestrator) Deliver(
915
        sid lnwire.ShortChannelID, pkt *htlcPacket) error {
92✔
916

92✔
917
        var (
92✔
918
                mailbox MailBox
92✔
919
                found   bool
92✔
920
        )
92✔
921

92✔
922
        // First, try to find the channel id for the target short_chan_id. If
92✔
923
        // the link is live, we will also look up the created mailbox.
92✔
924
        mo.mu.RLock()
92✔
925
        chanID, isLive := mo.liveIndex[sid]
92✔
926
        if isLive {
179✔
927
                mailbox, found = mo.mailboxes[chanID]
87✔
928
        }
87✔
929
        mo.mu.RUnlock()
92✔
930

92✔
931
        // The link is live and target mailbox was found, deliver immediately.
92✔
932
        if isLive && found {
179✔
933
                return mailbox.AddPacket(pkt)
87✔
934
        }
87✔
935

936
        // If we detected that the link has not been made live, we will acquire
937
        // the exclusive lock preemptively in order to queue this packet in the
938
        // list of unclaimed packets.
939
        mo.mu.Lock()
8✔
940

8✔
941
        // Double check to see if the mailbox has been not made live since the
8✔
942
        // release of the shared lock.
8✔
943
        //
8✔
944
        // NOTE: Checking again with the exclusive lock held prevents a race
8✔
945
        // condition where BindLiveShortChanID is interleaved between the
8✔
946
        // release of the shared lock, and acquiring the exclusive lock. The
8✔
947
        // result would be stuck packets, as they wouldn't be redelivered until
8✔
948
        // the next call to BindLiveShortChanID, which is expected to occur
8✔
949
        // infrequently.
8✔
950
        chanID, isLive = mo.liveIndex[sid]
8✔
951
        if isLive {
8✔
952
                // Reaching this point indicates the mailbox is actually live.
×
953
                // We'll try to load the mailbox using the fresh channel id.
×
954
                //
×
955
                // NOTE: This should never create a new mailbox, as the live
×
956
                // index should only be set if the mailbox had been initialized
×
957
                // beforehand.  However, this does ensure that this case is
×
958
                // handled properly in the event that it could happen.
×
959
                mailbox = mo.exclusiveGetOrCreateMailBox(chanID, sid)
×
960
                mo.mu.Unlock()
×
961

×
962
                // Deliver the packet to the mailbox if it was found or created.
×
963
                return mailbox.AddPacket(pkt)
×
964
        }
×
965

966
        // Finally, if the channel id is still not found in the live index,
967
        // we'll add this to the list of unclaimed packets. These will be
968
        // delivered upon the next call to BindLiveShortChanID.
969
        mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt)
8✔
970
        mo.mu.Unlock()
8✔
971

8✔
972
        return nil
8✔
973
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc