• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13157733617

05 Feb 2025 12:49PM UTC coverage: 57.712% (-1.1%) from 58.82%
13157733617

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19472 existing lines in 252 files now uncovered.

103634 of 179570 relevant lines covered (57.71%)

24840.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.66
/invoices/invoiceregistry.go
1
package invoices
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/fn/v2"
13
        "github.com/lightningnetwork/lnd/lntypes"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
        "github.com/lightningnetwork/lnd/queue"
16
        "github.com/lightningnetwork/lnd/record"
17
)
18

19
var (
20
        // ErrInvoiceExpiryTooSoon is returned when an invoice is attempted to
21
        // be accepted or settled with not enough blocks remaining.
22
        ErrInvoiceExpiryTooSoon = errors.New("invoice expiry too soon")
23

24
        // ErrInvoiceAmountTooLow is returned  when an invoice is attempted to
25
        // be accepted or settled with an amount that is too low.
26
        ErrInvoiceAmountTooLow = errors.New(
27
                "paid amount less than invoice amount",
28
        )
29

30
        // ErrShuttingDown is returned when an operation failed because the
31
        // invoice registry is shutting down.
32
        ErrShuttingDown = errors.New("invoice registry shutting down")
33
)
34

35
const (
36
        // DefaultHtlcHoldDuration defines the default for how long mpp htlcs
37
        // are held while waiting for the other set members to arrive.
38
        DefaultHtlcHoldDuration = 120 * time.Second
39
)
40

41
// RegistryConfig contains the configuration parameters for invoice registry.
42
type RegistryConfig struct {
43
        // FinalCltvRejectDelta defines the number of blocks before the expiry
44
        // of the htlc where we no longer settle it as an exit hop and instead
45
        // cancel it back. Normally this value should be lower than the cltv
46
        // expiry of any invoice we create and the code effectuating this should
47
        // not be hit.
48
        FinalCltvRejectDelta int32
49

50
        // HtlcHoldDuration defines for how long mpp htlcs are held while
51
        // waiting for the other set members to arrive.
52
        HtlcHoldDuration time.Duration
53

54
        // Clock holds the clock implementation that is used to provide
55
        // Now() and TickAfter() and is useful to stub out the clock functions
56
        // during testing.
57
        Clock clock.Clock
58

59
        // AcceptKeySend indicates whether we want to accept spontaneous key
60
        // send payments.
61
        AcceptKeySend bool
62

63
        // AcceptAMP indicates whether we want to accept spontaneous AMP
64
        // payments.
65
        AcceptAMP bool
66

67
        // GcCanceledInvoicesOnStartup if set, we'll attempt to garbage collect
68
        // all canceled invoices upon start.
69
        GcCanceledInvoicesOnStartup bool
70

71
        // GcCanceledInvoicesOnTheFly if set, we'll garbage collect all newly
72
        // canceled invoices on the fly.
73
        GcCanceledInvoicesOnTheFly bool
74

75
        // KeysendHoldTime indicates for how long we want to accept and hold
76
        // spontaneous keysend payments.
77
        KeysendHoldTime time.Duration
78

79
        // HtlcInterceptor is an interface that allows the invoice registry to
80
        // let clients intercept invoices before they are settled.
81
        HtlcInterceptor HtlcInterceptor
82
}
83

84
// htlcReleaseEvent describes an htlc auto-release event. It is used to release
85
// mpp htlcs for which the complete set didn't arrive in time.
86
type htlcReleaseEvent struct {
87
        // invoiceRef identifiers the invoice this htlc belongs to.
88
        invoiceRef InvoiceRef
89

90
        // key is the circuit key of the htlc to release.
91
        key CircuitKey
92

93
        // releaseTime is the time at which to release the htlc.
94
        releaseTime time.Time
95
}
96

97
// Less is used to order PriorityQueueItem's by their release time such that
98
// items with the older release time are at the top of the queue.
99
//
100
// NOTE: Part of the queue.PriorityQueueItem interface.
101
func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
12✔
102
        return r.releaseTime.Before(other.(*htlcReleaseEvent).releaseTime)
12✔
103
}
12✔
104

105
// InvoiceRegistry is a central registry of all the outstanding invoices
106
// created by the daemon. The registry is a thin wrapper around a map in order
107
// to ensure that all updates/reads are thread safe.
108
type InvoiceRegistry struct {
109
        started atomic.Bool
110
        stopped atomic.Bool
111

112
        sync.RWMutex
113

114
        nextClientID uint32 // must be used atomically
115

116
        idb InvoiceDB
117

118
        // cfg contains the registry's configuration parameters.
119
        cfg *RegistryConfig
120

121
        // notificationClientMux locks notificationClients and
122
        // singleNotificationClients. Using a separate mutex for these maps is
123
        // necessary to avoid deadlocks in the registry when processing invoice
124
        // events.
125
        notificationClientMux sync.RWMutex
126

127
        notificationClients map[uint32]*InvoiceSubscription
128

129
        // TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
130
        // performance.
131
        singleNotificationClients map[uint32]*SingleInvoiceSubscription
132

133
        // invoiceEvents is a single channel over which invoice updates are
134
        // carried.
135
        invoiceEvents chan *invoiceEvent
136

137
        // hodlSubscriptionsMux locks the hodlSubscriptions and
138
        // hodlReverseSubscriptions. Using a separate mutex for these maps is
139
        // necessary to avoid deadlocks in the registry when processing invoice
140
        // events.
141
        hodlSubscriptionsMux sync.RWMutex
142

143
        // hodlSubscriptions is a map from a circuit key to a list of
144
        // subscribers. It is used for efficient notification of links.
145
        hodlSubscriptions map[CircuitKey]map[chan<- interface{}]struct{}
146

147
        // reverseSubscriptions tracks circuit keys subscribed to per
148
        // subscriber. This is used to unsubscribe from all hashes efficiently.
149
        hodlReverseSubscriptions map[chan<- interface{}]map[CircuitKey]struct{}
150

151
        // htlcAutoReleaseChan contains the new htlcs that need to be
152
        // auto-released.
153
        htlcAutoReleaseChan chan *htlcReleaseEvent
154

155
        expiryWatcher *InvoiceExpiryWatcher
156

157
        wg   sync.WaitGroup
158
        quit chan struct{}
159
}
160

161
// NewRegistry creates a new invoice registry. The invoice registry
162
// wraps the persistent on-disk invoice storage with an additional in-memory
163
// layer. The in-memory layer is in place such that debug invoices can be added
164
// which are volatile yet available system wide within the daemon.
165
func NewRegistry(idb InvoiceDB, expiryWatcher *InvoiceExpiryWatcher,
166
        cfg *RegistryConfig) *InvoiceRegistry {
645✔
167

645✔
168
        notificationClients := make(map[uint32]*InvoiceSubscription)
645✔
169
        singleNotificationClients := make(map[uint32]*SingleInvoiceSubscription)
645✔
170
        return &InvoiceRegistry{
645✔
171
                idb:                       idb,
645✔
172
                notificationClients:       notificationClients,
645✔
173
                singleNotificationClients: singleNotificationClients,
645✔
174
                invoiceEvents:             make(chan *invoiceEvent, 100),
645✔
175
                hodlSubscriptions: make(
645✔
176
                        map[CircuitKey]map[chan<- interface{}]struct{},
645✔
177
                ),
645✔
178
                hodlReverseSubscriptions: make(
645✔
179
                        map[chan<- interface{}]map[CircuitKey]struct{},
645✔
180
                ),
645✔
181
                cfg:                 cfg,
645✔
182
                htlcAutoReleaseChan: make(chan *htlcReleaseEvent),
645✔
183
                expiryWatcher:       expiryWatcher,
645✔
184
                quit:                make(chan struct{}),
645✔
185
        }
645✔
186
}
645✔
187

188
// scanInvoicesOnStart will scan all invoices on start and add active invoices
189
// to the invoice expiry watcher while also attempting to delete all canceled
190
// invoices.
191
func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {
645✔
192
        pendingInvoices, err := i.idb.FetchPendingInvoices(ctx)
645✔
193
        if err != nil {
645✔
194
                return err
×
195
        }
×
196

197
        var pending []invoiceExpiry
645✔
198
        for paymentHash, invoice := range pendingInvoices {
675✔
199
                invoice := invoice
30✔
200
                expiryRef := makeInvoiceExpiry(paymentHash, &invoice)
30✔
201
                if expiryRef != nil {
60✔
202
                        pending = append(pending, expiryRef)
30✔
203
                }
30✔
204
        }
205

206
        log.Debugf("Adding %d pending invoices to the expiry watcher",
645✔
207
                len(pending))
645✔
208
        i.expiryWatcher.AddInvoices(pending...)
645✔
209

645✔
210
        if i.cfg.GcCanceledInvoicesOnStartup {
648✔
211
                log.Infof("Deleting canceled invoices")
3✔
212
                err = i.idb.DeleteCanceledInvoices(ctx)
3✔
213
                if err != nil {
3✔
214
                        log.Warnf("Deleting canceled invoices failed: %v", err)
×
215
                        return err
×
216
                }
×
217
        }
218

219
        return nil
645✔
220
}
221

222
// Start starts the registry and all goroutines it needs to carry out its task.
223
func (i *InvoiceRegistry) Start() error {
645✔
224
        var err error
645✔
225

645✔
226
        log.Info("InvoiceRegistry starting...")
645✔
227

645✔
228
        if i.started.Swap(true) {
645✔
229
                return fmt.Errorf("InvoiceRegistry started more than once")
×
230
        }
×
231
        // Start InvoiceExpiryWatcher and prepopulate it with existing
232
        // active invoices.
233
        err = i.expiryWatcher.Start(
645✔
234
                func(hash lntypes.Hash, force bool) error {
727✔
235
                        return i.cancelInvoiceImpl(
82✔
236
                                context.Background(), hash, force,
82✔
237
                        )
82✔
238
                })
82✔
239
        if err != nil {
645✔
240
                return err
×
241
        }
×
242

243
        i.wg.Add(1)
645✔
244
        go i.invoiceEventLoop()
645✔
245

645✔
246
        // Now scan all pending and removable invoices to the expiry
645✔
247
        // watcher or delete them.
645✔
248
        err = i.scanInvoicesOnStart(context.Background())
645✔
249
        if err != nil {
645✔
250
                _ = i.Stop()
×
251
        }
×
252

253
        log.Debug("InvoiceRegistry started")
645✔
254

645✔
255
        return err
645✔
256
}
257

258
// Stop signals the registry for a graceful shutdown.
259
func (i *InvoiceRegistry) Stop() error {
384✔
260
        log.Info("InvoiceRegistry shutting down...")
384✔
261

384✔
262
        if i.stopped.Swap(true) {
384✔
263
                return fmt.Errorf("InvoiceRegistry stopped more than once")
×
264
        }
×
265

266
        log.Info("InvoiceRegistry shutting down...")
384✔
267
        defer log.Debug("InvoiceRegistry shutdown complete")
384✔
268

384✔
269
        var err error
384✔
270
        if i.expiryWatcher == nil {
384✔
271
                err = fmt.Errorf("InvoiceRegistry expiryWatcher is not " +
×
272
                        "initialized")
×
273
        } else {
384✔
274
                i.expiryWatcher.Stop()
384✔
275
        }
384✔
276

277
        close(i.quit)
384✔
278

384✔
279
        i.wg.Wait()
384✔
280

384✔
281
        log.Debug("InvoiceRegistry shutdown complete")
384✔
282

384✔
283
        return err
384✔
284
}
285

286
// invoiceEvent represents a new event that has modified on invoice on disk.
287
// Only two event types are currently supported: newly created invoices, and
288
// instance where invoices are settled.
289
type invoiceEvent struct {
290
        hash    lntypes.Hash
291
        invoice *Invoice
292
        setID   *[32]byte
293
}
294

295
// tickAt returns a channel that ticks at the specified time. If the time has
296
// already passed, it will tick immediately.
297
func (i *InvoiceRegistry) tickAt(t time.Time) <-chan time.Time {
675✔
298
        now := i.cfg.Clock.Now()
675✔
299
        return i.cfg.Clock.TickAfter(t.Sub(now))
675✔
300
}
675✔
301

302
// invoiceEventLoop is the dedicated goroutine responsible for accepting
303
// new notification subscriptions, cancelling old subscriptions, and
304
// dispatching new invoice events.
305
func (i *InvoiceRegistry) invoiceEventLoop() {
645✔
306
        defer i.wg.Done()
645✔
307

645✔
308
        // Set up a heap for htlc auto-releases.
645✔
309
        autoReleaseHeap := &queue.PriorityQueue{}
645✔
310

645✔
311
        for {
3,077✔
312
                // If there is something to release, set up a release tick
2,432✔
313
                // channel.
2,432✔
314
                var nextReleaseTick <-chan time.Time
2,432✔
315
                if autoReleaseHeap.Len() > 0 {
3,107✔
316
                        head := autoReleaseHeap.Top().(*htlcReleaseEvent)
675✔
317
                        nextReleaseTick = i.tickAt(head.releaseTime)
675✔
318
                }
675✔
319

320
                select {
2,432✔
321
                // A sub-systems has just modified the invoice state, so we'll
322
                // dispatch notifications to all registered clients.
323
                case event := <-i.invoiceEvents:
1,409✔
324
                        // For backwards compatibility, do not notify all
1,409✔
325
                        // invoice subscribers of cancel and accept events.
1,409✔
326
                        state := event.invoice.State
1,409✔
327
                        if state != ContractCanceled &&
1,409✔
328
                                state != ContractAccepted {
2,653✔
329

1,244✔
330
                                i.dispatchToClients(event)
1,244✔
331
                        }
1,244✔
332
                        i.dispatchToSingleClients(event)
1,409✔
333

334
                // A new htlc came in for auto-release.
335
                case event := <-i.htlcAutoReleaseChan:
348✔
336
                        log.Debugf("Scheduling auto-release for htlc: "+
348✔
337
                                "ref=%v, key=%v at %v",
348✔
338
                                event.invoiceRef, event.key, event.releaseTime)
348✔
339

348✔
340
                        // We use an independent timer for every htlc rather
348✔
341
                        // than a set timer that is reset with every htlc coming
348✔
342
                        // in. Otherwise the sender could keep resetting the
348✔
343
                        // timer until the broadcast window is entered and our
348✔
344
                        // channel is force closed.
348✔
345
                        autoReleaseHeap.Push(event)
348✔
346

347
                // The htlc at the top of the heap needs to be auto-released.
348
                case <-nextReleaseTick:
30✔
349
                        event := autoReleaseHeap.Pop().(*htlcReleaseEvent)
30✔
350
                        err := i.cancelSingleHtlc(
30✔
351
                                event.invoiceRef, event.key, ResultMppTimeout,
30✔
352
                        )
30✔
353
                        if err != nil {
30✔
354
                                log.Errorf("HTLC timer: %v", err)
×
355
                        }
×
356

357
                case <-i.quit:
384✔
358
                        return
384✔
359
                }
360
        }
361
}
362

363
// dispatchToSingleClients passes the supplied event to all notification
364
// clients that subscribed to all the invoice this event applies to.
365
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
1,409✔
366
        // Dispatch to single invoice subscribers.
1,409✔
367
        clients := i.copySingleClients()
1,409✔
368
        for _, client := range clients {
1,445✔
369
                payHash := client.invoiceRef.PayHash()
36✔
370

36✔
371
                if payHash == nil || *payHash != event.hash {
36✔
UNCOV
372
                        continue
×
373
                }
374

375
                select {
36✔
376
                case <-client.backlogDelivered:
36✔
377
                        // We won't deliver any events until the backlog has
378
                        // went through first.
379
                case <-i.quit:
×
380
                        return
×
381
                }
382

383
                client.notify(event)
36✔
384
        }
385
}
386

387
// dispatchToClients passes the supplied event to all notification clients that
388
// subscribed to all invoices. Add and settle indices are used to make sure
389
// that clients don't receive duplicate or unwanted events.
390
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
1,244✔
391
        invoice := event.invoice
1,244✔
392

1,244✔
393
        clients := i.copyClients()
1,244✔
394
        for clientID, client := range clients {
1,310✔
395
                // Before we dispatch this event, we'll check
66✔
396
                // to ensure that this client hasn't already
66✔
397
                // received this notification in order to
66✔
398
                // ensure we don't duplicate any events.
66✔
399

66✔
400
                // TODO(joostjager): Refactor switches.
66✔
401
                state := event.invoice.State
66✔
402
                switch {
66✔
403
                // If we've already sent this settle event to
404
                // the client, then we can skip this.
405
                case state == ContractSettled &&
406
                        client.settleIndex >= invoice.SettleIndex:
×
407
                        continue
×
408

409
                // Similarly, if we've already sent this add to
410
                // the client then we can skip this one, but only if this isn't
411
                // an AMP invoice. AMP invoices always remain in the settle
412
                // state as a base invoice.
413
                case event.setID == nil && state == ContractOpen &&
414
                        client.addIndex >= invoice.AddIndex:
×
415
                        continue
×
416

417
                // These two states should never happen, but we
418
                // log them just in case so we can detect this
419
                // instance.
420
                case state == ContractOpen &&
421
                        client.addIndex+1 != invoice.AddIndex:
7✔
422
                        log.Warnf("client=%v for invoice "+
7✔
423
                                "notifications missed an update, "+
7✔
424
                                "add_index=%v, new add event index=%v",
7✔
425
                                clientID, client.addIndex,
7✔
426
                                invoice.AddIndex)
7✔
427

428
                case state == ContractSettled &&
UNCOV
429
                        client.settleIndex+1 != invoice.SettleIndex:
×
UNCOV
430
                        log.Warnf("client=%v for invoice "+
×
UNCOV
431
                                "notifications missed an update, "+
×
UNCOV
432
                                "settle_index=%v, new settle event index=%v",
×
UNCOV
433
                                clientID, client.settleIndex,
×
UNCOV
434
                                invoice.SettleIndex)
×
435
                }
436

437
                select {
66✔
438
                case <-client.backlogDelivered:
66✔
439
                        // We won't deliver any events until the backlog has
440
                        // been processed.
441
                case <-i.quit:
×
442
                        return
×
443
                }
444

445
                err := client.notify(&invoiceEvent{
66✔
446
                        invoice: invoice,
66✔
447
                        setID:   event.setID,
66✔
448
                })
66✔
449
                if err != nil {
66✔
450
                        log.Errorf("Failed dispatching to client: %v", err)
×
451
                        return
×
452
                }
×
453

454
                // Each time we send a notification to a client, we'll record
455
                // the latest add/settle index it has. We'll use this to ensure
456
                // we don't send a notification twice, which can happen if a new
457
                // event is added while we're catching up a new client.
458
                invState := event.invoice.State
66✔
459
                switch {
66✔
460
                case invState == ContractSettled:
18✔
461
                        client.settleIndex = invoice.SettleIndex
18✔
462

463
                case invState == ContractOpen && event.setID == nil:
42✔
464
                        client.addIndex = invoice.AddIndex
42✔
465

466
                // If this is an AMP invoice, then we'll need to use the set ID
467
                // to keep track of the settle index of the client. AMP
468
                // invoices never go to the open state, but if a setID is
469
                // passed, then we know it was just settled and will track the
470
                // highest settle index so far.
471
                case invState == ContractOpen && event.setID != nil:
6✔
472
                        setID := *event.setID
6✔
473
                        client.settleIndex = invoice.AMPState[setID].SettleIndex
6✔
474

475
                default:
×
476
                        log.Errorf("unexpected invoice state: %v",
×
477
                                event.invoice.State)
×
478
                }
479
        }
480
}
481

482
// deliverBacklogEvents will attempts to query the invoice database for any
483
// notifications that the client has missed since it reconnected last.
484
func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context,
485
        client *InvoiceSubscription) error {
45✔
486

45✔
487
        addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex)
45✔
488
        if err != nil {
45✔
489
                return err
×
490
        }
×
491

492
        settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex)
45✔
493
        if err != nil {
45✔
494
                return err
×
495
        }
×
496

497
        // If we have any to deliver, then we'll append them to the end of the
498
        // notification queue in order to catch up the client before delivering
499
        // any new notifications.
500
        for _, addEvent := range addEvents {
45✔
UNCOV
501
                // We re-bind the loop variable to ensure we don't hold onto
×
UNCOV
502
                // the loop reference causing is to point to the same item.
×
UNCOV
503
                addEvent := addEvent
×
UNCOV
504

×
UNCOV
505
                select {
×
506
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
507
                        invoice: &addEvent,
UNCOV
508
                }:
×
509
                case <-i.quit:
×
510
                        return ErrShuttingDown
×
511
                }
512
        }
513

514
        for _, settleEvent := range settleEvents {
45✔
UNCOV
515
                // We re-bind the loop variable to ensure we don't hold onto
×
UNCOV
516
                // the loop reference causing is to point to the same item.
×
UNCOV
517
                settleEvent := settleEvent
×
UNCOV
518

×
UNCOV
519
                select {
×
520
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
521
                        invoice: &settleEvent,
UNCOV
522
                }:
×
523
                case <-i.quit:
×
524
                        return ErrShuttingDown
×
525
                }
526
        }
527

528
        return nil
45✔
529
}
530

531
// deliverSingleBacklogEvents will attempt to query the invoice database to
532
// retrieve the current invoice state and deliver this to the subscriber. Single
533
// invoice subscribers will always receive the current state right after
534
// subscribing. Only in case the invoice does not yet exist, nothing is sent
535
// yet.
536
func (i *InvoiceRegistry) deliverSingleBacklogEvents(ctx context.Context,
537
        client *SingleInvoiceSubscription) error {
18✔
538

18✔
539
        invoice, err := i.idb.LookupInvoice(ctx, client.invoiceRef)
18✔
540

18✔
541
        // It is possible that the invoice does not exist yet, but the client is
18✔
542
        // already watching it in anticipation.
18✔
543
        isNotFound := errors.Is(err, ErrInvoiceNotFound)
18✔
544
        isNotCreated := errors.Is(err, ErrNoInvoicesCreated)
18✔
545
        if isNotFound || isNotCreated {
36✔
546
                return nil
18✔
547
        }
18✔
UNCOV
548
        if err != nil {
×
549
                return err
×
550
        }
×
551

UNCOV
552
        payHash := client.invoiceRef.PayHash()
×
UNCOV
553
        if payHash == nil {
×
554
                return nil
×
555
        }
×
556

UNCOV
557
        err = client.notify(&invoiceEvent{
×
UNCOV
558
                hash:    *payHash,
×
UNCOV
559
                invoice: &invoice,
×
UNCOV
560
        })
×
UNCOV
561
        if err != nil {
×
562
                return err
×
563
        }
×
564

UNCOV
565
        log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
×
UNCOV
566
                client.id, payHash)
×
UNCOV
567

×
UNCOV
568
        return nil
×
569
}
570

571
// AddInvoice adds a regular invoice for the specified amount, identified by
572
// the passed preimage. Additionally, any memo or receipt data provided will
573
// also be stored on-disk. Once this invoice is added, subsystems within the
574
// daemon add/forward HTLCs are able to obtain the proper preimage required for
575
// redemption in the case that we're the final destination. We also return the
576
// addIndex of the newly created invoice which monotonically increases for each
577
// new invoice added.  A side effect of this function is that it also sets
578
// AddIndex on the invoice argument.
579
func (i *InvoiceRegistry) AddInvoice(ctx context.Context, invoice *Invoice,
580
        paymentHash lntypes.Hash) (uint64, error) {
729✔
581

729✔
582
        i.Lock()
729✔
583

729✔
584
        ref := InvoiceRefByHash(paymentHash)
729✔
585
        log.Debugf("Invoice%v: added with terms %v", ref, invoice.Terms)
729✔
586

729✔
587
        addIndex, err := i.idb.AddInvoice(ctx, invoice, paymentHash)
729✔
588
        if err != nil {
744✔
589
                i.Unlock()
15✔
590
                return 0, err
15✔
591
        }
15✔
592

593
        // Now that we've added the invoice, we'll send dispatch a message to
594
        // notify the clients of this new invoice.
595
        i.notifyClients(paymentHash, invoice, nil)
714✔
596
        i.Unlock()
714✔
597

714✔
598
        // InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry
714✔
599
        // to avoid deadlock when a new invoice is added while an other is being
714✔
600
        // canceled.
714✔
601
        invoiceExpiryRef := makeInvoiceExpiry(paymentHash, invoice)
714✔
602
        if invoiceExpiryRef != nil {
1,428✔
603
                i.expiryWatcher.AddInvoices(invoiceExpiryRef)
714✔
604
        }
714✔
605

606
        return addIndex, nil
714✔
607
}
608

609
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
610
// then we're able to pull the funds pending within an HTLC.
611
//
612
// TODO(roasbeef): ignore if settled?
613
func (i *InvoiceRegistry) LookupInvoice(ctx context.Context,
614
        rHash lntypes.Hash) (Invoice, error) {
425✔
615

425✔
616
        // We'll check the database to see if there's an existing matching
425✔
617
        // invoice.
425✔
618
        ref := InvoiceRefByHash(rHash)
425✔
619
        return i.idb.LookupInvoice(ctx, ref)
425✔
620
}
425✔
621

622
// LookupInvoiceByRef looks up an invoice by the given reference, if found
623
// then we're able to pull the funds pending within an HTLC.
624
func (i *InvoiceRegistry) LookupInvoiceByRef(ctx context.Context,
UNCOV
625
        ref InvoiceRef) (Invoice, error) {
×
UNCOV
626

×
UNCOV
627
        return i.idb.LookupInvoice(ctx, ref)
×
UNCOV
628
}
×
629

630
// startHtlcTimer starts a new timer via the invoice registry main loop that
631
// cancels a single htlc on an invoice when the htlc hold duration has passed.
632
func (i *InvoiceRegistry) startHtlcTimer(invoiceRef InvoiceRef,
633
        key CircuitKey, acceptTime time.Time) error {
348✔
634

348✔
635
        releaseTime := acceptTime.Add(i.cfg.HtlcHoldDuration)
348✔
636
        event := &htlcReleaseEvent{
348✔
637
                invoiceRef:  invoiceRef,
348✔
638
                key:         key,
348✔
639
                releaseTime: releaseTime,
348✔
640
        }
348✔
641

348✔
642
        select {
348✔
643
        case i.htlcAutoReleaseChan <- event:
348✔
644
                return nil
348✔
645

646
        case <-i.quit:
×
647
                return ErrShuttingDown
×
648
        }
649
}
650

651
// cancelSingleHtlc cancels a single accepted htlc on an invoice. It takes
652
// a resolution result which will be used to notify subscribed links and
653
// resolvers of the details of the htlc cancellation.
654
func (i *InvoiceRegistry) cancelSingleHtlc(invoiceRef InvoiceRef,
655
        key CircuitKey, result FailResolutionResult) error {
30✔
656

30✔
657
        updateInvoice := func(invoice *Invoice, setID *SetID) (
30✔
658
                *InvoiceUpdateDesc, error) {
60✔
659

30✔
660
                // Only allow individual htlc cancellation on open invoices.
30✔
661
                if invoice.State != ContractOpen {
37✔
662
                        log.Debugf("CancelSingleHtlc: cannot cancel htlc %v "+
7✔
663
                                "on invoice %v, invoice is no longer open", key,
7✔
664
                                invoiceRef)
7✔
665

7✔
666
                        return nil, nil
7✔
667
                }
7✔
668

669
                // Also for AMP invoices we fetch the relevant HTLCs, so
670
                // the HTLC should be found, otherwise we return an error.
671
                htlc, ok := invoice.Htlcs[key]
23✔
672
                if !ok {
23✔
673
                        return nil, fmt.Errorf("htlc %v not found on "+
×
674
                                "invoice %v", key, invoiceRef)
×
675
                }
×
676

677
                htlcState := htlc.State
23✔
678

23✔
679
                // Cancellation is only possible if the htlc wasn't already
23✔
680
                // resolved.
23✔
681
                if htlcState != HtlcStateAccepted {
25✔
682
                        log.Debugf("CancelSingleHtlc: htlc %v on invoice %v "+
2✔
683
                                "is already resolved", key, invoiceRef)
2✔
684

2✔
685
                        return nil, nil
2✔
686
                }
2✔
687

688
                log.Debugf("CancelSingleHtlc: cancelling htlc %v on invoice %v",
21✔
689
                        key, invoiceRef)
21✔
690

21✔
691
                // Return an update descriptor that cancels htlc and keeps
21✔
692
                // invoice open.
21✔
693
                canceledHtlcs := map[CircuitKey]struct{}{
21✔
694
                        key: {},
21✔
695
                }
21✔
696

21✔
697
                return &InvoiceUpdateDesc{
21✔
698
                        UpdateType:  CancelHTLCsUpdate,
21✔
699
                        CancelHtlcs: canceledHtlcs,
21✔
700
                        SetID:       setID,
21✔
701
                }, nil
21✔
702
        }
703

704
        // Try to mark the specified htlc as canceled in the invoice database.
705
        // Intercept the update descriptor to set the local updated variable. If
706
        // no invoice update is performed, we can return early.
707
        // setID is only set for AMP HTLCs, so it can be nil and it is expected
708
        // to be nil for non-AMP HTLCs.
709
        setID := (*SetID)(invoiceRef.SetID())
30✔
710

30✔
711
        var updated bool
30✔
712
        invoice, err := i.idb.UpdateInvoice(
30✔
713
                context.Background(), invoiceRef, setID,
30✔
714
                func(invoice *Invoice) (
30✔
715
                        *InvoiceUpdateDesc, error) {
60✔
716

30✔
717
                        updateDesc, err := updateInvoice(invoice, setID)
30✔
718
                        if err != nil {
30✔
719
                                return nil, err
×
720
                        }
×
721
                        updated = updateDesc != nil
30✔
722

30✔
723
                        return updateDesc, err
30✔
724
                },
725
        )
726
        if err != nil {
30✔
727
                return err
×
728
        }
×
729
        if !updated {
39✔
730
                return nil
9✔
731
        }
9✔
732

733
        // The invoice has been updated. Notify subscribers of the htlc
734
        // resolution.
735
        htlc, ok := invoice.Htlcs[key]
21✔
736
        if !ok {
21✔
737
                return fmt.Errorf("htlc %v not found", key)
×
738
        }
×
739
        if htlc.State == HtlcStateCanceled {
42✔
740
                resolution := NewFailResolution(
21✔
741
                        key, int32(htlc.AcceptHeight), result,
21✔
742
                )
21✔
743

21✔
744
                log.Debugf("Signaling htlc(%v) cancellation of invoice(%v) "+
21✔
745
                        "with resolution(%v) to the link subsystem", key,
21✔
746
                        invoiceRef, result)
21✔
747

21✔
748
                i.notifyHodlSubscribers(resolution)
21✔
749
        }
21✔
750

751
        return nil
21✔
752
}
753

754
// processKeySend just-in-time inserts an invoice if this htlc is a keysend
755
// htlc.
756
func (i *InvoiceRegistry) processKeySend(ctx invoiceUpdateCtx) error {
18✔
757
        // Retrieve keysend record if present.
18✔
758
        preimageSlice, ok := ctx.customRecords[record.KeySendType]
18✔
759
        if !ok {
18✔
UNCOV
760
                return nil
×
UNCOV
761
        }
×
762

763
        // Cancel htlc is preimage is invalid.
764
        preimage, err := lntypes.MakePreimage(preimageSlice)
18✔
765
        if err != nil {
21✔
766
                return err
3✔
767
        }
3✔
768
        if preimage.Hash() != ctx.hash {
15✔
769
                return fmt.Errorf("invalid keysend preimage %v for hash %v",
×
770
                        preimage, ctx.hash)
×
771
        }
×
772

773
        // Only allow keysend for non-mpp payments.
774
        if ctx.mpp != nil {
15✔
775
                return errors.New("no mpp keysend supported")
×
776
        }
×
777

778
        // Create an invoice for the htlc amount.
779
        amt := ctx.amtPaid
15✔
780

15✔
781
        // Set tlv required feature vector on the invoice. Otherwise we wouldn't
15✔
782
        // be able to pay to it with keysend.
15✔
783
        rawFeatures := lnwire.NewRawFeatureVector(
15✔
784
                lnwire.TLVOnionPayloadRequired,
15✔
785
        )
15✔
786
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
15✔
787

15✔
788
        // Use the minimum block delta that we require for settling htlcs.
15✔
789
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
15✔
790

15✔
791
        // Pre-check expiry here to prevent inserting an invoice that will not
15✔
792
        // be settled.
15✔
793
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
15✔
794
                return errors.New("final expiry too soon")
×
795
        }
×
796

797
        // The invoice database indexes all invoices by payment address, however
798
        // legacy keysend payment do not have one. In order to avoid a new
799
        // payment type on-disk wrt. to indexing, we'll continue to insert a
800
        // blank payment address which is special cased in the insertion logic
801
        // to not be indexed. In the future, once AMP is merged, this should be
802
        // replaced by generating a random payment address on the behalf of the
803
        // sender.
804
        payAddr := BlankPayAddr
15✔
805

15✔
806
        // Create placeholder invoice.
15✔
807
        invoice := &Invoice{
15✔
808
                CreationDate: i.cfg.Clock.Now(),
15✔
809
                Terms: ContractTerm{
15✔
810
                        FinalCltvDelta:  finalCltvDelta,
15✔
811
                        Value:           amt,
15✔
812
                        PaymentPreimage: &preimage,
15✔
813
                        PaymentAddr:     payAddr,
15✔
814
                        Features:        features,
15✔
815
                },
15✔
816
        }
15✔
817

15✔
818
        if i.cfg.KeysendHoldTime != 0 {
21✔
819
                invoice.HodlInvoice = true
6✔
820
                invoice.Terms.Expiry = i.cfg.KeysendHoldTime
6✔
821
        }
6✔
822

823
        // Insert invoice into database. Ignore duplicates, because this
824
        // may be a replay.
825
        _, err = i.AddInvoice(context.Background(), invoice, ctx.hash)
15✔
826
        if err != nil && !errors.Is(err, ErrDuplicateInvoice) {
15✔
827
                return err
×
828
        }
×
829

830
        return nil
15✔
831
}
832

833
// processAMP just-in-time inserts an invoice if this htlc is a keysend
834
// htlc.
835
func (i *InvoiceRegistry) processAMP(ctx invoiceUpdateCtx) error {
27✔
836
        // AMP payments MUST also include an MPP record.
27✔
837
        if ctx.mpp == nil {
30✔
838
                return errors.New("no MPP record for AMP")
3✔
839
        }
3✔
840

841
        // Create an invoice for the total amount expected, provided in the MPP
842
        // record.
843
        amt := ctx.mpp.TotalMsat()
24✔
844

24✔
845
        // Set the TLV required and MPP optional features on the invoice. We'll
24✔
846
        // also make the AMP features required so that it can't be paid by
24✔
847
        // legacy or MPP htlcs.
24✔
848
        rawFeatures := lnwire.NewRawFeatureVector(
24✔
849
                lnwire.TLVOnionPayloadRequired,
24✔
850
                lnwire.PaymentAddrOptional,
24✔
851
                lnwire.AMPRequired,
24✔
852
        )
24✔
853
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
24✔
854

24✔
855
        // Use the minimum block delta that we require for settling htlcs.
24✔
856
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
24✔
857

24✔
858
        // Pre-check expiry here to prevent inserting an invoice that will not
24✔
859
        // be settled.
24✔
860
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
24✔
861
                return errors.New("final expiry too soon")
×
862
        }
×
863

864
        // We'll use the sender-generated payment address provided in the HTLC
865
        // to create our AMP invoice.
866
        payAddr := ctx.mpp.PaymentAddr()
24✔
867

24✔
868
        // Create placeholder invoice.
24✔
869
        invoice := &Invoice{
24✔
870
                CreationDate: i.cfg.Clock.Now(),
24✔
871
                Terms: ContractTerm{
24✔
872
                        FinalCltvDelta:  finalCltvDelta,
24✔
873
                        Value:           amt,
24✔
874
                        PaymentPreimage: nil,
24✔
875
                        PaymentAddr:     payAddr,
24✔
876
                        Features:        features,
24✔
877
                },
24✔
878
        }
24✔
879

24✔
880
        // Insert invoice into database. Ignore duplicates payment hashes and
24✔
881
        // payment addrs, this may be a replay or a different HTLC for the AMP
24✔
882
        // invoice.
24✔
883
        _, err := i.AddInvoice(context.Background(), invoice, ctx.hash)
24✔
884
        isDuplicatedInvoice := errors.Is(err, ErrDuplicateInvoice)
24✔
885
        isDuplicatedPayAddr := errors.Is(err, ErrDuplicatePayAddr)
24✔
886
        switch {
24✔
887
        case isDuplicatedInvoice || isDuplicatedPayAddr:
12✔
888
                return nil
12✔
889
        default:
12✔
890
                return err
12✔
891
        }
892
}
893

894
// NotifyExitHopHtlc attempts to mark an invoice as settled. The return value
895
// describes how the htlc should be resolved.
896
//
897
// When the preimage of the invoice is not yet known (hodl invoice), this
898
// function moves the invoice to the accepted state. When SettleHoldInvoice is
899
// called later, a resolution message will be send back to the caller via the
900
// provided hodlChan. Invoice registry sends on this channel what action needs
901
// to be taken on the htlc (settle or cancel). The caller needs to ensure that
902
// the channel is either buffered or received on from another goroutine to
903
// prevent deadlock.
904
//
905
// In the case that the htlc is part of a larger set of htlcs that pay to the
906
// same invoice (multi-path payment), the htlc is held until the set is
907
// complete. If the set doesn't fully arrive in time, a timer will cancel the
908
// held htlc.
909
func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
910
        amtPaid lnwire.MilliSatoshi, expiry uint32, currentHeight int32,
911
        circuitKey CircuitKey, hodlChan chan<- interface{},
912
        wireCustomRecords lnwire.CustomRecords,
913
        payload Payload) (HtlcResolution, error) {
967✔
914

967✔
915
        // Create the update context containing the relevant details of the
967✔
916
        // incoming htlc.
967✔
917
        ctx := invoiceUpdateCtx{
967✔
918
                hash:                 rHash,
967✔
919
                circuitKey:           circuitKey,
967✔
920
                amtPaid:              amtPaid,
967✔
921
                expiry:               expiry,
967✔
922
                currentHeight:        currentHeight,
967✔
923
                finalCltvRejectDelta: i.cfg.FinalCltvRejectDelta,
967✔
924
                wireCustomRecords:    wireCustomRecords,
967✔
925
                customRecords:        payload.CustomRecords(),
967✔
926
                mpp:                  payload.MultiPath(),
967✔
927
                amp:                  payload.AMPRecord(),
967✔
928
                metadata:             payload.Metadata(),
967✔
929
                pathID:               payload.PathID(),
967✔
930
                totalAmtMsat:         payload.TotalAmtMsat(),
967✔
931
        }
967✔
932

967✔
933
        switch {
967✔
934
        // If we are accepting spontaneous AMP payments and this payload
935
        // contains an AMP record, create an AMP invoice that will be settled
936
        // below.
937
        case i.cfg.AcceptAMP && ctx.amp != nil:
27✔
938
                err := i.processAMP(ctx)
27✔
939
                if err != nil {
30✔
940
                        ctx.log(fmt.Sprintf("amp error: %v", err))
3✔
941

3✔
942
                        return NewFailResolution(
3✔
943
                                circuitKey, currentHeight, ResultAmpError,
3✔
944
                        ), nil
3✔
945
                }
3✔
946

947
        // If we are accepting spontaneous keysend payments, create a regular
948
        // invoice that will be settled below. We also enforce that this is only
949
        // done when no AMP payload is present since it will only be settle-able
950
        // by regular HTLCs.
951
        case i.cfg.AcceptKeySend && ctx.amp == nil:
18✔
952
                err := i.processKeySend(ctx)
18✔
953
                if err != nil {
21✔
954
                        ctx.log(fmt.Sprintf("keysend error: %v", err))
3✔
955

3✔
956
                        return NewFailResolution(
3✔
957
                                circuitKey, currentHeight, ResultKeySendError,
3✔
958
                        ), nil
3✔
959
                }
3✔
960
        }
961

962
        // Execute locked notify exit hop logic.
963
        i.Lock()
961✔
964
        resolution, invoiceToExpire, err := i.notifyExitHopHtlcLocked(
961✔
965
                &ctx, hodlChan,
961✔
966
        )
961✔
967
        i.Unlock()
961✔
968
        if err != nil {
961✔
969
                return nil, err
×
970
        }
×
971

972
        if invoiceToExpire != nil {
1,043✔
973
                i.expiryWatcher.AddInvoices(invoiceToExpire)
82✔
974
        }
82✔
975

976
        switch r := resolution.(type) {
961✔
977
        // The htlc is held. Start a timer outside the lock if the htlc should
978
        // be auto-released, because otherwise a deadlock may happen with the
979
        // main event loop.
980
        case *htlcAcceptResolution:
437✔
981
                if r.autoRelease {
785✔
982
                        var invRef InvoiceRef
348✔
983
                        if ctx.amp != nil {
375✔
984
                                invRef = InvoiceRefBySetID(*ctx.setID())
27✔
985
                        } else {
348✔
986
                                invRef = ctx.invoiceRef()
321✔
987
                        }
321✔
988

989
                        err := i.startHtlcTimer(
348✔
990
                                invRef, circuitKey, r.acceptTime,
348✔
991
                        )
348✔
992
                        if err != nil {
348✔
993
                                return nil, err
×
994
                        }
×
995
                }
996

997
                // We return a nil resolution because htlc acceptances are
998
                // represented as nil resolutions externally.
999
                // TODO(carla) update calling code to handle accept resolutions.
1000
                return nil, nil
437✔
1001

1002
        // A direct resolution was received for this htlc.
1003
        case HtlcResolution:
524✔
1004
                return r, nil
524✔
1005

1006
        // Fail if an unknown resolution type was received.
1007
        default:
×
1008
                return nil, errors.New("invalid resolution type")
×
1009
        }
1010
}
1011

1012
// notifyExitHopHtlcLocked is the internal implementation of NotifyExitHopHtlc
1013
// that should be executed inside the registry lock. The returned invoiceExpiry
1014
// (if not nil) needs to be added to the expiry watcher outside of the lock.
1015
func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
1016
        ctx *invoiceUpdateCtx, hodlChan chan<- interface{}) (
1017
        HtlcResolution, invoiceExpiry, error) {
961✔
1018

961✔
1019
        invoiceRef := ctx.invoiceRef()
961✔
1020

961✔
1021
        // This setID is only set for AMP HTLCs, so it can be nil and it is
961✔
1022
        // also expected to be nil for non-AMP HTLCs.
961✔
1023
        setID := (*SetID)(ctx.setID())
961✔
1024

961✔
1025
        // We need to look up the current state of the invoice in order to send
961✔
1026
        // the previously accepted/settled HTLCs to the interceptor.
961✔
1027
        existingInvoice, err := i.idb.LookupInvoice(
961✔
1028
                context.Background(), invoiceRef,
961✔
1029
        )
961✔
1030
        switch {
961✔
1031
        case errors.Is(err, ErrInvoiceNotFound) ||
1032
                errors.Is(err, ErrNoInvoicesCreated):
22✔
1033

22✔
1034
                // If the invoice was not found, return a failure resolution
22✔
1035
                // with an invoice not found result.
22✔
1036
                return NewFailResolution(
22✔
1037
                        ctx.circuitKey, ctx.currentHeight,
22✔
1038
                        ResultInvoiceNotFound,
22✔
1039
                ), nil, nil
22✔
1040

1041
        case err != nil:
×
1042
                ctx.log(err.Error())
×
1043
                return nil, nil, err
×
1044
        }
1045

1046
        var cancelSet bool
939✔
1047

939✔
1048
        // Provide the invoice to the settlement interceptor to allow
939✔
1049
        // the interceptor's client an opportunity to manipulate the
939✔
1050
        // settlement process.
939✔
1051
        err = i.cfg.HtlcInterceptor.Intercept(HtlcModifyRequest{
939✔
1052
                WireCustomRecords:  ctx.wireCustomRecords,
939✔
1053
                ExitHtlcCircuitKey: ctx.circuitKey,
939✔
1054
                ExitHtlcAmt:        ctx.amtPaid,
939✔
1055
                ExitHtlcExpiry:     ctx.expiry,
939✔
1056
                CurrentHeight:      uint32(ctx.currentHeight),
939✔
1057
                Invoice:            existingInvoice,
939✔
1058
        }, func(resp HtlcModifyResponse) {
942✔
1059
                log.Debugf("Received invoice HTLC interceptor response: %v",
3✔
1060
                        resp)
3✔
1061

3✔
1062
                if resp.AmountPaid != 0 {
3✔
UNCOV
1063
                        ctx.amtPaid = resp.AmountPaid
×
UNCOV
1064
                }
×
1065

1066
                cancelSet = resp.CancelSet
3✔
1067
        })
1068
        if err != nil {
939✔
1069
                err := fmt.Errorf("error during invoice HTLC interception: %w",
×
1070
                        err)
×
1071
                ctx.log(err.Error())
×
1072

×
1073
                return nil, nil, err
×
1074
        }
×
1075

1076
        // We'll attempt to settle an invoice matching this rHash on disk (if
1077
        // one exists). The callback will update the invoice state and/or htlcs.
1078
        var (
939✔
1079
                resolution        HtlcResolution
939✔
1080
                updateSubscribers bool
939✔
1081
        )
939✔
1082
        callback := func(inv *Invoice) (*InvoiceUpdateDesc, error) {
1,878✔
1083
                // First check if this is a replayed htlc and resolve it
939✔
1084
                // according to its current state. We cannot decide differently
939✔
1085
                // once the HTLC has already been processed before.
939✔
1086
                isReplayed, res, err := resolveReplayedHtlc(ctx, inv)
939✔
1087
                if err != nil {
939✔
1088
                        return nil, err
×
1089
                }
×
1090
                if isReplayed {
955✔
1091
                        resolution = res
16✔
1092
                        return nil, nil
16✔
1093
                }
16✔
1094

1095
                // In case the HTLC interceptor cancels the HTLC set, we do NOT
1096
                // cancel the invoice however we cancel the complete HTLC set.
1097
                if cancelSet {
926✔
1098
                        // If the invoice is not open, something is wrong, we
3✔
1099
                        // fail just the HTLC with the specific error.
3✔
1100
                        if inv.State != ContractOpen {
3✔
1101
                                log.Errorf("Invoice state (%v) is not OPEN, "+
×
1102
                                        "cancelling HTLC set not allowed by "+
×
1103
                                        "external source", inv.State)
×
1104

×
1105
                                resolution = NewFailResolution(
×
1106
                                        ctx.circuitKey, ctx.currentHeight,
×
1107
                                        ResultInvoiceNotOpen,
×
1108
                                )
×
1109

×
1110
                                return nil, nil
×
1111
                        }
×
1112

1113
                        // The error `ExternalValidationFailed` error
1114
                        // information will be packed in the
1115
                        // `FailIncorrectDetails` msg when sending the msg to
1116
                        // the peer. Error codes are defined by the BOLT 04
1117
                        // specification. The error text can be arbitrary
1118
                        // therefore we return a custom error msg.
1119
                        resolution = NewFailResolution(
3✔
1120
                                ctx.circuitKey, ctx.currentHeight,
3✔
1121
                                ExternalValidationFailed,
3✔
1122
                        )
3✔
1123

3✔
1124
                        // We cancel all HTLCs which are in the accepted state.
3✔
1125
                        //
3✔
1126
                        // NOTE: The current HTLC is not included because it
3✔
1127
                        // was never accepted in the first place.
3✔
1128
                        htlcs := inv.HTLCSet(ctx.setID(), HtlcStateAccepted)
3✔
1129
                        htlcKeys := fn.KeySet[CircuitKey](htlcs)
3✔
1130

3✔
1131
                        // The external source did cancel the htlc set, so we
3✔
1132
                        // cancel all HTLCs in the set. We however keep the
3✔
1133
                        // invoice in the open state.
3✔
1134
                        //
3✔
1135
                        // NOTE: The invoice event loop will still call the
3✔
1136
                        // `cancelSingleHTLC` method for MPP payments, however
3✔
1137
                        // because the HTLCs are already cancled back it will be
3✔
1138
                        // a NOOP.
3✔
1139
                        update := &InvoiceUpdateDesc{
3✔
1140
                                UpdateType:  CancelHTLCsUpdate,
3✔
1141
                                CancelHtlcs: htlcKeys,
3✔
1142
                                SetID:       setID,
3✔
1143
                        }
3✔
1144

3✔
1145
                        return update, nil
3✔
1146
                }
1147

1148
                updateDesc, res, err := updateInvoice(ctx, inv)
920✔
1149
                if err != nil {
920✔
1150
                        return nil, err
×
1151
                }
×
1152

1153
                // Set resolution in outer scope only after successful update.
1154
                resolution = res
920✔
1155

920✔
1156
                // Only send an update if the invoice state was changed.
920✔
1157
                updateSubscribers = updateDesc != nil &&
920✔
1158
                        updateDesc.State != nil
920✔
1159

920✔
1160
                return updateDesc, nil
920✔
1161
        }
1162

1163
        invoice, err := i.idb.UpdateInvoice(
939✔
1164
                context.Background(), invoiceRef, setID, callback,
939✔
1165
        )
939✔
1166

939✔
1167
        var duplicateSetIDErr ErrDuplicateSetID
939✔
1168
        if errors.As(err, &duplicateSetIDErr) {
939✔
1169
                return NewFailResolution(
×
1170
                        ctx.circuitKey, ctx.currentHeight,
×
1171
                        ResultInvoiceNotFound,
×
1172
                ), nil, nil
×
1173
        }
×
1174

1175
        switch {
939✔
1176
        case errors.Is(err, ErrInvoiceNotFound):
×
1177
                // If the invoice was not found, return a failure resolution
×
1178
                // with an invoice not found result.
×
1179
                return NewFailResolution(
×
1180
                        ctx.circuitKey, ctx.currentHeight,
×
1181
                        ResultInvoiceNotFound,
×
1182
                ), nil, nil
×
1183

1184
        case errors.Is(err, ErrInvRefEquivocation):
×
1185
                return NewFailResolution(
×
1186
                        ctx.circuitKey, ctx.currentHeight,
×
1187
                        ResultInvoiceNotFound,
×
1188
                ), nil, nil
×
1189

1190
        case err == nil:
939✔
1191

1192
        default:
×
1193
                ctx.log(err.Error())
×
1194
                return nil, nil, err
×
1195
        }
1196

1197
        var invoiceToExpire invoiceExpiry
939✔
1198

939✔
1199
        log.Tracef("Settlement resolution: %T %v", resolution, resolution)
939✔
1200

939✔
1201
        switch res := resolution.(type) {
939✔
1202
        case *HtlcFailResolution:
29✔
1203
                // Inspect latest htlc state on the invoice. If it is found,
29✔
1204
                // we will update the accept height as it was recorded in the
29✔
1205
                // invoice database (which occurs in the case where the htlc
29✔
1206
                // reached the database in a previous call). If the htlc was
29✔
1207
                // not found on the invoice, it was immediately failed so we
29✔
1208
                // send the failure resolution as is, which has the current
29✔
1209
                // height set as the accept height.
29✔
1210
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
29✔
1211
                if ok {
32✔
1212
                        res.AcceptHeight = int32(invoiceHtlc.AcceptHeight)
3✔
1213
                }
3✔
1214

1215
                ctx.log(fmt.Sprintf("failure resolution result "+
29✔
1216
                        "outcome: %v, at accept height: %v",
29✔
1217
                        res.Outcome, res.AcceptHeight))
29✔
1218

29✔
1219
                // Some failures apply to the entire HTLC set. Break here if
29✔
1220
                // this isn't one of them.
29✔
1221
                if !res.Outcome.IsSetFailure() {
49✔
1222
                        break
20✔
1223
                }
1224

1225
                // Also cancel any HTLCs in the HTLC set that are also in the
1226
                // canceled state with the same failure result.
1227
                setID := ctx.setID()
9✔
1228
                canceledHtlcSet := invoice.HTLCSet(setID, HtlcStateCanceled)
9✔
1229
                for key, htlc := range canceledHtlcSet {
18✔
1230
                        htlcFailResolution := NewFailResolution(
9✔
1231
                                key, int32(htlc.AcceptHeight), res.Outcome,
9✔
1232
                        )
9✔
1233

9✔
1234
                        i.notifyHodlSubscribers(htlcFailResolution)
9✔
1235
                }
9✔
1236

1237
        // If the htlc was settled, we will settle any previously accepted
1238
        // htlcs and notify our peer to settle them.
1239
        case *HtlcSettleResolution:
473✔
1240
                ctx.log(fmt.Sprintf("settle resolution result "+
473✔
1241
                        "outcome: %v, at accept height: %v",
473✔
1242
                        res.Outcome, res.AcceptHeight))
473✔
1243

473✔
1244
                // Also settle any previously accepted htlcs. If a htlc is
473✔
1245
                // marked as settled, we should follow now and settle the htlc
473✔
1246
                // with our peer.
473✔
1247
                setID := ctx.setID()
473✔
1248
                settledHtlcSet := invoice.HTLCSet(setID, HtlcStateSettled)
473✔
1249
                for key, htlc := range settledHtlcSet {
1,258✔
1250
                        preimage := res.Preimage
785✔
1251
                        if htlc.AMP != nil && htlc.AMP.Preimage != nil {
797✔
1252
                                preimage = *htlc.AMP.Preimage
12✔
1253
                        }
12✔
1254

1255
                        // Notify subscribers that the htlcs should be settled
1256
                        // with our peer. Note that the outcome of the
1257
                        // resolution is set based on the outcome of the single
1258
                        // htlc that we just settled, so may not be accurate
1259
                        // for all htlcs.
1260
                        htlcSettleResolution := NewSettleResolution(
785✔
1261
                                preimage, key,
785✔
1262
                                int32(htlc.AcceptHeight), res.Outcome,
785✔
1263
                        )
785✔
1264

785✔
1265
                        // Notify subscribers that the htlc should be settled
785✔
1266
                        // with our peer.
785✔
1267
                        i.notifyHodlSubscribers(htlcSettleResolution)
785✔
1268
                }
1269

1270
                // If concurrent payments were attempted to this invoice before
1271
                // the current one was ultimately settled, cancel back any of
1272
                // the HTLCs immediately. As a result of the settle, the HTLCs
1273
                // in other HTLC sets are automatically converted to a canceled
1274
                // state when updating the invoice.
1275
                //
1276
                // TODO(roasbeef): can remove now??
1277
                canceledHtlcSet := invoice.HTLCSetCompliment(
473✔
1278
                        setID, HtlcStateCanceled,
473✔
1279
                )
473✔
1280
                for key, htlc := range canceledHtlcSet {
473✔
1281
                        htlcFailResolution := NewFailResolution(
×
1282
                                key, int32(htlc.AcceptHeight),
×
1283
                                ResultInvoiceAlreadySettled,
×
1284
                        )
×
1285

×
1286
                        i.notifyHodlSubscribers(htlcFailResolution)
×
1287
                }
×
1288

1289
        // If we accepted the htlc, subscribe to the hodl invoice and return
1290
        // an accept resolution with the htlc's accept time on it.
1291
        case *htlcAcceptResolution:
437✔
1292
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
437✔
1293
                if !ok {
437✔
1294
                        return nil, nil, fmt.Errorf("accepted htlc: %v not"+
×
1295
                                " present on invoice: %x", ctx.circuitKey,
×
1296
                                ctx.hash[:])
×
1297
                }
×
1298

1299
                // Determine accepted height of this htlc. If the htlc reached
1300
                // the invoice database (possibly in a previous call to the
1301
                // invoice registry), we'll take the original accepted height
1302
                // as it was recorded in the database.
1303
                acceptHeight := int32(invoiceHtlc.AcceptHeight)
437✔
1304

437✔
1305
                ctx.log(fmt.Sprintf("accept resolution result "+
437✔
1306
                        "outcome: %v, at accept height: %v",
437✔
1307
                        res.outcome, acceptHeight))
437✔
1308

437✔
1309
                // Auto-release the htlc if the invoice is still open. It can
437✔
1310
                // only happen for mpp payments that there are htlcs in state
437✔
1311
                // Accepted while the invoice is Open.
437✔
1312
                if invoice.State == ContractOpen {
785✔
1313
                        res.acceptTime = invoiceHtlc.AcceptTime
348✔
1314
                        res.autoRelease = true
348✔
1315
                }
348✔
1316

1317
                // If we have fully accepted the set of htlcs for this invoice,
1318
                // we can now add it to our invoice expiry watcher. We do not
1319
                // add invoices before they are fully accepted, because it is
1320
                // possible that we MppTimeout the htlcs, and then our relevant
1321
                // expiry height could change.
1322
                if res.outcome == resultAccepted {
519✔
1323
                        invoiceToExpire = makeInvoiceExpiry(ctx.hash, invoice)
82✔
1324
                }
82✔
1325

1326
                // Subscribe to the resolution if the caller specified a
1327
                // notification channel.
1328
                if hodlChan != nil {
874✔
1329
                        i.hodlSubscribe(hodlChan, ctx.circuitKey)
437✔
1330
                }
437✔
1331

1332
        default:
×
1333
                panic("unknown action")
×
1334
        }
1335

1336
        // Now that the links have been notified of any state changes to their
1337
        // HTLCs, we'll go ahead and notify any clients waiting on the invoice
1338
        // state changes.
1339
        if updateSubscribers {
1,491✔
1340
                // We'll add a setID onto the notification, but only if this is
552✔
1341
                // an AMP invoice being settled.
552✔
1342
                var setID *[32]byte
552✔
1343
                if _, ok := resolution.(*HtlcSettleResolution); ok {
1,016✔
1344
                        setID = ctx.setID()
464✔
1345
                }
464✔
1346

1347
                i.notifyClients(ctx.hash, invoice, setID)
552✔
1348
        }
1349

1350
        return resolution, invoiceToExpire, nil
939✔
1351
}
1352

1353
// SettleHodlInvoice sets the preimage of a hodl invoice.
1354
func (i *InvoiceRegistry) SettleHodlInvoice(ctx context.Context,
1355
        preimage lntypes.Preimage) error {
69✔
1356

69✔
1357
        i.Lock()
69✔
1358
        defer i.Unlock()
69✔
1359

69✔
1360
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
138✔
1361
                switch invoice.State {
69✔
1362
                case ContractOpen:
×
1363
                        return nil, ErrInvoiceStillOpen
×
1364

1365
                case ContractCanceled:
×
1366
                        return nil, ErrInvoiceAlreadyCanceled
×
1367

1368
                case ContractSettled:
3✔
1369
                        return nil, ErrInvoiceAlreadySettled
3✔
1370
                }
1371

1372
                return &InvoiceUpdateDesc{
66✔
1373
                        UpdateType: SettleHodlInvoiceUpdate,
66✔
1374
                        State: &InvoiceStateUpdateDesc{
66✔
1375
                                NewState: ContractSettled,
66✔
1376
                                Preimage: &preimage,
66✔
1377
                        },
66✔
1378
                }, nil
66✔
1379
        }
1380

1381
        hash := preimage.Hash()
69✔
1382
        invoiceRef := InvoiceRefByHash(hash)
69✔
1383

69✔
1384
        // AMP hold invoices are not supported so we set the setID to nil.
69✔
1385
        // For non-AMP invoices this parameter is ignored during the fetching
69✔
1386
        // of the database state.
69✔
1387
        setID := (*SetID)(nil)
69✔
1388

69✔
1389
        invoice, err := i.idb.UpdateInvoice(
69✔
1390
                ctx, invoiceRef, setID, updateInvoice,
69✔
1391
        )
69✔
1392
        if err != nil {
72✔
1393
                log.Errorf("SettleHodlInvoice with preimage %v: %v",
3✔
1394
                        preimage, err)
3✔
1395

3✔
1396
                return err
3✔
1397
        }
3✔
1398

1399
        log.Debugf("Invoice%v: settled with preimage %v", invoiceRef,
66✔
1400
                invoice.Terms.PaymentPreimage)
66✔
1401

66✔
1402
        // In the callback, we marked the invoice as settled. UpdateInvoice will
66✔
1403
        // have seen this and should have moved all htlcs that were accepted to
66✔
1404
        // the settled state. In the loop below, we go through all of these and
66✔
1405
        // notify links and resolvers that are waiting for resolution. Any htlcs
66✔
1406
        // that were already settled before, will be notified again. This isn't
66✔
1407
        // necessary but doesn't hurt either.
66✔
1408
        for key, htlc := range invoice.Htlcs {
135✔
1409
                if htlc.State != HtlcStateSettled {
69✔
1410
                        continue
×
1411
                }
1412

1413
                resolution := NewSettleResolution(
69✔
1414
                        preimage, key, int32(htlc.AcceptHeight), ResultSettled,
69✔
1415
                )
69✔
1416

69✔
1417
                i.notifyHodlSubscribers(resolution)
69✔
1418
        }
1419
        i.notifyClients(hash, invoice, nil)
66✔
1420

66✔
1421
        return nil
66✔
1422
}
1423

1424
// CancelInvoice attempts to cancel the invoice corresponding to the passed
1425
// payment hash.
1426
func (i *InvoiceRegistry) CancelInvoice(ctx context.Context,
1427
        payHash lntypes.Hash) error {
29✔
1428

29✔
1429
        return i.cancelInvoiceImpl(ctx, payHash, true)
29✔
1430
}
29✔
1431

1432
// shouldCancel examines the state of an invoice and whether we want to
1433
// cancel already accepted invoices, taking our force cancel boolean into
1434
// account. This is pulled out into its own function so that tests that mock
1435
// cancelInvoiceImpl can reuse this logic.
1436
func shouldCancel(state ContractState, cancelAccepted bool) bool {
103✔
1437
        if state != ContractAccepted {
178✔
1438
                return true
75✔
1439
        }
75✔
1440

1441
        // If the invoice is accepted, we should only cancel if we want to
1442
        // force cancellation of accepted invoices.
1443
        return cancelAccepted
28✔
1444
}
1445

1446
// cancelInvoice attempts to cancel the invoice corresponding to the passed
1447
// payment hash. Accepted invoices will only be canceled if explicitly
1448
// requested to do so. It notifies subscribing links and resolvers that
1449
// the associated htlcs were canceled if they change state.
1450
func (i *InvoiceRegistry) cancelInvoiceImpl(ctx context.Context,
1451
        payHash lntypes.Hash, cancelAccepted bool) error {
111✔
1452

111✔
1453
        i.Lock()
111✔
1454
        defer i.Unlock()
111✔
1455

111✔
1456
        ref := InvoiceRefByHash(payHash)
111✔
1457
        log.Debugf("Invoice%v: canceling invoice", ref)
111✔
1458

111✔
1459
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
214✔
1460
                if !shouldCancel(invoice.State, cancelAccepted) {
115✔
1461
                        return nil, nil
12✔
1462
                }
12✔
1463

1464
                // Move invoice to the canceled state. Rely on validation in
1465
                // channeldb to return an error if the invoice is already
1466
                // settled or canceled.
1467
                return &InvoiceUpdateDesc{
91✔
1468
                        UpdateType: CancelInvoiceUpdate,
91✔
1469
                        State: &InvoiceStateUpdateDesc{
91✔
1470
                                NewState: ContractCanceled,
91✔
1471
                        },
91✔
1472
                }, nil
91✔
1473
        }
1474

1475
        // If it's an AMP invoice we need to fetch all AMP HTLCs here so that
1476
        // we can cancel all of HTLCs which are in the accepted state across
1477
        // different setIDs.
1478
        setID := (*SetID)(nil)
111✔
1479
        invoiceRef := InvoiceRefByHash(payHash)
111✔
1480
        invoice, err := i.idb.UpdateInvoice(
111✔
1481
                ctx, invoiceRef, setID, updateInvoice,
111✔
1482
        )
111✔
1483

111✔
1484
        // Implement idempotency by returning success if the invoice was already
111✔
1485
        // canceled.
111✔
1486
        if errors.Is(err, ErrInvoiceAlreadyCanceled) {
114✔
1487
                log.Debugf("Invoice%v: already canceled", ref)
3✔
1488
                return nil
3✔
1489
        }
3✔
1490
        if err != nil {
127✔
1491
                return err
19✔
1492
        }
19✔
1493

1494
        // Return without cancellation if the invoice state is ContractAccepted.
1495
        if invoice.State == ContractAccepted {
101✔
1496
                log.Debugf("Invoice%v: remains accepted as cancel wasn't"+
12✔
1497
                        "explicitly requested.", ref)
12✔
1498
                return nil
12✔
1499
        }
12✔
1500

1501
        log.Debugf("Invoice%v: canceled", ref)
77✔
1502

77✔
1503
        // In the callback, some htlcs may have been moved to the canceled
77✔
1504
        // state. We now go through all of these and notify links and resolvers
77✔
1505
        // that are waiting for resolution. Any htlcs that were already canceled
77✔
1506
        // before, will be notified again. This isn't necessary but doesn't hurt
77✔
1507
        // either.
77✔
1508
        // For AMP invoices we fetched all AMP HTLCs for all sub AMP invoices
77✔
1509
        // here so we can clean up all of them.
77✔
1510
        for key, htlc := range invoice.Htlcs {
120✔
1511
                if htlc.State != HtlcStateCanceled {
43✔
1512
                        continue
×
1513
                }
1514

1515
                i.notifyHodlSubscribers(
43✔
1516
                        NewFailResolution(
43✔
1517
                                key, int32(htlc.AcceptHeight), ResultCanceled,
43✔
1518
                        ),
43✔
1519
                )
43✔
1520
        }
1521

1522
        i.notifyClients(payHash, invoice, nil)
77✔
1523

77✔
1524
        // Attempt to also delete the invoice if requested through the registry
77✔
1525
        // config.
77✔
1526
        if i.cfg.GcCanceledInvoicesOnTheFly {
80✔
1527
                // Assemble the delete reference and attempt to delete through
3✔
1528
                // the invocice from the DB.
3✔
1529
                deleteRef := InvoiceDeleteRef{
3✔
1530
                        PayHash:     payHash,
3✔
1531
                        AddIndex:    invoice.AddIndex,
3✔
1532
                        SettleIndex: invoice.SettleIndex,
3✔
1533
                }
3✔
1534
                if invoice.Terms.PaymentAddr != BlankPayAddr {
3✔
1535
                        deleteRef.PayAddr = &invoice.Terms.PaymentAddr
×
1536
                }
×
1537

1538
                err = i.idb.DeleteInvoice(ctx, []InvoiceDeleteRef{deleteRef})
3✔
1539
                // If by any chance deletion failed, then log it instead of
3✔
1540
                // returning the error, as the invoice itself has already been
3✔
1541
                // canceled.
3✔
1542
                if err != nil {
3✔
1543
                        log.Warnf("Invoice %v could not be deleted: %v", ref,
×
1544
                                err)
×
1545
                }
×
1546
        }
1547

1548
        return nil
77✔
1549
}
1550

1551
// notifyClients notifies all currently registered invoice notification clients
1552
// of a newly added/settled invoice.
1553
func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
1554
        invoice *Invoice, setID *[32]byte) {
1,409✔
1555

1,409✔
1556
        event := &invoiceEvent{
1,409✔
1557
                invoice: invoice,
1,409✔
1558
                hash:    hash,
1,409✔
1559
                setID:   setID,
1,409✔
1560
        }
1,409✔
1561

1,409✔
1562
        select {
1,409✔
1563
        case i.invoiceEvents <- event:
1,409✔
1564
        case <-i.quit:
×
1565
        }
1566
}
1567

1568
// invoiceSubscriptionKit defines that are common to both all invoice
1569
// subscribers and single invoice subscribers.
1570
type invoiceSubscriptionKit struct {
1571
        id uint32 // nolint:structcheck
1572

1573
        // quit is a chan mouted to InvoiceRegistry that signals a shutdown.
1574
        quit chan struct{}
1575

1576
        ntfnQueue *queue.ConcurrentQueue
1577

1578
        canceled   uint32 // To be used atomically.
1579
        cancelChan chan struct{}
1580

1581
        // backlogDelivered is closed when the backlog events have been
1582
        // delivered.
1583
        backlogDelivered chan struct{}
1584
}
1585

1586
// InvoiceSubscription represents an intent to receive updates for newly added
1587
// or settled invoices. For each newly added invoice, a copy of the invoice
1588
// will be sent over the NewInvoices channel. Similarly, for each newly settled
1589
// invoice, a copy of the invoice will be sent over the SettledInvoices
1590
// channel.
1591
type InvoiceSubscription struct {
1592
        invoiceSubscriptionKit
1593

1594
        // NewInvoices is a channel that we'll use to send all newly created
1595
        // invoices with an invoice index greater than the specified
1596
        // StartingInvoiceIndex field.
1597
        NewInvoices chan *Invoice
1598

1599
        // SettledInvoices is a channel that we'll use to send all settled
1600
        // invoices with an invoices index greater than the specified
1601
        // StartingInvoiceIndex field.
1602
        SettledInvoices chan *Invoice
1603

1604
        // addIndex is the highest add index the caller knows of. We'll use
1605
        // this information to send out an event backlog to the notifications
1606
        // subscriber. Any new add events with an index greater than this will
1607
        // be dispatched before any new notifications are sent out.
1608
        addIndex uint64
1609

1610
        // settleIndex is the highest settle index the caller knows of. We'll
1611
        // use this information to send out an event backlog to the
1612
        // notifications subscriber. Any new settle events with an index
1613
        // greater than this will be dispatched before any new notifications
1614
        // are sent out.
1615
        settleIndex uint64
1616
}
1617

1618
// SingleInvoiceSubscription represents an intent to receive updates for a
1619
// specific invoice.
1620
type SingleInvoiceSubscription struct {
1621
        invoiceSubscriptionKit
1622

1623
        invoiceRef InvoiceRef
1624

1625
        // Updates is a channel that we'll use to send all invoice events for
1626
        // the invoice that is subscribed to.
1627
        Updates chan *Invoice
1628
}
1629

1630
// PayHash returns the optional payment hash of the target invoice.
1631
//
1632
// TODO(positiveblue): This method is only supposed to be used in tests. It will
1633
// be deleted as soon as invoiceregistery_test is in the same module.
1634
func (s *SingleInvoiceSubscription) PayHash() *lntypes.Hash {
18✔
1635
        return s.invoiceRef.PayHash()
18✔
1636
}
18✔
1637

1638
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
1639
// resources.
1640
func (i *invoiceSubscriptionKit) Cancel() {
63✔
1641
        if !atomic.CompareAndSwapUint32(&i.canceled, 0, 1) {
63✔
1642
                return
×
1643
        }
×
1644

1645
        i.ntfnQueue.Stop()
63✔
1646
        close(i.cancelChan)
63✔
1647
}
1648

1649
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
102✔
1650
        select {
102✔
1651
        case i.ntfnQueue.ChanIn() <- event:
102✔
1652

1653
        case <-i.cancelChan:
×
1654
                // This can only be triggered by delivery of non-backlog
×
1655
                // events.
×
1656
                return ErrShuttingDown
×
1657
        case <-i.quit:
×
1658
                return ErrShuttingDown
×
1659
        }
1660

1661
        return nil
102✔
1662
}
1663

1664
// SubscribeNotifications returns an InvoiceSubscription which allows the
1665
// caller to receive async notifications when any invoices are settled or
1666
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
1667
// by first sending out all new events with an invoice index _greater_ than
1668
// this value. Afterwards, we'll send out real-time notifications.
1669
func (i *InvoiceRegistry) SubscribeNotifications(ctx context.Context,
1670
        addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
45✔
1671

45✔
1672
        client := &InvoiceSubscription{
45✔
1673
                NewInvoices:     make(chan *Invoice),
45✔
1674
                SettledInvoices: make(chan *Invoice),
45✔
1675
                addIndex:        addIndex,
45✔
1676
                settleIndex:     settleIndex,
45✔
1677
                invoiceSubscriptionKit: invoiceSubscriptionKit{
45✔
1678
                        quit:             i.quit,
45✔
1679
                        ntfnQueue:        queue.NewConcurrentQueue(20),
45✔
1680
                        cancelChan:       make(chan struct{}),
45✔
1681
                        backlogDelivered: make(chan struct{}),
45✔
1682
                },
45✔
1683
        }
45✔
1684
        client.ntfnQueue.Start()
45✔
1685

45✔
1686
        // This notifies other goroutines that the backlog phase is over.
45✔
1687
        defer close(client.backlogDelivered)
45✔
1688

45✔
1689
        // Always increment by 1 first, and our client ID will start with 1,
45✔
1690
        // not 0.
45✔
1691
        client.id = atomic.AddUint32(&i.nextClientID, 1)
45✔
1692

45✔
1693
        // Before we register this new invoice subscription, we'll launch a new
45✔
1694
        // goroutine that will proxy all notifications appended to the end of
45✔
1695
        // the concurrent queue to the two client-side channels the caller will
45✔
1696
        // feed off of.
45✔
1697
        i.wg.Add(1)
45✔
1698
        go func() {
90✔
1699
                defer i.wg.Done()
45✔
1700
                defer i.deleteClient(client.id)
45✔
1701

45✔
1702
                for {
156✔
1703
                        select {
111✔
1704
                        // A new invoice event has been sent by the
1705
                        // invoiceRegistry! We'll figure out if this is an add
1706
                        // event or a settle event, then dispatch the event to
1707
                        // the client.
1708
                        case ntfn := <-client.ntfnQueue.ChanOut():
66✔
1709
                                invoiceEvent := ntfn.(*invoiceEvent)
66✔
1710

66✔
1711
                                var targetChan chan *Invoice
66✔
1712
                                state := invoiceEvent.invoice.State
66✔
1713
                                switch {
66✔
1714
                                // AMP invoices never move to settled, but will
1715
                                // be sent with a set ID if an HTLC set is
1716
                                // being settled.
1717
                                case state == ContractOpen &&
1718
                                        invoiceEvent.setID != nil:
6✔
1719
                                        fallthrough
6✔
1720

1721
                                case state == ContractSettled:
24✔
1722
                                        targetChan = client.SettledInvoices
24✔
1723

1724
                                case state == ContractOpen:
42✔
1725
                                        targetChan = client.NewInvoices
42✔
1726

1727
                                default:
×
1728
                                        log.Errorf("unknown invoice state: %v",
×
1729
                                                state)
×
1730

×
1731
                                        continue
×
1732
                                }
1733

1734
                                select {
66✔
1735
                                case targetChan <- invoiceEvent.invoice:
66✔
1736

1737
                                case <-client.cancelChan:
×
1738
                                        return
×
1739

1740
                                case <-i.quit:
×
1741
                                        return
×
1742
                                }
1743

1744
                        case <-client.cancelChan:
45✔
1745
                                return
45✔
1746

UNCOV
1747
                        case <-i.quit:
×
UNCOV
1748
                                return
×
1749
                        }
1750
                }
1751
        }()
1752

1753
        i.notificationClientMux.Lock()
45✔
1754
        i.notificationClients[client.id] = client
45✔
1755
        i.notificationClientMux.Unlock()
45✔
1756

45✔
1757
        // Query the database to see if based on the provided addIndex and
45✔
1758
        // settledIndex we need to deliver any backlog notifications.
45✔
1759
        err := i.deliverBacklogEvents(ctx, client)
45✔
1760
        if err != nil {
45✔
1761
                return nil, err
×
1762
        }
×
1763

1764
        log.Infof("New invoice subscription client: id=%v", client.id)
45✔
1765

45✔
1766
        return client, nil
45✔
1767
}
1768

1769
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
1770
// caller to receive async notifications for a specific invoice.
1771
func (i *InvoiceRegistry) SubscribeSingleInvoice(ctx context.Context,
1772
        hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
18✔
1773

18✔
1774
        client := &SingleInvoiceSubscription{
18✔
1775
                Updates: make(chan *Invoice),
18✔
1776
                invoiceSubscriptionKit: invoiceSubscriptionKit{
18✔
1777
                        quit:             i.quit,
18✔
1778
                        ntfnQueue:        queue.NewConcurrentQueue(20),
18✔
1779
                        cancelChan:       make(chan struct{}),
18✔
1780
                        backlogDelivered: make(chan struct{}),
18✔
1781
                },
18✔
1782
                invoiceRef: InvoiceRefByHash(hash),
18✔
1783
        }
18✔
1784
        client.ntfnQueue.Start()
18✔
1785

18✔
1786
        // This notifies other goroutines that the backlog phase is done.
18✔
1787
        defer close(client.backlogDelivered)
18✔
1788

18✔
1789
        // Always increment by 1 first, and our client ID will start with 1,
18✔
1790
        // not 0.
18✔
1791
        client.id = atomic.AddUint32(&i.nextClientID, 1)
18✔
1792

18✔
1793
        // Before we register this new invoice subscription, we'll launch a new
18✔
1794
        // goroutine that will proxy all notifications appended to the end of
18✔
1795
        // the concurrent queue to the two client-side channels the caller will
18✔
1796
        // feed off of.
18✔
1797
        i.wg.Add(1)
18✔
1798
        go func() {
36✔
1799
                defer i.wg.Done()
18✔
1800
                defer i.deleteClient(client.id)
18✔
1801

18✔
1802
                for {
72✔
1803
                        select {
54✔
1804
                        // A new invoice event has been sent by the
1805
                        // invoiceRegistry. We will dispatch the event to the
1806
                        // client.
1807
                        case ntfn := <-client.ntfnQueue.ChanOut():
36✔
1808
                                invoiceEvent := ntfn.(*invoiceEvent)
36✔
1809

36✔
1810
                                select {
36✔
1811
                                case client.Updates <- invoiceEvent.invoice:
36✔
1812

1813
                                case <-client.cancelChan:
×
1814
                                        return
×
1815

1816
                                case <-i.quit:
×
1817
                                        return
×
1818
                                }
1819

1820
                        case <-client.cancelChan:
16✔
1821
                                return
16✔
1822

1823
                        case <-i.quit:
2✔
1824
                                return
2✔
1825
                        }
1826
                }
1827
        }()
1828

1829
        i.notificationClientMux.Lock()
18✔
1830
        i.singleNotificationClients[client.id] = client
18✔
1831
        i.notificationClientMux.Unlock()
18✔
1832

18✔
1833
        err := i.deliverSingleBacklogEvents(ctx, client)
18✔
1834
        if err != nil {
18✔
1835
                return nil, err
×
1836
        }
×
1837

1838
        log.Infof("New single invoice subscription client: id=%v, ref=%v",
18✔
1839
                client.id, client.invoiceRef)
18✔
1840

18✔
1841
        return client, nil
18✔
1842
}
1843

1844
// notifyHodlSubscribers sends out the htlc resolution to all current
1845
// subscribers.
1846
func (i *InvoiceRegistry) notifyHodlSubscribers(htlcResolution HtlcResolution) {
927✔
1847
        i.hodlSubscriptionsMux.Lock()
927✔
1848
        defer i.hodlSubscriptionsMux.Unlock()
927✔
1849

927✔
1850
        subscribers, ok := i.hodlSubscriptions[htlcResolution.CircuitKey()]
927✔
1851
        if !ok {
1,424✔
1852
                return
497✔
1853
        }
497✔
1854

1855
        // Notify all interested subscribers and remove subscription from both
1856
        // maps. The subscription can be removed as there only ever will be a
1857
        // single resolution for each hash.
1858
        for subscriber := range subscribers {
860✔
1859
                select {
430✔
1860
                case subscriber <- htlcResolution:
430✔
1861
                case <-i.quit:
×
1862
                        return
×
1863
                }
1864

1865
                delete(
430✔
1866
                        i.hodlReverseSubscriptions[subscriber],
430✔
1867
                        htlcResolution.CircuitKey(),
430✔
1868
                )
430✔
1869
        }
1870

1871
        delete(i.hodlSubscriptions, htlcResolution.CircuitKey())
430✔
1872
}
1873

1874
// hodlSubscribe adds a new invoice subscription.
1875
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
1876
        circuitKey CircuitKey) {
437✔
1877

437✔
1878
        i.hodlSubscriptionsMux.Lock()
437✔
1879
        defer i.hodlSubscriptionsMux.Unlock()
437✔
1880

437✔
1881
        log.Debugf("Hodl subscribe for %v", circuitKey)
437✔
1882

437✔
1883
        subscriptions, ok := i.hodlSubscriptions[circuitKey]
437✔
1884
        if !ok {
867✔
1885
                subscriptions = make(map[chan<- interface{}]struct{})
430✔
1886
                i.hodlSubscriptions[circuitKey] = subscriptions
430✔
1887
        }
430✔
1888
        subscriptions[subscriber] = struct{}{}
437✔
1889

437✔
1890
        reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
437✔
1891
        if !ok {
812✔
1892
                reverseSubscriptions = make(map[CircuitKey]struct{})
375✔
1893
                i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
375✔
1894
        }
375✔
1895
        reverseSubscriptions[circuitKey] = struct{}{}
437✔
1896
}
1897

1898
// HodlUnsubscribeAll cancels the subscription.
1899
func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
202✔
1900
        i.hodlSubscriptionsMux.Lock()
202✔
1901
        defer i.hodlSubscriptionsMux.Unlock()
202✔
1902

202✔
1903
        hashes := i.hodlReverseSubscriptions[subscriber]
202✔
1904
        for hash := range hashes {
203✔
1905
                delete(i.hodlSubscriptions[hash], subscriber)
1✔
1906
        }
1✔
1907

1908
        delete(i.hodlReverseSubscriptions, subscriber)
202✔
1909
}
1910

1911
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
1912
// useful when we need to iterate the map to send notifications.
1913
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription { //nolint:ll
1,409✔
1914
        i.notificationClientMux.RLock()
1,409✔
1915
        defer i.notificationClientMux.RUnlock()
1,409✔
1916

1,409✔
1917
        clients := make(map[uint32]*SingleInvoiceSubscription)
1,409✔
1918
        for k, v := range i.singleNotificationClients {
1,445✔
1919
                clients[k] = v
36✔
1920
        }
36✔
1921
        return clients
1,409✔
1922
}
1923

1924
// copyClients copies i.notificationClients inside a lock. This is useful when
1925
// we need to iterate the map to send notifications.
1926
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
1,244✔
1927
        i.notificationClientMux.RLock()
1,244✔
1928
        defer i.notificationClientMux.RUnlock()
1,244✔
1929

1,244✔
1930
        clients := make(map[uint32]*InvoiceSubscription)
1,244✔
1931
        for k, v := range i.notificationClients {
1,310✔
1932
                clients[k] = v
66✔
1933
        }
66✔
1934
        return clients
1,244✔
1935
}
1936

1937
// deleteClient removes a client by its ID inside a lock. Noop if the client is
1938
// not found.
1939
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
63✔
1940
        i.notificationClientMux.Lock()
63✔
1941
        defer i.notificationClientMux.Unlock()
63✔
1942

63✔
1943
        log.Infof("Cancelling invoice subscription for client=%v", clientID)
63✔
1944
        delete(i.notificationClients, clientID)
63✔
1945
        delete(i.singleNotificationClients, clientID)
63✔
1946
}
63✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc