• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9111774206

pending completion
9111774206

Pull #8765

github

hieblmi
routing: log edge when skipping it
Pull Request #8765: routing: log edge when skipping it

1 of 1 new or added line in 1 file covered. (100.0%)

104 existing lines in 27 files now uncovered.

122984 of 210570 relevant lines covered (58.41%)

28065.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.16
/htlcswitch/mailbox.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "errors"
7
        "fmt"
8
        "sync"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
)
15

16
var (
	// ErrMailBoxShuttingDown is returned when the mailbox is interrupted by
	// a shutdown request. ResetMessages, ResetPackets and signalUntilReset
	// return it once the mailbox's quit channel has been closed.
	ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")

	// ErrPacketAlreadyExists signals that an attempt to add a packet failed
	// because it already exists in the mailbox. AddPacket returns it when
	// the packet's incoming circuit key is already indexed in the queue the
	// packet would be placed in.
	ErrPacketAlreadyExists = errors.New("mailbox already has packet")
)
25

26
// MailBox is an interface which represents a concurrent-safe, in-order
// delivery queue for messages from the network and also from the main switch.
// It serves as a buffer between incoming messages and the messages to be
// handled by the link. Each of the mutating methods within this interface
// should be implemented in a non-blocking manner.
type MailBox interface {
	// AddMessage appends a new message to the end of the message queue.
	AddMessage(msg lnwire.Message) error

	// AddPacket appends a new packet to the end of the packet queue.
	AddPacket(pkt *htlcPacket) error

	// HasPacket queries the packets for a circuit key, this is used to drop
	// packets bound for the switch that already have a queued response.
	HasPacket(CircuitKey) bool

	// AckPacket removes a packet from the mailbox's in-memory replay
	// buffer. This will prevent a packet from being delivered after a link
	// restarts if the switch has remained online. The returned boolean
	// indicates whether or not a packet with the passed incoming circuit
	// key was removed.
	AckPacket(CircuitKey) bool

	// FailAdd fails an UpdateAddHTLC that exists within the mailbox,
	// removing it from the in-memory replay buffer. This will prevent the
	// packet from being delivered after the link restarts if the switch has
	// remained online. The generated LinkError will show an
	// OutgoingFailureDownstreamHtlcAdd FailureDetail.
	FailAdd(pkt *htlcPacket)

	// MessageOutBox returns a channel that any new messages ready for
	// delivery will be sent on.
	MessageOutBox() chan lnwire.Message

	// PacketOutBox returns a channel that any new packets ready for
	// delivery will be sent on.
	PacketOutBox() chan *htlcPacket

	// ResetMessages clears any pending wire messages from the inbox.
	ResetMessages() error

	// ResetPackets resets the packet head to point at the first element in
	// the list, so that un-acked packets are delivered again.
	ResetPackets() error

	// SetDustClosure takes in a closure that is used to evaluate whether
	// mailbox HTLC's are dust.
	SetDustClosure(isDust dustClosure)

	// SetFeeRate sets the feerate to be used when evaluating dust.
	SetFeeRate(feerate chainfee.SatPerKWeight)

	// DustPackets returns the dust sum for Adds in the mailbox for the
	// local and remote commitments.
	DustPackets() (lnwire.MilliSatoshi, lnwire.MilliSatoshi)

	// Start starts the mailbox and any goroutines it needs to operate
	// properly.
	Start()

	// Stop signals the mailbox and its goroutines for a graceful shutdown.
	Stop()
}
88

89
// mailBoxConfig bundles the dependencies and parameters a memoryMailBox is
// constructed with.
type mailBoxConfig struct {
	// shortChanID is the short channel id of the channel this mailbox
	// belongs to.
	shortChanID lnwire.ShortChannelID

	// forwardPackets sends a variadic number of htlcPackets to the switch
	// to be routed. A quit channel should be provided so that the call can
	// properly exit during shutdown.
	forwardPackets func(chan struct{}, ...*htlcPacket) error

	// clock is a time source for the mailbox.
	clock clock.Clock

	// expiry is the interval after which Adds will be cancelled if they
	// have not yet been delivered. The computed deadline will expire this
	// long after the Adds are added via AddPacket.
	expiry time.Duration

	// failMailboxUpdate is used to fail an expired HTLC and use the
	// correct SCID if the underlying channel uses aliases.
	failMailboxUpdate func(outScid,
		mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
}
112

113
// memoryMailBox is an implementation of the MailBox interface backed by purely
// in-memory queues. Wire messages and htlc packets are kept in separate
// queues, each guarded by its own mutex/condition variable and drained by its
// own courier goroutine.
//
// TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts.
type memoryMailBox struct {
	// started and stopped make Start and Stop run their bodies at most
	// once.
	started sync.Once
	stopped sync.Once

	cfg *mailBoxConfig

	// wireMessages is the queue of pending wire messages. It is guarded by
	// wireMtx, and wireCond (built on wireMtx) is used to wake the wire
	// courier.
	wireMessages *list.List
	wireMtx      sync.Mutex
	wireCond     *sync.Cond

	// messageOutbox is the channel the wire courier delivers messages on.
	// msgReset carries reset requests to the wire courier; the courier
	// closes the received channel once the queue has been cleared.
	messageOutbox chan lnwire.Message
	msgReset      chan chan struct{}

	// repPkts is a queue for reply packets, e.g. Settles and Fails.
	// repIndex maps an incoming circuit key to its element in repPkts and
	// repHead points at the next reply to deliver (nil if none).
	repPkts  *list.List
	repIndex map[CircuitKey]*list.Element
	repHead  *list.Element

	// addPkts is a dedicated queue for Adds. Its elements hold
	// *pktWithExpiry values. addIndex maps an incoming circuit key to its
	// element in addPkts and addHead points at the next Add to deliver
	// (nil if none).
	addPkts  *list.List
	addIndex map[CircuitKey]*list.Element
	addHead  *list.Element

	// pktMtx guards both packet queues, their indexes and heads, as well
	// as feeRate and isDust. pktCond (built on pktMtx) is used to wake the
	// packet courier.
	pktMtx  sync.Mutex
	pktCond *sync.Cond

	// pktOutbox is the channel the packet courier delivers packets on.
	// pktReset carries reset requests to the packet courier; the courier
	// closes the received channel once the heads have been rewound.
	pktOutbox chan *htlcPacket
	pktReset  chan chan struct{}

	// wireShutdown and pktShutdown are closed by the respective courier
	// when it exits. quit is closed by Stop to request that exit.
	wireShutdown chan struct{}
	pktShutdown  chan struct{}
	quit         chan struct{}

	// feeRate is set when the link receives or sends out fee updates. It
	// is refreshed when AttachMailBox is called in case a fee update did
	// not get committed. In some cases it may be out of sync with the
	// channel's feerate, but it should eventually get back in sync.
	feeRate chainfee.SatPerKWeight

	// isDust is set when AttachMailBox is called and serves to evaluate
	// the outstanding dust in the memoryMailBox given the current set
	// feeRate.
	isDust dustClosure
}
161

162
// newMemoryMailBox creates a new instance of the memoryMailBox.
163
func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
335✔
164
        box := &memoryMailBox{
335✔
165
                cfg:           cfg,
335✔
166
                wireMessages:  list.New(),
335✔
167
                repPkts:       list.New(),
335✔
168
                addPkts:       list.New(),
335✔
169
                messageOutbox: make(chan lnwire.Message),
335✔
170
                pktOutbox:     make(chan *htlcPacket),
335✔
171
                msgReset:      make(chan chan struct{}, 1),
335✔
172
                pktReset:      make(chan chan struct{}, 1),
335✔
173
                repIndex:      make(map[CircuitKey]*list.Element),
335✔
174
                addIndex:      make(map[CircuitKey]*list.Element),
335✔
175
                wireShutdown:  make(chan struct{}),
335✔
176
                pktShutdown:   make(chan struct{}),
335✔
177
                quit:          make(chan struct{}),
335✔
178
        }
335✔
179
        box.wireCond = sync.NewCond(&box.wireMtx)
335✔
180
        box.pktCond = sync.NewCond(&box.pktMtx)
335✔
181

335✔
182
        return box
335✔
183
}
335✔
184

185
// A compile time assertion to ensure that memoryMailBox meets the MailBox
// interface.
var _ MailBox = (*memoryMailBox)(nil)

// courierType is an enum that reflects the distinct types of messages a
// MailBox can handle. Each type will be placed in an isolated mail box and
// will have a dedicated goroutine for delivering the messages. The signalling
// helpers use it to pick which courier's condition variable to strobe.
type courierType uint8

const (
	// wireCourier is a type of courier that handles wire messages.
	wireCourier courierType = iota

	// pktCourier is a type of courier that handles htlc packets.
	pktCourier
)
201

202
// Start starts the mailbox and any goroutines it needs to operate properly.
203
//
204
// NOTE: This method is part of the MailBox interface.
205
func (m *memoryMailBox) Start() {
336✔
206
        m.started.Do(func() {
671✔
207
                go m.wireMailCourier()
335✔
208
                go m.pktMailCourier()
335✔
209
        })
335✔
210
}
211

212
// ResetMessages blocks until all buffered wire messages are cleared.
213
func (m *memoryMailBox) ResetMessages() error {
339✔
214
        msgDone := make(chan struct{})
339✔
215
        select {
339✔
216
        case m.msgReset <- msgDone:
339✔
217
                return m.signalUntilReset(wireCourier, msgDone)
339✔
UNCOV
218
        case <-m.quit:
×
UNCOV
219
                return ErrMailBoxShuttingDown
×
220
        }
221
}
222

223
// ResetPackets blocks until the head of packets buffer is reset, causing the
224
// packets to be redelivered in order.
225
func (m *memoryMailBox) ResetPackets() error {
541✔
226
        pktDone := make(chan struct{})
541✔
227
        select {
541✔
228
        case m.pktReset <- pktDone:
541✔
229
                return m.signalUntilReset(pktCourier, pktDone)
541✔
230
        case <-m.quit:
×
231
                return ErrMailBoxShuttingDown
×
232
        }
233
}
234

235
// signalUntilReset strobes the condition variable for the specified inbox type
236
// until receiving a response that the mailbox has processed a reset.
237
func (m *memoryMailBox) signalUntilReset(cType courierType,
238
        done chan struct{}) error {
876✔
239

876✔
240
        for {
2,080✔
241

1,204✔
242
                switch cType {
1,204✔
243
                case wireCourier:
667✔
244
                        m.wireCond.Signal()
667✔
245
                case pktCourier:
541✔
246
                        m.pktCond.Signal()
541✔
247
                }
248

249
                select {
1,204✔
250
                case <-time.After(time.Millisecond):
332✔
251
                        continue
332✔
252
                case <-done:
874✔
253
                        return nil
874✔
254
                case <-m.quit:
2✔
255
                        return ErrMailBoxShuttingDown
2✔
256
                }
257
        }
258
}
259

260
// AckPacket removes the packet identified by it's incoming circuit key from the
261
// queue of packets to be delivered. The returned boolean indicates whether or
262
// not a packet with the passed incoming circuit key was removed.
263
//
264
// NOTE: It is safe to call this method multiple times for the same circuit key.
265
func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
1,643✔
266
        m.pktCond.L.Lock()
1,643✔
267
        defer m.pktCond.L.Unlock()
1,643✔
268

1,643✔
269
        if entry, ok := m.repIndex[inKey]; ok {
1,705✔
270
                // Check whether we are removing the head of the queue. If so,
62✔
271
                // we must advance the head to the next packet before removing.
62✔
272
                // It's possible that the courier has already advanced the
62✔
273
                // repHead, so this check prevents the repHead from getting
62✔
274
                // desynchronized.
62✔
275
                if entry == m.repHead {
69✔
276
                        m.repHead = entry.Next()
7✔
277
                }
7✔
278
                m.repPkts.Remove(entry)
62✔
279
                delete(m.repIndex, inKey)
62✔
280

62✔
281
                return true
62✔
282
        }
283

284
        if entry, ok := m.addIndex[inKey]; ok {
3,161✔
285
                // Check whether we are removing the head of the queue. If so,
1,576✔
286
                // we must advance the head to the next add before removing.
1,576✔
287
                // It's possible that the courier has already advanced the
1,576✔
288
                // addHead, so this check prevents the addHead from getting
1,576✔
289
                // desynchronized.
1,576✔
290
                //
1,576✔
291
                // NOTE: While this event is rare for Settles or Fails, it could
1,576✔
292
                // be very common for Adds since the mailbox has the ability to
1,576✔
293
                // cancel Adds before they are delivered. When that occurs, the
1,576✔
294
                // head of addPkts has only been peeked and we expect to be
1,576✔
295
                // removing the head of the queue.
1,576✔
296
                if entry == m.addHead {
1,622✔
297
                        m.addHead = entry.Next()
46✔
298
                }
46✔
299

300
                m.addPkts.Remove(entry)
1,576✔
301
                delete(m.addIndex, inKey)
1,576✔
302

1,576✔
303
                return true
1,576✔
304
        }
305

306
        return false
9✔
307
}
308

309
// HasPacket queries the packets for a circuit key, this is used to drop packets
310
// bound for the switch that already have a queued response.
311
func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
1,446✔
312
        m.pktCond.L.Lock()
1,446✔
313
        _, ok := m.repIndex[inKey]
1,446✔
314
        m.pktCond.L.Unlock()
1,446✔
315

1,446✔
316
        return ok
1,446✔
317
}
1,446✔
318

319
// Stop signals the mailbox and its goroutines for a graceful shutdown.
320
//
321
// NOTE: This method is part of the MailBox interface.
322
func (m *memoryMailBox) Stop() {
312✔
323
        m.stopped.Do(func() {
623✔
324
                close(m.quit)
311✔
325

311✔
326
                m.signalUntilShutdown(wireCourier)
311✔
327
                m.signalUntilShutdown(pktCourier)
311✔
328
        })
311✔
329
}
330

331
// signalUntilShutdown strobes the condition variable of the passed courier
332
// type, blocking until the worker has exited.
333
func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
618✔
334
        var (
618✔
335
                cond     *sync.Cond
618✔
336
                shutdown chan struct{}
618✔
337
        )
618✔
338

618✔
339
        switch cType {
618✔
340
        case wireCourier:
311✔
341
                cond = m.wireCond
311✔
342
                shutdown = m.wireShutdown
311✔
343
        case pktCourier:
311✔
344
                cond = m.pktCond
311✔
345
                shutdown = m.pktShutdown
311✔
346
        }
347

348
        for {
1,840✔
349
                select {
1,222✔
350
                case <-time.After(time.Millisecond):
608✔
351
                        cond.Signal()
608✔
352
                case <-shutdown:
618✔
353
                        return
618✔
354
                }
355
        }
356
}
357

358
// pktWithExpiry wraps an incoming packet and records the time at which it it
359
// should be canceled from the mailbox. This will be used to detect if it gets
360
// stuck in the mailbox and inform when to cancel back.
361
type pktWithExpiry struct {
362
        pkt    *htlcPacket
363
        expiry time.Time
364
}
365

366
func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
1,656✔
367
        return clock.TickAfter(p.expiry.Sub(clock.Now()))
1,656✔
368
}
1,656✔
369

370
// wireMailCourier is a dedicated goroutine whose job is to reliably deliver
// wire messages. It pops messages off the front of wireMessages one at a time
// and sends them on messageOutbox, while also servicing reset requests and the
// quit signal. wireShutdown is closed when it returns.
func (m *memoryMailBox) wireMailCourier() {
	defer close(m.wireShutdown)

	for {
		// First, we'll check our condition. If our mailbox is empty,
		// then we'll wait until a new item is added.
		m.wireCond.L.Lock()
		for m.wireMessages.Front() == nil {
			m.wireCond.Wait()

			// We may have been woken for a reset or a shutdown
			// rather than for a new message, so poll both without
			// blocking before re-checking the queue.
			select {
			case msgDone := <-m.msgReset:
				m.wireMessages.Init()
				close(msgDone)
			case <-m.quit:
				m.wireCond.L.Unlock()
				return
			default:
			}
		}

		// Grab the element at the front of the queue and remove it from
		// the list, taking ownership of the message it holds.
		entry := m.wireMessages.Front()

		//nolint:forcetypeassert
		nextMsg := m.wireMessages.Remove(entry).(lnwire.Message)

		// Now that we're done with the condition, we can unlock it to
		// allow any callers to append to the end of our target queue.
		m.wireCond.L.Unlock()

		// With the next message obtained, we'll now select to attempt
		// to deliver the message. If we receive a kill signal, then
		// we'll bail out. If a reset arrives first, the queue is
		// cleared and the message we already popped is dropped along
		// with it.
		select {
		case m.messageOutbox <- nextMsg:
		case msgDone := <-m.msgReset:
			m.wireCond.L.Lock()
			m.wireMessages.Init()
			m.wireCond.L.Unlock()

			close(msgDone)
		case <-m.quit:
			return
		}
	}
}
421

422
// pktMailCourier is a dedicated goroutine whose job is to reliably deliver
// packet messages. It delivers the packet at the head of the reply queue or,
// if there is none, the head of the Add queue on pktOutbox, fails Adds whose
// deadline passes before anything is delivered, and services reset requests
// and the quit signal. pktShutdown is closed when it returns.
func (m *memoryMailBox) pktMailCourier() {
	defer close(m.pktShutdown)

	for {
		// First, we'll check our condition. If our mailbox is empty,
		// then we'll wait until a new item is added.
		m.pktCond.L.Lock()
		for m.repHead == nil && m.addHead == nil {
			m.pktCond.Wait()

			select {
			// Resetting the packet queue means just moving our
			// pointer to the front. This ensures that any un-ACK'd
			// messages are re-delivered upon reconnect.
			case pktDone := <-m.pktReset:
				m.repHead = m.repPkts.Front()
				m.addHead = m.addPkts.Front()

				close(pktDone)

			case <-m.quit:
				m.pktCond.L.Unlock()
				return
			default:
			}
		}

		var (
			nextRep   *htlcPacket
			nextRepEl *list.Element
			nextAdd   *pktWithExpiry
			nextAddEl *list.Element
		)
		// For packets, we actually never remove an item until it has
		// been ACK'd by the link. This ensures that if a read packet
		// doesn't make it into a commitment, then it'll be
		// re-delivered once the link comes back online.

		// Peek at the head of the Settle/Fails and Add queues. We peek
		// both even if there is a Settle/Fail present because we need
		// to set a deadline for the next pending Add if it's present.
		// Due to clock monotonicity, we know that the head of the Adds
		// is the next to expire.
		if m.repHead != nil {
			//nolint:forcetypeassert
			nextRep = m.repHead.Value.(*htlcPacket)
			nextRepEl = m.repHead
		}
		if m.addHead != nil {
			//nolint:forcetypeassert
			nextAdd = m.addHead.Value.(*pktWithExpiry)
			nextAddEl = m.addHead
		}

		// Now that we're done with the condition, we can unlock it to
		// allow any callers to append to the end of our target queue.
		m.pktCond.L.Unlock()

		var (
			pktOutbox chan *htlcPacket
			addOutbox chan *htlcPacket
			add       *htlcPacket
			deadline  <-chan time.Time
		)

		// Prioritize delivery of Settle/Fail packets over Adds. This
		// ensures that we actively clear the commitment of existing
		// HTLCs before trying to add new ones. This can help to improve
		// forwarding performance since the time to sign a commitment is
		// linear in the number of HTLCs manifested on the commitments.
		//
		// NOTE: Both types are eventually delivered over the same
		// channel, but we can control which is delivered by exclusively
		// making one nil and the other non-nil. A send on a nil channel
		// never proceeds, so only the chosen case can fire. We know
		// from our loop condition that at least one of nextRep and
		// nextAdd is non-nil.
		if nextRep != nil {
			pktOutbox = m.pktOutbox
		} else {
			addOutbox = m.pktOutbox
		}

		// If we have a pending Add, we'll also construct the deadline
		// so we can fail it back if we are unable to deliver any
		// message in time. We also dereference the nextAdd's packet,
		// since we will need access to it in the case we are delivering
		// it and/or if the deadline expires.
		//
		// NOTE: It's possible after this point for add to be nil, but
		// this can only occur when addOutbox is also nil, hence we
		// won't accidentally deliver a nil packet. The deadline channel
		// is likewise nil in that case, so its case never fires.
		if nextAdd != nil {
			add = nextAdd.pkt
			deadline = nextAdd.deadline(m.cfg.clock)
		}

		select {
		case pktOutbox <- nextRep:
			m.pktCond.L.Lock()
			// Only advance the repHead if this Settle or Fail is
			// still at the head of the queue.
			if m.repHead != nil && m.repHead == nextRepEl {
				m.repHead = m.repHead.Next()
			}
			m.pktCond.L.Unlock()

		case addOutbox <- add:
			m.pktCond.L.Lock()
			// Only advance the addHead if this Add is still at the
			// head of the queue.
			if m.addHead != nil && m.addHead == nextAddEl {
				m.addHead = m.addHead.Next()
			}
			m.pktCond.L.Unlock()

		case <-deadline:
			// The Add at the head of the queue expired before
			// anything could be delivered, so fail it back.
			log.Debugf("Expiring add htlc with "+
				"keystone=%v", add.keystone())
			m.FailAdd(add)

		case pktDone := <-m.pktReset:
			// Rewind both heads so that un-ACK'd packets are
			// delivered again, then confirm the reset.
			m.pktCond.L.Lock()
			m.repHead = m.repPkts.Front()
			m.addHead = m.addPkts.Front()
			m.pktCond.L.Unlock()

			close(pktDone)

		case <-m.quit:
			return
		}
	}
}
556

557
// AddMessage appends a new message to the end of the message queue.
558
//
559
// NOTE: This method is safe for concrete use and part of the MailBox
560
// interface.
561
func (m *memoryMailBox) AddMessage(msg lnwire.Message) error {
6,819✔
562
        // First, we'll lock the condition, and add the message to the end of
6,819✔
563
        // the wire message inbox.
6,819✔
564
        m.wireCond.L.Lock()
6,819✔
565
        m.wireMessages.PushBack(msg)
6,819✔
566
        m.wireCond.L.Unlock()
6,819✔
567

6,819✔
568
        // With the message added, we signal to the mailCourier that there are
6,819✔
569
        // additional messages to deliver.
6,819✔
570
        m.wireCond.Signal()
6,819✔
571

6,819✔
572
        return nil
6,819✔
573
}
6,819✔
574

575
// AddPacket appends a new message to the end of the packet queue.
576
//
577
// NOTE: This method is safe for concrete use and part of the MailBox
578
// interface.
579
func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
3,143✔
580
        m.pktCond.L.Lock()
3,143✔
581
        switch htlc := pkt.htlc.(type) {
3,143✔
582
        // Split off Settle/Fail packets into the repPkts queue.
583
        case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
96✔
584
                if _, ok := m.repIndex[pkt.inKey()]; ok {
98✔
585
                        m.pktCond.L.Unlock()
2✔
586
                        return ErrPacketAlreadyExists
2✔
587
                }
2✔
588

589
                entry := m.repPkts.PushBack(pkt)
94✔
590
                m.repIndex[pkt.inKey()] = entry
94✔
591
                if m.repHead == nil {
184✔
592
                        m.repHead = entry
90✔
593
                }
90✔
594

595
        // Split off Add packets into the addPkts queue.
596
        case *lnwire.UpdateAddHTLC:
3,051✔
597
                if _, ok := m.addIndex[pkt.inKey()]; ok {
3,052✔
598
                        m.pktCond.L.Unlock()
1✔
599
                        return ErrPacketAlreadyExists
1✔
600
                }
1✔
601

602
                entry := m.addPkts.PushBack(&pktWithExpiry{
3,050✔
603
                        pkt:    pkt,
3,050✔
604
                        expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
3,050✔
605
                })
3,050✔
606
                m.addIndex[pkt.inKey()] = entry
3,050✔
607
                if m.addHead == nil {
3,464✔
608
                        m.addHead = entry
414✔
609
                }
414✔
610

611
        default:
×
612
                m.pktCond.L.Unlock()
×
613
                return fmt.Errorf("unknown htlc type: %T", htlc)
×
614
        }
615
        m.pktCond.L.Unlock()
3,140✔
616

3,140✔
617
        // With the packet added, we signal to the mailCourier that there are
3,140✔
618
        // additional packets to consume.
3,140✔
619
        m.pktCond.Signal()
3,140✔
620

3,140✔
621
        return nil
3,140✔
622
}
623

624
// SetFeeRate sets the memoryMailBox's feerate for use in DustPackets.
625
func (m *memoryMailBox) SetFeeRate(feeRate chainfee.SatPerKWeight) {
223✔
626
        m.pktCond.L.Lock()
223✔
627
        defer m.pktCond.L.Unlock()
223✔
628

223✔
629
        m.feeRate = feeRate
223✔
630
}
223✔
631

632
// SetDustClosure sets the memoryMailBox's dustClosure for use in DustPackets.
633
func (m *memoryMailBox) SetDustClosure(isDust dustClosure) {
339✔
634
        m.pktCond.L.Lock()
339✔
635
        defer m.pktCond.L.Unlock()
339✔
636

339✔
637
        m.isDust = isDust
339✔
638
}
339✔
639

640
// DustPackets returns the dust sum for add packets in the mailbox. The first
641
// return value is the local dust sum and the second is the remote dust sum.
642
// This will keep track of a given dust HTLC from the time it is added via
643
// AddPacket until it is removed via AckPacket.
644
func (m *memoryMailBox) DustPackets() (lnwire.MilliSatoshi,
645
        lnwire.MilliSatoshi) {
1,066✔
646

1,066✔
647
        m.pktCond.L.Lock()
1,066✔
648
        defer m.pktCond.L.Unlock()
1,066✔
649

1,066✔
650
        var (
1,066✔
651
                localDustSum  lnwire.MilliSatoshi
1,066✔
652
                remoteDustSum lnwire.MilliSatoshi
1,066✔
653
        )
1,066✔
654

1,066✔
655
        // Run through the map of HTLC's and determine the dust sum with calls
1,066✔
656
        // to the memoryMailBox's isDust closure. Note that all mailbox packets
1,066✔
657
        // are outgoing so the second argument to isDust will be false.
1,066✔
658
        for _, e := range m.addIndex {
28,616✔
659
                addPkt := e.Value.(*pktWithExpiry).pkt
27,550✔
660

27,550✔
661
                // Evaluate whether this HTLC is dust on the local commitment.
27,550✔
662
                if m.isDust(
27,550✔
663
                        m.feeRate, false, true, addPkt.amount.ToSatoshis(),
27,550✔
664
                ) {
55,094✔
665

27,544✔
666
                        localDustSum += addPkt.amount
27,544✔
667
                }
27,544✔
668

669
                // Evaluate whether this HTLC is dust on the remote commitment.
670
                if m.isDust(
27,550✔
671
                        m.feeRate, false, false, addPkt.amount.ToSatoshis(),
27,550✔
672
                ) {
55,100✔
673

27,550✔
674
                        remoteDustSum += addPkt.amount
27,550✔
675
                }
27,550✔
676
        }
677

678
        return localDustSum, remoteDustSum
1,066✔
679
}
680

681
// FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it
682
// from the in-memory replay buffer. This will prevent the packet from being
683
// delivered after the link restarts if the switch has remained online. The
684
// generated LinkError will show an OutgoingFailureDownstreamHtlcAdd
685
// FailureDetail.
686
func (m *memoryMailBox) FailAdd(pkt *htlcPacket) {
51✔
687
        // First, remove the packet from mailbox. If we didn't find the packet
51✔
688
        // because it has already been acked, we'll exit early to avoid sending
51✔
689
        // a duplicate fail message through the switch.
51✔
690
        if !m.AckPacket(pkt.inKey()) {
56✔
691
                return
5✔
692
        }
5✔
693

694
        var (
46✔
695
                localFailure = false
46✔
696
                reason       lnwire.OpaqueReason
46✔
697
        )
46✔
698

46✔
699
        // Create a temporary channel failure which we will send back to our
46✔
700
        // peer if this is a forward, or report to the user if the failed
46✔
701
        // payment was locally initiated.
46✔
702
        failure := m.cfg.failMailboxUpdate(
46✔
703
                pkt.originalOutgoingChanID, m.cfg.shortChanID,
46✔
704
        )
46✔
705

46✔
706
        // If the payment was locally initiated (which is indicated by a nil
46✔
707
        // obfuscator), we do not need to encrypt it back to the sender.
46✔
708
        if pkt.obfuscator == nil {
86✔
709
                var b bytes.Buffer
40✔
710
                err := lnwire.EncodeFailure(&b, failure, 0)
40✔
711
                if err != nil {
40✔
712
                        log.Errorf("Unable to encode failure: %v", err)
×
713
                        return
×
714
                }
×
715
                reason = lnwire.OpaqueReason(b.Bytes())
40✔
716
                localFailure = true
40✔
717
        } else {
10✔
718
                // If the packet is part of a forward, (identified by a non-nil
10✔
719
                // obfuscator) we need to encrypt the error back to the source.
10✔
720
                var err error
10✔
721
                reason, err = pkt.obfuscator.EncryptFirstHop(failure)
10✔
722
                if err != nil {
10✔
723
                        log.Errorf("Unable to obfuscate error: %v", err)
×
724
                        return
×
725
                }
×
726
        }
727

728
        // Create a link error containing the temporary channel failure and a
729
        // detail which indicates the we failed to add the htlc.
730
        linkError := NewDetailedLinkError(
46✔
731
                failure, OutgoingFailureDownstreamHtlcAdd,
46✔
732
        )
46✔
733

46✔
734
        failPkt := &htlcPacket{
46✔
735
                incomingChanID: pkt.incomingChanID,
46✔
736
                incomingHTLCID: pkt.incomingHTLCID,
46✔
737
                circuit:        pkt.circuit,
46✔
738
                sourceRef:      pkt.sourceRef,
46✔
739
                hasSource:      true,
46✔
740
                localFailure:   localFailure,
46✔
741
                obfuscator:     pkt.obfuscator,
46✔
742
                linkFailure:    linkError,
46✔
743
                htlc: &lnwire.UpdateFailHTLC{
46✔
744
                        Reason: reason,
46✔
745
                },
46✔
746
        }
46✔
747

46✔
748
        if err := m.cfg.forwardPackets(m.quit, failPkt); err != nil {
46✔
749
                log.Errorf("Unhandled error while reforwarding packets "+
×
750
                        "settle/fail over htlcswitch: %v", err)
×
751
        }
×
752
}
753

754
// MessageOutBox returns a channel that any new messages ready for delivery
755
// will be sent on.
756
//
757
// NOTE: This method is part of the MailBox interface.
758
func (m *memoryMailBox) MessageOutBox() chan lnwire.Message {
235✔
759
        return m.messageOutbox
235✔
760
}
235✔
761

762
// PacketOutBox returns a channel that any new packets ready for delivery will
763
// be sent on.
764
//
765
// NOTE: This method is part of the MailBox interface.
766
func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
386✔
767
        return m.pktOutbox
386✔
768
}
386✔
769

770
// mailOrchestrator is responsible for coordinating the creation and lifecycle
771
// of mailboxes used within the switch. It supports the ability to create
772
// mailboxes, reassign their short channel id's, deliver htlc packets, and
773
// queue packets for mailboxes that have not been created due to a link's late
774
// registration.
775
type mailOrchestrator struct {
776
        mu sync.RWMutex
777

778
        cfg *mailOrchConfig
779

780
        // mailboxes caches exactly one mailbox for all known channels.
781
        mailboxes map[lnwire.ChannelID]MailBox
782

783
        // liveIndex maps a live short chan id to the primary mailbox key.
784
        // An index in liveIndex map is only entered under two conditions:
785
        //   1. A link has a non-zero short channel id at time of AddLink.
786
        //   2. A link receives a non-zero short channel via UpdateShortChanID.
787
        liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID
788

789
        // TODO(conner): add another pair of indexes:
790
        //   chan_id -> short_chan_id
791
        //   short_chan_id -> mailbox
792
        // so that Deliver can lookup mailbox directly once live,
793
        // but still queryable by channel_id.
794

795
        // unclaimedPackets maps a live short chan id to queue of packets if no
796
        // mailbox has been created.
797
        unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
798
}
799

800
type mailOrchConfig struct {
801
        // forwardPackets send a varidic number of htlcPackets to the switch to
802
        // be routed. A quit channel should be provided so that the call can
803
        // properly exit during shutdown.
804
        forwardPackets func(chan struct{}, ...*htlcPacket) error
805

806
        // clock is a time source for the generated mailboxes.
807
        clock clock.Clock
808

809
        // expiry is the interval after which Adds will be cancelled if they
810
        // have not been yet been delivered. The computed deadline will expiry
811
        // this long after the Adds are added to a mailbox via AddPacket.
812
        expiry time.Duration
813

814
        // failMailboxUpdate is used to fail an expired HTLC and use the
815
        // correct SCID if the underlying channel uses aliases.
816
        failMailboxUpdate func(outScid,
817
                mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage
818
}
819

820
// newMailOrchestrator initializes a fresh mailOrchestrator.
821
func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator {
344✔
822
        return &mailOrchestrator{
344✔
823
                cfg:              cfg,
344✔
824
                mailboxes:        make(map[lnwire.ChannelID]MailBox),
344✔
825
                liveIndex:        make(map[lnwire.ShortChannelID]lnwire.ChannelID),
344✔
826
                unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
344✔
827
        }
344✔
828
}
344✔
829

830
// Stop instructs the orchestrator to stop all active mailboxes.
831
func (mo *mailOrchestrator) Stop() {
312✔
832
        for _, mailbox := range mo.mailboxes {
615✔
833
                mailbox.Stop()
303✔
834
        }
303✔
835
}
836

837
// GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
838
// creates and returns a new mailbox if none is found.
839
func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID,
840
        shortChanID lnwire.ShortChannelID) MailBox {
1,392✔
841

1,392✔
842
        // First, try lookup the mailbox directly using only the shared mutex.
1,392✔
843
        mo.mu.RLock()
1,392✔
844
        mailbox, ok := mo.mailboxes[chanID]
1,392✔
845
        if ok {
2,461✔
846
                mo.mu.RUnlock()
1,069✔
847
                return mailbox
1,069✔
848
        }
1,069✔
849
        mo.mu.RUnlock()
327✔
850

327✔
851
        // Otherwise, we will try again with exclusive lock, creating a mailbox
327✔
852
        // if one still has not been created.
327✔
853
        mo.mu.Lock()
327✔
854
        mailbox = mo.exclusiveGetOrCreateMailBox(chanID, shortChanID)
327✔
855
        mo.mu.Unlock()
327✔
856

327✔
857
        return mailbox
327✔
858
}
859

860
// exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the
861
// given channel id. If none is found, a new one is creates, started, and
862
// recorded.
863
//
864
// NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
865
func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
866
        chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
327✔
867

327✔
868
        mailbox, ok := mo.mailboxes[chanID]
327✔
869
        if !ok {
654✔
870
                mailbox = newMemoryMailBox(&mailBoxConfig{
327✔
871
                        shortChanID:       shortChanID,
327✔
872
                        forwardPackets:    mo.cfg.forwardPackets,
327✔
873
                        clock:             mo.cfg.clock,
327✔
874
                        expiry:            mo.cfg.expiry,
327✔
875
                        failMailboxUpdate: mo.cfg.failMailboxUpdate,
327✔
876
                })
327✔
877
                mailbox.Start()
327✔
878
                mo.mailboxes[chanID] = mailbox
327✔
879
        }
327✔
880

881
        return mailbox
327✔
882
}
883

884
// BindLiveShortChanID registers that messages bound for a particular short
885
// channel id should be forwarded to the mailbox corresponding to the given
886
// channel id. This method also checks to see if there are any unclaimed
887
// packets for this short_chan_id. If any are found, they are delivered to the
888
// mailbox and removed (marked as claimed).
889
func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox,
890
        cid lnwire.ChannelID, sid lnwire.ShortChannelID) {
338✔
891

338✔
892
        mo.mu.Lock()
338✔
893
        // Update the mapping from short channel id to mailbox's channel id.
338✔
894
        mo.liveIndex[sid] = cid
338✔
895

338✔
896
        // Retrieve any unclaimed packets destined for this mailbox.
338✔
897
        pkts := mo.unclaimedPackets[sid]
338✔
898
        delete(mo.unclaimedPackets, sid)
338✔
899
        mo.mu.Unlock()
338✔
900

338✔
901
        // Deliver the unclaimed packets.
338✔
902
        for _, pkt := range pkts {
347✔
903
                mailbox.AddPacket(pkt)
9✔
904
        }
9✔
905
}
906

907
// Deliver lookups the target mailbox using the live index from short_chan_id
908
// to channel_id. If the mailbox is found, the message is delivered directly.
909
// Otherwise the packet is recorded as unclaimed, and will be delivered to the
910
// mailbox upon the subsequent call to BindLiveShortChanID.
911
func (mo *mailOrchestrator) Deliver(
912
        sid lnwire.ShortChannelID, pkt *htlcPacket) error {
93✔
913

93✔
914
        var (
93✔
915
                mailbox MailBox
93✔
916
                found   bool
93✔
917
        )
93✔
918

93✔
919
        // First, try to find the channel id for the target short_chan_id. If
93✔
920
        // the link is live, we will also look up the created mailbox.
93✔
921
        mo.mu.RLock()
93✔
922
        chanID, isLive := mo.liveIndex[sid]
93✔
923
        if isLive {
181✔
924
                mailbox, found = mo.mailboxes[chanID]
88✔
925
        }
88✔
926
        mo.mu.RUnlock()
93✔
927

93✔
928
        // The link is live and target mailbox was found, deliver immediately.
93✔
929
        if isLive && found {
181✔
930
                return mailbox.AddPacket(pkt)
88✔
931
        }
88✔
932

933
        // If we detected that the link has not been made live, we will acquire
934
        // the exclusive lock preemptively in order to queue this packet in the
935
        // list of unclaimed packets.
936
        mo.mu.Lock()
9✔
937

9✔
938
        // Double check to see if the mailbox has been not made live since the
9✔
939
        // release of the shared lock.
9✔
940
        //
9✔
941
        // NOTE: Checking again with the exclusive lock held prevents a race
9✔
942
        // condition where BindLiveShortChanID is interleaved between the
9✔
943
        // release of the shared lock, and acquiring the exclusive lock. The
9✔
944
        // result would be stuck packets, as they wouldn't be redelivered until
9✔
945
        // the next call to BindLiveShortChanID, which is expected to occur
9✔
946
        // infrequently.
9✔
947
        chanID, isLive = mo.liveIndex[sid]
9✔
948
        if isLive {
9✔
949
                // Reaching this point indicates the mailbox is actually live.
×
950
                // We'll try to load the mailbox using the fresh channel id.
×
951
                //
×
952
                // NOTE: This should never create a new mailbox, as the live
×
953
                // index should only be set if the mailbox had been initialized
×
954
                // beforehand.  However, this does ensure that this case is
×
955
                // handled properly in the event that it could happen.
×
956
                mailbox = mo.exclusiveGetOrCreateMailBox(chanID, sid)
×
957
                mo.mu.Unlock()
×
958

×
959
                // Deliver the packet to the mailbox if it was found or created.
×
960
                return mailbox.AddPacket(pkt)
×
961
        }
×
962

963
        // Finally, if the channel id is still not found in the live index,
964
        // we'll add this to the list of unclaimed packets. These will be
965
        // delivered upon the next call to BindLiveShortChanID.
966
        mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt)
9✔
967
        mo.mu.Unlock()
9✔
968

9✔
969
        return nil
9✔
970
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc