• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14358372723

09 Apr 2025 01:26PM UTC coverage: 56.696% (-12.3%) from 69.037%
14358372723

Pull #9696

github

web-flow
Merge e2837e400 into 867d27d68
Pull Request #9696: Add `development_guidelines.md` for both human and machine

107055 of 188823 relevant lines covered (56.7%)

22721.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.49
/invoices/invoiceregistry.go
1
package invoices
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/fn/v2"
13
        "github.com/lightningnetwork/lnd/lntypes"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
        "github.com/lightningnetwork/lnd/queue"
16
        "github.com/lightningnetwork/lnd/record"
17
)
18

19
var (
20
        // ErrInvoiceExpiryTooSoon is returned when an invoice is attempted to
21
        // be accepted or settled with not enough blocks remaining.
22
        ErrInvoiceExpiryTooSoon = errors.New("invoice expiry too soon")
23

24
        // ErrInvoiceAmountTooLow is returned when an invoice is attempted to
25
        // be accepted or settled with an amount that is too low.
26
        ErrInvoiceAmountTooLow = errors.New(
27
                "paid amount less than invoice amount",
28
        )
29

30
        // ErrShuttingDown is returned when an operation failed because the
31
        // invoice registry is shutting down.
32
        ErrShuttingDown = errors.New("invoice registry shutting down")
33
)
34

35
const (
36
        // DefaultHtlcHoldDuration defines the default for how long mpp htlcs
37
        // are held while waiting for the other set members to arrive.
38
        DefaultHtlcHoldDuration = 120 * time.Second
39
)
40

41
// RegistryConfig contains the configuration parameters for invoice registry.
42
type RegistryConfig struct {
43
        // FinalCltvRejectDelta defines the number of blocks before the expiry
44
        // of the htlc where we no longer settle it as an exit hop and instead
45
        // cancel it back. Normally this value should be lower than the cltv
46
        // expiry of any invoice we create and the code effectuating this should
47
        // not be hit.
48
        FinalCltvRejectDelta int32
49

50
        // HtlcHoldDuration defines for how long mpp htlcs are held while
51
        // waiting for the other set members to arrive.
52
        HtlcHoldDuration time.Duration
53

54
        // Clock holds the clock implementation that is used to provide
55
        // Now() and TickAfter() and is useful to stub out the clock functions
56
        // during testing.
57
        Clock clock.Clock
58

59
        // AcceptKeySend indicates whether we want to accept spontaneous key
60
        // send payments.
61
        AcceptKeySend bool
62

63
        // AcceptAMP indicates whether we want to accept spontaneous AMP
64
        // payments.
65
        AcceptAMP bool
66

67
        // GcCanceledInvoicesOnStartup if set, we'll attempt to garbage collect
68
        // all canceled invoices upon start.
69
        GcCanceledInvoicesOnStartup bool
70

71
        // GcCanceledInvoicesOnTheFly if set, we'll garbage collect all newly
72
        // canceled invoices on the fly.
73
        GcCanceledInvoicesOnTheFly bool
74

75
        // KeysendHoldTime indicates for how long we want to accept and hold
76
        // spontaneous keysend payments.
77
        KeysendHoldTime time.Duration
78

79
        // HtlcInterceptor is an interface that allows the invoice registry to
80
        // let clients intercept invoices before they are settled.
81
        HtlcInterceptor HtlcInterceptor
82
}
83

84
// htlcReleaseEvent describes an htlc auto-release event. It is used to release
85
// mpp htlcs for which the complete set didn't arrive in time.
86
type htlcReleaseEvent struct {
87
        // invoiceRef identifies the invoice this htlc belongs to.
88
        invoiceRef InvoiceRef
89

90
        // key is the circuit key of the htlc to release.
91
        key CircuitKey
92

93
        // releaseTime is the time at which to release the htlc.
94
        releaseTime time.Time
95
}
96

97
// Less is used to order PriorityQueueItem's by their release time such that
98
// items with the older release time are at the top of the queue.
99
//
100
// NOTE: Part of the queue.PriorityQueueItem interface.
101
func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
12✔
102
        return r.releaseTime.Before(other.(*htlcReleaseEvent).releaseTime)
12✔
103
}
12✔
104

105
// InvoiceRegistry is a central registry of all the outstanding invoices
106
// created by the daemon. The registry is a thin wrapper around a map in order
107
// to ensure that all updates/reads are thread safe.
108
type InvoiceRegistry struct {
109
        started atomic.Bool
110
        stopped atomic.Bool
111

112
        sync.RWMutex
113

114
        nextClientID uint32 // must be used atomically
115

116
        idb InvoiceDB
117

118
        // cfg contains the registry's configuration parameters.
119
        cfg *RegistryConfig
120

121
        // notificationClientMux locks notificationClients and
122
        // singleNotificationClients. Using a separate mutex for these maps is
123
        // necessary to avoid deadlocks in the registry when processing invoice
124
        // events.
125
        notificationClientMux sync.RWMutex
126

127
        notificationClients map[uint32]*InvoiceSubscription
128

129
        // TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
130
        // performance.
131
        singleNotificationClients map[uint32]*SingleInvoiceSubscription
132

133
        // invoiceEvents is a single channel over which invoice updates are
134
        // carried.
135
        invoiceEvents chan *invoiceEvent
136

137
        // hodlSubscriptionsMux locks the hodlSubscriptions and
138
        // hodlReverseSubscriptions. Using a separate mutex for these maps is
139
        // necessary to avoid deadlocks in the registry when processing invoice
140
        // events.
141
        hodlSubscriptionsMux sync.RWMutex
142

143
        // hodlSubscriptions is a map from a circuit key to a list of
144
        // subscribers. It is used for efficient notification of links.
145
        hodlSubscriptions map[CircuitKey]map[chan<- interface{}]struct{}
146

147
        // hodlReverseSubscriptions tracks circuit keys subscribed to per
148
        // subscriber. This is used to unsubscribe from all hashes efficiently.
149
        hodlReverseSubscriptions map[chan<- interface{}]map[CircuitKey]struct{}
150

151
        // htlcAutoReleaseChan contains the new htlcs that need to be
152
        // auto-released.
153
        htlcAutoReleaseChan chan *htlcReleaseEvent
154

155
        expiryWatcher *InvoiceExpiryWatcher
156

157
        wg   sync.WaitGroup
158
        quit chan struct{}
159
}
160

161
// NewRegistry creates a new invoice registry. The invoice registry
162
// wraps the persistent on-disk invoice storage with an additional in-memory
163
// layer. The in-memory layer is in place such that debug invoices can be added
164
// which are volatile yet available system wide within the daemon.
165
func NewRegistry(idb InvoiceDB, expiryWatcher *InvoiceExpiryWatcher,
166
        cfg *RegistryConfig) *InvoiceRegistry {
645✔
167

645✔
168
        notificationClients := make(map[uint32]*InvoiceSubscription)
645✔
169
        singleNotificationClients := make(map[uint32]*SingleInvoiceSubscription)
645✔
170
        return &InvoiceRegistry{
645✔
171
                idb:                       idb,
645✔
172
                notificationClients:       notificationClients,
645✔
173
                singleNotificationClients: singleNotificationClients,
645✔
174
                invoiceEvents:             make(chan *invoiceEvent, 100),
645✔
175
                hodlSubscriptions: make(
645✔
176
                        map[CircuitKey]map[chan<- interface{}]struct{},
645✔
177
                ),
645✔
178
                hodlReverseSubscriptions: make(
645✔
179
                        map[chan<- interface{}]map[CircuitKey]struct{},
645✔
180
                ),
645✔
181
                cfg:                 cfg,
645✔
182
                htlcAutoReleaseChan: make(chan *htlcReleaseEvent),
645✔
183
                expiryWatcher:       expiryWatcher,
645✔
184
                quit:                make(chan struct{}),
645✔
185
        }
645✔
186
}
645✔
187

188
// scanInvoicesOnStart will scan all invoices on start and add active invoices
189
// to the invoice expiry watcher while also attempting to delete all canceled
190
// invoices.
191
func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {
645✔
192
        pendingInvoices, err := i.idb.FetchPendingInvoices(ctx)
645✔
193
        if err != nil {
645✔
194
                return err
×
195
        }
×
196

197
        var pending []invoiceExpiry
645✔
198
        for paymentHash, invoice := range pendingInvoices {
675✔
199
                invoice := invoice
30✔
200
                expiryRef := makeInvoiceExpiry(paymentHash, &invoice)
30✔
201
                if expiryRef != nil {
60✔
202
                        pending = append(pending, expiryRef)
30✔
203
                }
30✔
204
        }
205

206
        log.Debugf("Adding %d pending invoices to the expiry watcher",
645✔
207
                len(pending))
645✔
208
        i.expiryWatcher.AddInvoices(pending...)
645✔
209

645✔
210
        if i.cfg.GcCanceledInvoicesOnStartup {
648✔
211
                log.Infof("Deleting canceled invoices")
3✔
212
                err = i.idb.DeleteCanceledInvoices(ctx)
3✔
213
                if err != nil {
3✔
214
                        log.Warnf("Deleting canceled invoices failed: %v", err)
×
215
                        return err
×
216
                }
×
217
        }
218

219
        return nil
645✔
220
}
221

222
// Start starts the registry and all goroutines it needs to carry out its task.
223
func (i *InvoiceRegistry) Start() error {
645✔
224
        var err error
645✔
225

645✔
226
        log.Info("InvoiceRegistry starting...")
645✔
227

645✔
228
        if i.started.Swap(true) {
645✔
229
                return fmt.Errorf("InvoiceRegistry started more than once")
×
230
        }
×
231
        // Start InvoiceExpiryWatcher and prepopulate it with existing
232
        // active invoices.
233
        err = i.expiryWatcher.Start(
645✔
234
                func(hash lntypes.Hash, force bool) error {
728✔
235
                        return i.cancelInvoiceImpl(
83✔
236
                                context.Background(), hash, force,
83✔
237
                        )
83✔
238
                })
83✔
239
        if err != nil {
645✔
240
                return err
×
241
        }
×
242

243
        i.wg.Add(1)
645✔
244
        go i.invoiceEventLoop()
645✔
245

645✔
246
        // Now scan all pending and removable invoices to the expiry
645✔
247
        // watcher or delete them.
645✔
248
        err = i.scanInvoicesOnStart(context.Background())
645✔
249
        if err != nil {
645✔
250
                _ = i.Stop()
×
251
        }
×
252

253
        log.Debug("InvoiceRegistry started")
645✔
254

645✔
255
        return err
645✔
256
}
257

258
// Stop signals the registry for a graceful shutdown.
259
func (i *InvoiceRegistry) Stop() error {
384✔
260
        log.Info("InvoiceRegistry shutting down...")
384✔
261

384✔
262
        if i.stopped.Swap(true) {
384✔
263
                return fmt.Errorf("InvoiceRegistry stopped more than once")
×
264
        }
×
265

266
        log.Info("InvoiceRegistry shutting down...")
384✔
267
        defer log.Debug("InvoiceRegistry shutdown complete")
384✔
268

384✔
269
        var err error
384✔
270
        if i.expiryWatcher == nil {
384✔
271
                err = fmt.Errorf("InvoiceRegistry expiryWatcher is not " +
×
272
                        "initialized")
×
273
        } else {
384✔
274
                i.expiryWatcher.Stop()
384✔
275
        }
384✔
276

277
        close(i.quit)
384✔
278

384✔
279
        i.wg.Wait()
384✔
280

384✔
281
        log.Debug("InvoiceRegistry shutdown complete")
384✔
282

384✔
283
        return err
384✔
284
}
285

286
// invoiceEvent represents a new event that has modified an invoice on disk.
287
// Only two event types are currently supported: newly created invoices, and
288
// instances where invoices are settled.
289
type invoiceEvent struct {
290
        hash    lntypes.Hash
291
        invoice *Invoice
292
        setID   *[32]byte
293
}
294

295
// tickAt returns a channel that ticks at the specified time. If the time has
296
// already passed, it will tick immediately.
297
func (i *InvoiceRegistry) tickAt(t time.Time) <-chan time.Time {
675✔
298
        now := i.cfg.Clock.Now()
675✔
299
        return i.cfg.Clock.TickAfter(t.Sub(now))
675✔
300
}
675✔
301

302
// invoiceEventLoop is the dedicated goroutine responsible for accepting
303
// new notification subscriptions, cancelling old subscriptions, and
304
// dispatching new invoice events.
305
func (i *InvoiceRegistry) invoiceEventLoop() {
645✔
306
        defer i.wg.Done()
645✔
307

645✔
308
        // Set up a heap for htlc auto-releases.
645✔
309
        autoReleaseHeap := &queue.PriorityQueue{}
645✔
310

645✔
311
        for {
3,077✔
312
                // If there is something to release, set up a release tick
2,432✔
313
                // channel.
2,432✔
314
                var nextReleaseTick <-chan time.Time
2,432✔
315
                if autoReleaseHeap.Len() > 0 {
3,107✔
316
                        head := autoReleaseHeap.Top().(*htlcReleaseEvent)
675✔
317
                        nextReleaseTick = i.tickAt(head.releaseTime)
675✔
318
                }
675✔
319

320
                select {
2,432✔
321
                // A sub-systems has just modified the invoice state, so we'll
322
                // dispatch notifications to all registered clients.
323
                case event := <-i.invoiceEvents:
1,409✔
324
                        // For backwards compatibility, do not notify all
1,409✔
325
                        // invoice subscribers of cancel and accept events.
1,409✔
326
                        state := event.invoice.State
1,409✔
327
                        if state != ContractCanceled &&
1,409✔
328
                                state != ContractAccepted {
2,653✔
329

1,244✔
330
                                i.dispatchToClients(event)
1,244✔
331
                        }
1,244✔
332
                        i.dispatchToSingleClients(event)
1,409✔
333

334
                // A new htlc came in for auto-release.
335
                case event := <-i.htlcAutoReleaseChan:
348✔
336
                        log.Debugf("Scheduling auto-release for htlc: "+
348✔
337
                                "ref=%v, key=%v at %v",
348✔
338
                                event.invoiceRef, event.key, event.releaseTime)
348✔
339

348✔
340
                        // We use an independent timer for every htlc rather
348✔
341
                        // than a set timer that is reset with every htlc coming
348✔
342
                        // in. Otherwise the sender could keep resetting the
348✔
343
                        // timer until the broadcast window is entered and our
348✔
344
                        // channel is force closed.
348✔
345
                        autoReleaseHeap.Push(event)
348✔
346

347
                // The htlc at the top of the heap needs to be auto-released.
348
                case <-nextReleaseTick:
30✔
349
                        event := autoReleaseHeap.Pop().(*htlcReleaseEvent)
30✔
350
                        err := i.cancelSingleHtlc(
30✔
351
                                event.invoiceRef, event.key, ResultMppTimeout,
30✔
352
                        )
30✔
353
                        if err != nil {
30✔
354
                                log.Errorf("HTLC timer: %v", err)
×
355
                        }
×
356

357
                case <-i.quit:
384✔
358
                        return
384✔
359
                }
360
        }
361
}
362

363
// dispatchToSingleClients passes the supplied event to all notification
364
// clients that subscribed to all the invoice this event applies to.
365
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
1,409✔
366
        // Dispatch to single invoice subscribers.
1,409✔
367
        clients := i.copySingleClients()
1,409✔
368
        for _, client := range clients {
1,445✔
369
                payHash := client.invoiceRef.PayHash()
36✔
370

36✔
371
                if payHash == nil || *payHash != event.hash {
36✔
372
                        continue
×
373
                }
374

375
                select {
36✔
376
                case <-client.backlogDelivered:
36✔
377
                        // We won't deliver any events until the backlog has
378
                        // went through first.
379
                case <-i.quit:
×
380
                        return
×
381
                }
382

383
                client.notify(event)
36✔
384
        }
385
}
386

387
// dispatchToClients passes the supplied event to all notification clients that
388
// subscribed to all invoices. Add and settle indices are used to make sure
389
// that clients don't receive duplicate or unwanted events.
390
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
1,244✔
391
        invoice := event.invoice
1,244✔
392

1,244✔
393
        clients := i.copyClients()
1,244✔
394
        for clientID, client := range clients {
1,310✔
395
                // Before we dispatch this event, we'll check
66✔
396
                // to ensure that this client hasn't already
66✔
397
                // received this notification in order to
66✔
398
                // ensure we don't duplicate any events.
66✔
399

66✔
400
                // TODO(joostjager): Refactor switches.
66✔
401
                state := event.invoice.State
66✔
402
                switch {
66✔
403
                // If we've already sent this settle event to
404
                // the client, then we can skip this.
405
                case state == ContractSettled &&
406
                        client.settleIndex >= invoice.SettleIndex:
×
407
                        continue
×
408

409
                // Similarly, if we've already sent this add to
410
                // the client then we can skip this one, but only if this isn't
411
                // an AMP invoice. AMP invoices always remain in the settle
412
                // state as a base invoice.
413
                case event.setID == nil && state == ContractOpen &&
414
                        client.addIndex >= invoice.AddIndex:
×
415
                        continue
×
416

417
                // These two states should never happen, but we
418
                // log them just in case so we can detect this
419
                // instance.
420
                case state == ContractOpen &&
421
                        client.addIndex+1 != invoice.AddIndex:
7✔
422
                        log.Warnf("client=%v for invoice "+
7✔
423
                                "notifications missed an update, "+
7✔
424
                                "add_index=%v, new add event index=%v",
7✔
425
                                clientID, client.addIndex,
7✔
426
                                invoice.AddIndex)
7✔
427

428
                case state == ContractSettled &&
429
                        client.settleIndex+1 != invoice.SettleIndex:
×
430
                        log.Warnf("client=%v for invoice "+
×
431
                                "notifications missed an update, "+
×
432
                                "settle_index=%v, new settle event index=%v",
×
433
                                clientID, client.settleIndex,
×
434
                                invoice.SettleIndex)
×
435
                }
436

437
                select {
66✔
438
                case <-client.backlogDelivered:
66✔
439
                        // We won't deliver any events until the backlog has
440
                        // been processed.
441
                case <-i.quit:
×
442
                        return
×
443
                }
444

445
                err := client.notify(&invoiceEvent{
66✔
446
                        invoice: invoice,
66✔
447
                        setID:   event.setID,
66✔
448
                })
66✔
449
                if err != nil {
66✔
450
                        log.Errorf("Failed dispatching to client: %v", err)
×
451
                        return
×
452
                }
×
453

454
                // Each time we send a notification to a client, we'll record
455
                // the latest add/settle index it has. We'll use this to ensure
456
                // we don't send a notification twice, which can happen if a new
457
                // event is added while we're catching up a new client.
458
                invState := event.invoice.State
66✔
459
                switch {
66✔
460
                case invState == ContractSettled:
18✔
461
                        client.settleIndex = invoice.SettleIndex
18✔
462

463
                case invState == ContractOpen && event.setID == nil:
42✔
464
                        client.addIndex = invoice.AddIndex
42✔
465

466
                // If this is an AMP invoice, then we'll need to use the set ID
467
                // to keep track of the settle index of the client. AMP
468
                // invoices never go to the open state, but if a setID is
469
                // passed, then we know it was just settled and will track the
470
                // highest settle index so far.
471
                case invState == ContractOpen && event.setID != nil:
6✔
472
                        setID := *event.setID
6✔
473
                        client.settleIndex = invoice.AMPState[setID].SettleIndex
6✔
474

475
                default:
×
476
                        log.Errorf("unexpected invoice state: %v",
×
477
                                event.invoice.State)
×
478
                }
479
        }
480
}
481

482
// deliverBacklogEvents will attempts to query the invoice database for any
483
// notifications that the client has missed since it reconnected last.
484
func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context,
485
        client *InvoiceSubscription) error {
45✔
486

45✔
487
        addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex)
45✔
488
        if err != nil {
45✔
489
                return err
×
490
        }
×
491

492
        settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex)
45✔
493
        if err != nil {
45✔
494
                return err
×
495
        }
×
496

497
        // If we have any to deliver, then we'll append them to the end of the
498
        // notification queue in order to catch up the client before delivering
499
        // any new notifications.
500
        for _, addEvent := range addEvents {
45✔
501
                // We re-bind the loop variable to ensure we don't hold onto
×
502
                // the loop reference causing is to point to the same item.
×
503
                addEvent := addEvent
×
504

×
505
                select {
×
506
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
507
                        invoice: &addEvent,
508
                }:
×
509
                case <-i.quit:
×
510
                        return ErrShuttingDown
×
511
                }
512
        }
513

514
        for _, settleEvent := range settleEvents {
45✔
515
                // We re-bind the loop variable to ensure we don't hold onto
×
516
                // the loop reference causing is to point to the same item.
×
517
                settleEvent := settleEvent
×
518

×
519
                select {
×
520
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
521
                        invoice: &settleEvent,
522
                }:
×
523
                case <-i.quit:
×
524
                        return ErrShuttingDown
×
525
                }
526
        }
527

528
        return nil
45✔
529
}
530

531
// deliverSingleBacklogEvents will attempt to query the invoice database to
532
// retrieve the current invoice state and deliver this to the subscriber. Single
533
// invoice subscribers will always receive the current state right after
534
// subscribing. Only in case the invoice does not yet exist, nothing is sent
535
// yet.
536
func (i *InvoiceRegistry) deliverSingleBacklogEvents(ctx context.Context,
537
        client *SingleInvoiceSubscription) error {
18✔
538

18✔
539
        invoice, err := i.idb.LookupInvoice(ctx, client.invoiceRef)
18✔
540

18✔
541
        // It is possible that the invoice does not exist yet, but the client is
18✔
542
        // already watching it in anticipation.
18✔
543
        isNotFound := errors.Is(err, ErrInvoiceNotFound)
18✔
544
        isNotCreated := errors.Is(err, ErrNoInvoicesCreated)
18✔
545
        if isNotFound || isNotCreated {
36✔
546
                return nil
18✔
547
        }
18✔
548
        if err != nil {
×
549
                return err
×
550
        }
×
551

552
        payHash := client.invoiceRef.PayHash()
×
553
        if payHash == nil {
×
554
                return nil
×
555
        }
×
556

557
        err = client.notify(&invoiceEvent{
×
558
                hash:    *payHash,
×
559
                invoice: &invoice,
×
560
        })
×
561
        if err != nil {
×
562
                return err
×
563
        }
×
564

565
        log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
×
566
                client.id, payHash)
×
567

×
568
        return nil
×
569
}
570

571
// AddInvoice adds a regular invoice for the specified amount, identified by
572
// the passed preimage. Additionally, any memo or receipt data provided will
573
// also be stored on-disk. Once this invoice is added, subsystems within the
574
// daemon add/forward HTLCs are able to obtain the proper preimage required for
575
// redemption in the case that we're the final destination. We also return the
576
// addIndex of the newly created invoice which monotonically increases for each
577
// new invoice added.  A side effect of this function is that it also sets
578
// AddIndex on the invoice argument.
579
func (i *InvoiceRegistry) AddInvoice(ctx context.Context, invoice *Invoice,
580
        paymentHash lntypes.Hash) (uint64, error) {
729✔
581

729✔
582
        i.Lock()
729✔
583

729✔
584
        ref := InvoiceRefByHash(paymentHash)
729✔
585
        log.Debugf("Invoice%v: added with terms %v", ref, invoice.Terms)
729✔
586

729✔
587
        addIndex, err := i.idb.AddInvoice(ctx, invoice, paymentHash)
729✔
588
        if err != nil {
744✔
589
                i.Unlock()
15✔
590
                return 0, err
15✔
591
        }
15✔
592

593
        // Now that we've added the invoice, we'll send dispatch a message to
594
        // notify the clients of this new invoice.
595
        i.notifyClients(paymentHash, invoice, nil)
714✔
596
        i.Unlock()
714✔
597

714✔
598
        // InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry
714✔
599
        // to avoid deadlock when a new invoice is added while an other is being
714✔
600
        // canceled.
714✔
601
        invoiceExpiryRef := makeInvoiceExpiry(paymentHash, invoice)
714✔
602
        if invoiceExpiryRef != nil {
1,428✔
603
                i.expiryWatcher.AddInvoices(invoiceExpiryRef)
714✔
604
        }
714✔
605

606
        return addIndex, nil
714✔
607
}
608

609
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
610
// then we're able to pull the funds pending within an HTLC.
611
//
612
// TODO(roasbeef): ignore if settled?
613
func (i *InvoiceRegistry) LookupInvoice(ctx context.Context,
614
        rHash lntypes.Hash) (Invoice, error) {
425✔
615

425✔
616
        // We'll check the database to see if there's an existing matching
425✔
617
        // invoice.
425✔
618
        ref := InvoiceRefByHash(rHash)
425✔
619
        return i.idb.LookupInvoice(ctx, ref)
425✔
620
}
425✔
621

622
// LookupInvoiceByRef looks up an invoice by the given reference, if found
623
// then we're able to pull the funds pending within an HTLC.
624
func (i *InvoiceRegistry) LookupInvoiceByRef(ctx context.Context,
625
        ref InvoiceRef) (Invoice, error) {
×
626

×
627
        return i.idb.LookupInvoice(ctx, ref)
×
628
}
×
629

630
// startHtlcTimer starts a new timer via the invoice registry main loop that
631
// cancels a single htlc on an invoice when the htlc hold duration has passed.
632
func (i *InvoiceRegistry) startHtlcTimer(invoiceRef InvoiceRef,
633
        key CircuitKey, acceptTime time.Time) error {
348✔
634

348✔
635
        releaseTime := acceptTime.Add(i.cfg.HtlcHoldDuration)
348✔
636
        event := &htlcReleaseEvent{
348✔
637
                invoiceRef:  invoiceRef,
348✔
638
                key:         key,
348✔
639
                releaseTime: releaseTime,
348✔
640
        }
348✔
641

348✔
642
        select {
348✔
643
        case i.htlcAutoReleaseChan <- event:
348✔
644
                return nil
348✔
645

646
        case <-i.quit:
×
647
                return ErrShuttingDown
×
648
        }
649
}
650

651
// cancelSingleHtlc cancels a single accepted htlc on an invoice. It takes
652
// a resolution result which will be used to notify subscribed links and
653
// resolvers of the details of the htlc cancellation.
654
func (i *InvoiceRegistry) cancelSingleHtlc(invoiceRef InvoiceRef,
655
        key CircuitKey, result FailResolutionResult) error {
30✔
656

30✔
657
        updateInvoice := func(invoice *Invoice, setID *SetID) (
30✔
658
                *InvoiceUpdateDesc, error) {
60✔
659

30✔
660
                // Only allow individual htlc cancellation on open invoices.
30✔
661
                if invoice.State != ContractOpen {
36✔
662
                        log.Debugf("CancelSingleHtlc: cannot cancel htlc %v "+
6✔
663
                                "on invoice %v, invoice is no longer open", key,
6✔
664
                                invoiceRef)
6✔
665

6✔
666
                        return nil, nil
6✔
667
                }
6✔
668

669
                // Also for AMP invoices we fetch the relevant HTLCs, so
670
                // the HTLC should be found, otherwise we return an error.
671
                htlc, ok := invoice.Htlcs[key]
24✔
672
                if !ok {
24✔
673
                        return nil, fmt.Errorf("htlc %v not found on "+
×
674
                                "invoice %v", key, invoiceRef)
×
675
                }
×
676

677
                htlcState := htlc.State
24✔
678

24✔
679
                // Cancellation is only possible if the htlc wasn't already
24✔
680
                // resolved.
24✔
681
                if htlcState != HtlcStateAccepted {
27✔
682
                        log.Debugf("CancelSingleHtlc: htlc %v on invoice %v "+
3✔
683
                                "is already resolved", key, invoiceRef)
3✔
684

3✔
685
                        return nil, nil
3✔
686
                }
3✔
687

688
                log.Debugf("CancelSingleHtlc: cancelling htlc %v on invoice %v",
21✔
689
                        key, invoiceRef)
21✔
690

21✔
691
                // Return an update descriptor that cancels htlc and keeps
21✔
692
                // invoice open.
21✔
693
                canceledHtlcs := map[CircuitKey]struct{}{
21✔
694
                        key: {},
21✔
695
                }
21✔
696

21✔
697
                return &InvoiceUpdateDesc{
21✔
698
                        UpdateType:  CancelHTLCsUpdate,
21✔
699
                        CancelHtlcs: canceledHtlcs,
21✔
700
                        SetID:       setID,
21✔
701
                }, nil
21✔
702
        }
703

704
        // Try to mark the specified htlc as canceled in the invoice database.
705
        // Intercept the update descriptor to set the local updated variable. If
706
        // no invoice update is performed, we can return early.
707
        // setID is only set for AMP HTLCs, so it can be nil and it is expected
708
        // to be nil for non-AMP HTLCs.
709
        setID := (*SetID)(invoiceRef.SetID())
30✔
710

30✔
711
        var updated bool
30✔
712
        invoice, err := i.idb.UpdateInvoice(
30✔
713
                context.Background(), invoiceRef, setID,
30✔
714
                func(invoice *Invoice) (
30✔
715
                        *InvoiceUpdateDesc, error) {
60✔
716

30✔
717
                        updateDesc, err := updateInvoice(invoice, setID)
30✔
718
                        if err != nil {
30✔
719
                                return nil, err
×
720
                        }
×
721
                        updated = updateDesc != nil
30✔
722

30✔
723
                        return updateDesc, err
30✔
724
                },
725
        )
726
        if err != nil {
30✔
727
                return err
×
728
        }
×
729
        if !updated {
39✔
730
                return nil
9✔
731
        }
9✔
732

733
        // The invoice has been updated. Notify subscribers of the htlc
734
        // resolution.
735
        htlc, ok := invoice.Htlcs[key]
21✔
736
        if !ok {
21✔
737
                return fmt.Errorf("htlc %v not found", key)
×
738
        }
×
739
        if htlc.State == HtlcStateCanceled {
42✔
740
                resolution := NewFailResolution(
21✔
741
                        key, int32(htlc.AcceptHeight), result,
21✔
742
                )
21✔
743

21✔
744
                log.Debugf("Signaling htlc(%v) cancellation of invoice(%v) "+
21✔
745
                        "with resolution(%v) to the link subsystem", key,
21✔
746
                        invoiceRef, result)
21✔
747

21✔
748
                i.notifyHodlSubscribers(resolution)
21✔
749
        }
21✔
750

751
        return nil
21✔
752
}
753

754
// processKeySend just-in-time inserts an invoice if this htlc is a keysend
755
// htlc.
756
func (i *InvoiceRegistry) processKeySend(ctx invoiceUpdateCtx) error {
18✔
757
        // Retrieve keysend record if present.
18✔
758
        preimageSlice, ok := ctx.customRecords[record.KeySendType]
18✔
759
        if !ok {
18✔
760
                return nil
×
761
        }
×
762

763
        // Cancel htlc is preimage is invalid.
764
        preimage, err := lntypes.MakePreimage(preimageSlice)
18✔
765
        if err != nil {
21✔
766
                return err
3✔
767
        }
3✔
768
        if preimage.Hash() != ctx.hash {
15✔
769
                return fmt.Errorf("invalid keysend preimage %v for hash %v",
×
770
                        preimage, ctx.hash)
×
771
        }
×
772

773
        // Only allow keysend for non-mpp payments.
774
        if ctx.mpp != nil {
15✔
775
                return errors.New("no mpp keysend supported")
×
776
        }
×
777

778
        // Create an invoice for the htlc amount.
779
        amt := ctx.amtPaid
15✔
780

15✔
781
        // Set tlv required feature vector on the invoice. Otherwise we wouldn't
15✔
782
        // be able to pay to it with keysend.
15✔
783
        rawFeatures := lnwire.NewRawFeatureVector(
15✔
784
                lnwire.TLVOnionPayloadRequired,
15✔
785
        )
15✔
786
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
15✔
787

15✔
788
        // Use the minimum block delta that we require for settling htlcs.
15✔
789
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
15✔
790

15✔
791
        // Pre-check expiry here to prevent inserting an invoice that will not
15✔
792
        // be settled.
15✔
793
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
15✔
794
                return errors.New("final expiry too soon")
×
795
        }
×
796

797
        // The invoice database indexes all invoices by payment address, however
798
        // legacy keysend payment do not have one. In order to avoid a new
799
        // payment type on-disk wrt. to indexing, we'll continue to insert a
800
        // blank payment address which is special cased in the insertion logic
801
        // to not be indexed. In the future, once AMP is merged, this should be
802
        // replaced by generating a random payment address on the behalf of the
803
        // sender.
804
        payAddr := BlankPayAddr
15✔
805

15✔
806
        // Create placeholder invoice.
15✔
807
        invoice := &Invoice{
15✔
808
                CreationDate: i.cfg.Clock.Now(),
15✔
809
                Terms: ContractTerm{
15✔
810
                        FinalCltvDelta:  finalCltvDelta,
15✔
811
                        Value:           amt,
15✔
812
                        PaymentPreimage: &preimage,
15✔
813
                        PaymentAddr:     payAddr,
15✔
814
                        Features:        features,
15✔
815
                },
15✔
816
        }
15✔
817

15✔
818
        if i.cfg.KeysendHoldTime != 0 {
21✔
819
                invoice.HodlInvoice = true
6✔
820
                invoice.Terms.Expiry = i.cfg.KeysendHoldTime
6✔
821
        }
6✔
822

823
        // Insert invoice into database. Ignore duplicates, because this
824
        // may be a replay.
825
        _, err = i.AddInvoice(context.Background(), invoice, ctx.hash)
15✔
826
        if err != nil && !errors.Is(err, ErrDuplicateInvoice) {
15✔
827
                return err
×
828
        }
×
829

830
        return nil
15✔
831
}
832

833
// processAMP just-in-time inserts an invoice if this htlc is a keysend
834
// htlc.
835
func (i *InvoiceRegistry) processAMP(ctx invoiceUpdateCtx) error {
27✔
836
        // AMP payments MUST also include an MPP record.
27✔
837
        if ctx.mpp == nil {
30✔
838
                return errors.New("no MPP record for AMP")
3✔
839
        }
3✔
840

841
        // Create an invoice for the total amount expected, provided in the MPP
842
        // record.
843
        amt := ctx.mpp.TotalMsat()
24✔
844

24✔
845
        // Set the TLV required and MPP optional features on the invoice. We'll
24✔
846
        // also make the AMP features required so that it can't be paid by
24✔
847
        // legacy or MPP htlcs.
24✔
848
        rawFeatures := lnwire.NewRawFeatureVector(
24✔
849
                lnwire.TLVOnionPayloadRequired,
24✔
850
                lnwire.PaymentAddrOptional,
24✔
851
                lnwire.AMPRequired,
24✔
852
        )
24✔
853
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
24✔
854

24✔
855
        // Use the minimum block delta that we require for settling htlcs.
24✔
856
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
24✔
857

24✔
858
        // Pre-check expiry here to prevent inserting an invoice that will not
24✔
859
        // be settled.
24✔
860
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
24✔
861
                return errors.New("final expiry too soon")
×
862
        }
×
863

864
        // We'll use the sender-generated payment address provided in the HTLC
865
        // to create our AMP invoice.
866
        payAddr := ctx.mpp.PaymentAddr()
24✔
867

24✔
868
        // Create placeholder invoice.
24✔
869
        invoice := &Invoice{
24✔
870
                CreationDate: i.cfg.Clock.Now(),
24✔
871
                Terms: ContractTerm{
24✔
872
                        FinalCltvDelta:  finalCltvDelta,
24✔
873
                        Value:           amt,
24✔
874
                        PaymentPreimage: nil,
24✔
875
                        PaymentAddr:     payAddr,
24✔
876
                        Features:        features,
24✔
877
                },
24✔
878
        }
24✔
879

24✔
880
        // Insert invoice into database. Ignore duplicates payment hashes and
24✔
881
        // payment addrs, this may be a replay or a different HTLC for the AMP
24✔
882
        // invoice.
24✔
883
        _, err := i.AddInvoice(context.Background(), invoice, ctx.hash)
24✔
884
        isDuplicatedInvoice := errors.Is(err, ErrDuplicateInvoice)
24✔
885
        isDuplicatedPayAddr := errors.Is(err, ErrDuplicatePayAddr)
24✔
886
        switch {
24✔
887
        case isDuplicatedInvoice || isDuplicatedPayAddr:
12✔
888
                return nil
12✔
889
        default:
12✔
890
                return err
12✔
891
        }
892
}
893

894
// NotifyExitHopHtlc attempts to mark an invoice as settled. The return value
895
// describes how the htlc should be resolved.
896
//
897
// When the preimage of the invoice is not yet known (hodl invoice), this
898
// function moves the invoice to the accepted state. When SettleHoldInvoice is
899
// called later, a resolution message will be send back to the caller via the
900
// provided hodlChan. Invoice registry sends on this channel what action needs
901
// to be taken on the htlc (settle or cancel). The caller needs to ensure that
902
// the channel is either buffered or received on from another goroutine to
903
// prevent deadlock.
904
//
905
// In the case that the htlc is part of a larger set of htlcs that pay to the
906
// same invoice (multi-path payment), the htlc is held until the set is
907
// complete. If the set doesn't fully arrive in time, a timer will cancel the
908
// held htlc.
909
func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
910
        amtPaid lnwire.MilliSatoshi, expiry uint32, currentHeight int32,
911
        circuitKey CircuitKey, hodlChan chan<- interface{},
912
        wireCustomRecords lnwire.CustomRecords,
913
        payload Payload) (HtlcResolution, error) {
967✔
914

967✔
915
        // Create the update context containing the relevant details of the
967✔
916
        // incoming htlc.
967✔
917
        ctx := invoiceUpdateCtx{
967✔
918
                hash:                 rHash,
967✔
919
                circuitKey:           circuitKey,
967✔
920
                amtPaid:              amtPaid,
967✔
921
                expiry:               expiry,
967✔
922
                currentHeight:        currentHeight,
967✔
923
                finalCltvRejectDelta: i.cfg.FinalCltvRejectDelta,
967✔
924
                wireCustomRecords:    wireCustomRecords,
967✔
925
                customRecords:        payload.CustomRecords(),
967✔
926
                mpp:                  payload.MultiPath(),
967✔
927
                amp:                  payload.AMPRecord(),
967✔
928
                metadata:             payload.Metadata(),
967✔
929
                pathID:               payload.PathID(),
967✔
930
                totalAmtMsat:         payload.TotalAmtMsat(),
967✔
931
        }
967✔
932

967✔
933
        switch {
967✔
934
        // If we are accepting spontaneous AMP payments and this payload
935
        // contains an AMP record, create an AMP invoice that will be settled
936
        // below.
937
        case i.cfg.AcceptAMP && ctx.amp != nil:
27✔
938
                err := i.processAMP(ctx)
27✔
939
                if err != nil {
30✔
940
                        ctx.log(fmt.Sprintf("amp error: %v", err))
3✔
941

3✔
942
                        return NewFailResolution(
3✔
943
                                circuitKey, currentHeight, ResultAmpError,
3✔
944
                        ), nil
3✔
945
                }
3✔
946

947
        // If we are accepting spontaneous keysend payments, create a regular
948
        // invoice that will be settled below. We also enforce that this is only
949
        // done when no AMP payload is present since it will only be settle-able
950
        // by regular HTLCs.
951
        case i.cfg.AcceptKeySend && ctx.amp == nil:
18✔
952
                err := i.processKeySend(ctx)
18✔
953
                if err != nil {
21✔
954
                        ctx.log(fmt.Sprintf("keysend error: %v", err))
3✔
955

3✔
956
                        return NewFailResolution(
3✔
957
                                circuitKey, currentHeight, ResultKeySendError,
3✔
958
                        ), nil
3✔
959
                }
3✔
960
        }
961

962
        // Execute locked notify exit hop logic.
963
        i.Lock()
961✔
964
        resolution, invoiceToExpire, err := i.notifyExitHopHtlcLocked(
961✔
965
                &ctx, hodlChan,
961✔
966
        )
961✔
967
        i.Unlock()
961✔
968
        if err != nil {
961✔
969
                return nil, err
×
970
        }
×
971

972
        if invoiceToExpire != nil {
1,043✔
973
                i.expiryWatcher.AddInvoices(invoiceToExpire)
82✔
974
        }
82✔
975

976
        switch r := resolution.(type) {
961✔
977
        // The htlc is held. Start a timer outside the lock if the htlc should
978
        // be auto-released, because otherwise a deadlock may happen with the
979
        // main event loop.
980
        case *htlcAcceptResolution:
437✔
981
                if r.autoRelease {
785✔
982
                        var invRef InvoiceRef
348✔
983
                        if ctx.amp != nil {
375✔
984
                                invRef = InvoiceRefBySetID(*ctx.setID())
27✔
985
                        } else {
348✔
986
                                invRef = ctx.invoiceRef()
321✔
987
                        }
321✔
988

989
                        err := i.startHtlcTimer(
348✔
990
                                invRef, circuitKey, r.acceptTime,
348✔
991
                        )
348✔
992
                        if err != nil {
348✔
993
                                return nil, err
×
994
                        }
×
995
                }
996

997
                // We return a nil resolution because htlc acceptances are
998
                // represented as nil resolutions externally.
999
                // TODO(carla) update calling code to handle accept resolutions.
1000
                return nil, nil
437✔
1001

1002
        // A direct resolution was received for this htlc.
1003
        case HtlcResolution:
524✔
1004
                return r, nil
524✔
1005

1006
        // Fail if an unknown resolution type was received.
1007
        default:
×
1008
                return nil, errors.New("invalid resolution type")
×
1009
        }
1010
}
1011

1012
// notifyExitHopHtlcLocked is the internal implementation of NotifyExitHopHtlc
1013
// that should be executed inside the registry lock. The returned invoiceExpiry
1014
// (if not nil) needs to be added to the expiry watcher outside of the lock.
1015
func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
1016
        ctx *invoiceUpdateCtx, hodlChan chan<- interface{}) (
1017
        HtlcResolution, invoiceExpiry, error) {
961✔
1018

961✔
1019
        invoiceRef := ctx.invoiceRef()
961✔
1020

961✔
1021
        // This setID is only set for AMP HTLCs, so it can be nil and it is
961✔
1022
        // also expected to be nil for non-AMP HTLCs.
961✔
1023
        setID := (*SetID)(ctx.setID())
961✔
1024

961✔
1025
        // We need to look up the current state of the invoice in order to send
961✔
1026
        // the previously accepted/settled HTLCs to the interceptor.
961✔
1027
        existingInvoice, err := i.idb.LookupInvoice(
961✔
1028
                context.Background(), invoiceRef,
961✔
1029
        )
961✔
1030
        switch {
961✔
1031
        case errors.Is(err, ErrInvoiceNotFound) ||
1032
                errors.Is(err, ErrNoInvoicesCreated):
22✔
1033

22✔
1034
                // If the invoice was not found, return a failure resolution
22✔
1035
                // with an invoice not found result.
22✔
1036
                return NewFailResolution(
22✔
1037
                        ctx.circuitKey, ctx.currentHeight,
22✔
1038
                        ResultInvoiceNotFound,
22✔
1039
                ), nil, nil
22✔
1040

1041
        case err != nil:
×
1042
                ctx.log(err.Error())
×
1043
                return nil, nil, err
×
1044
        }
1045

1046
        var cancelSet bool
939✔
1047

939✔
1048
        // Provide the invoice to the settlement interceptor to allow
939✔
1049
        // the interceptor's client an opportunity to manipulate the
939✔
1050
        // settlement process.
939✔
1051
        err = i.cfg.HtlcInterceptor.Intercept(HtlcModifyRequest{
939✔
1052
                WireCustomRecords:  ctx.wireCustomRecords,
939✔
1053
                ExitHtlcCircuitKey: ctx.circuitKey,
939✔
1054
                ExitHtlcAmt:        ctx.amtPaid,
939✔
1055
                ExitHtlcExpiry:     ctx.expiry,
939✔
1056
                CurrentHeight:      uint32(ctx.currentHeight),
939✔
1057
                Invoice:            existingInvoice,
939✔
1058
        }, func(resp HtlcModifyResponse) {
942✔
1059
                log.Debugf("Received invoice HTLC interceptor response: %v",
3✔
1060
                        resp)
3✔
1061

3✔
1062
                if resp.AmountPaid != 0 {
3✔
1063
                        ctx.amtPaid = resp.AmountPaid
×
1064
                }
×
1065

1066
                cancelSet = resp.CancelSet
3✔
1067
        })
1068
        if err != nil {
939✔
1069
                err := fmt.Errorf("error during invoice HTLC interception: %w",
×
1070
                        err)
×
1071
                ctx.log(err.Error())
×
1072

×
1073
                return nil, nil, err
×
1074
        }
×
1075

1076
        // We'll attempt to settle an invoice matching this rHash on disk (if
1077
        // one exists). The callback will update the invoice state and/or htlcs.
1078
        var (
939✔
1079
                resolution        HtlcResolution
939✔
1080
                updateSubscribers bool
939✔
1081
        )
939✔
1082
        callback := func(inv *Invoice) (*InvoiceUpdateDesc, error) {
1,878✔
1083
                // First check if this is a replayed htlc and resolve it
939✔
1084
                // according to its current state. We cannot decide differently
939✔
1085
                // once the HTLC has already been processed before.
939✔
1086
                isReplayed, res, err := resolveReplayedHtlc(ctx, inv)
939✔
1087
                if err != nil {
939✔
1088
                        return nil, err
×
1089
                }
×
1090
                if isReplayed {
955✔
1091
                        resolution = res
16✔
1092
                        return nil, nil
16✔
1093
                }
16✔
1094

1095
                // In case the HTLC interceptor cancels the HTLC set, we do NOT
1096
                // cancel the invoice however we cancel the complete HTLC set.
1097
                if cancelSet {
926✔
1098
                        // If the invoice is not open, something is wrong, we
3✔
1099
                        // fail just the HTLC with the specific error.
3✔
1100
                        if inv.State != ContractOpen {
3✔
1101
                                log.Errorf("Invoice state (%v) is not OPEN, "+
×
1102
                                        "cancelling HTLC set not allowed by "+
×
1103
                                        "external source", inv.State)
×
1104

×
1105
                                resolution = NewFailResolution(
×
1106
                                        ctx.circuitKey, ctx.currentHeight,
×
1107
                                        ResultInvoiceNotOpen,
×
1108
                                )
×
1109

×
1110
                                return nil, nil
×
1111
                        }
×
1112

1113
                        // The error `ExternalValidationFailed` error
1114
                        // information will be packed in the
1115
                        // `FailIncorrectDetails` msg when sending the msg to
1116
                        // the peer. Error codes are defined by the BOLT 04
1117
                        // specification. The error text can be arbitrary
1118
                        // therefore we return a custom error msg.
1119
                        resolution = NewFailResolution(
3✔
1120
                                ctx.circuitKey, ctx.currentHeight,
3✔
1121
                                ExternalValidationFailed,
3✔
1122
                        )
3✔
1123

3✔
1124
                        // We cancel all HTLCs which are in the accepted state.
3✔
1125
                        //
3✔
1126
                        // NOTE: The current HTLC is not included because it
3✔
1127
                        // was never accepted in the first place.
3✔
1128
                        htlcs := inv.HTLCSet(ctx.setID(), HtlcStateAccepted)
3✔
1129
                        htlcKeys := fn.KeySet[CircuitKey](htlcs)
3✔
1130

3✔
1131
                        // The external source did cancel the htlc set, so we
3✔
1132
                        // cancel all HTLCs in the set. We however keep the
3✔
1133
                        // invoice in the open state.
3✔
1134
                        //
3✔
1135
                        // NOTE: The invoice event loop will still call the
3✔
1136
                        // `cancelSingleHTLC` method for MPP payments, however
3✔
1137
                        // because the HTLCs are already cancled back it will be
3✔
1138
                        // a NOOP.
3✔
1139
                        update := &InvoiceUpdateDesc{
3✔
1140
                                UpdateType:  CancelHTLCsUpdate,
3✔
1141
                                CancelHtlcs: htlcKeys,
3✔
1142
                                SetID:       setID,
3✔
1143
                        }
3✔
1144

3✔
1145
                        return update, nil
3✔
1146
                }
1147

1148
                updateDesc, res, err := updateInvoice(ctx, inv)
920✔
1149
                if err != nil {
920✔
1150
                        return nil, err
×
1151
                }
×
1152

1153
                // Set resolution in outer scope only after successful update.
1154
                resolution = res
920✔
1155

920✔
1156
                // Only send an update if the invoice state was changed.
920✔
1157
                updateSubscribers = updateDesc != nil &&
920✔
1158
                        updateDesc.State != nil
920✔
1159

920✔
1160
                return updateDesc, nil
920✔
1161
        }
1162

1163
        invoice, err := i.idb.UpdateInvoice(
939✔
1164
                context.Background(), invoiceRef, setID, callback,
939✔
1165
        )
939✔
1166

939✔
1167
        var duplicateSetIDErr ErrDuplicateSetID
939✔
1168
        if errors.As(err, &duplicateSetIDErr) {
939✔
1169
                return NewFailResolution(
×
1170
                        ctx.circuitKey, ctx.currentHeight,
×
1171
                        ResultInvoiceNotFound,
×
1172
                ), nil, nil
×
1173
        }
×
1174

1175
        switch {
939✔
1176
        case errors.Is(err, ErrInvoiceNotFound):
×
1177
                // If the invoice was not found, return a failure resolution
×
1178
                // with an invoice not found result.
×
1179
                return NewFailResolution(
×
1180
                        ctx.circuitKey, ctx.currentHeight,
×
1181
                        ResultInvoiceNotFound,
×
1182
                ), nil, nil
×
1183

1184
        case errors.Is(err, ErrInvRefEquivocation):
×
1185
                return NewFailResolution(
×
1186
                        ctx.circuitKey, ctx.currentHeight,
×
1187
                        ResultInvoiceNotFound,
×
1188
                ), nil, nil
×
1189

1190
        case err == nil:
939✔
1191

1192
        default:
×
1193
                ctx.log(err.Error())
×
1194
                return nil, nil, err
×
1195
        }
1196

1197
        var invoiceToExpire invoiceExpiry
939✔
1198

939✔
1199
        log.Tracef("Settlement resolution: %T %v", resolution, resolution)
939✔
1200

939✔
1201
        switch res := resolution.(type) {
939✔
1202
        case *HtlcFailResolution:
29✔
1203
                // Inspect latest htlc state on the invoice. If it is found,
29✔
1204
                // we will update the accept height as it was recorded in the
29✔
1205
                // invoice database (which occurs in the case where the htlc
29✔
1206
                // reached the database in a previous call). If the htlc was
29✔
1207
                // not found on the invoice, it was immediately failed so we
29✔
1208
                // send the failure resolution as is, which has the current
29✔
1209
                // height set as the accept height.
29✔
1210
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
29✔
1211
                if ok {
32✔
1212
                        res.AcceptHeight = int32(invoiceHtlc.AcceptHeight)
3✔
1213
                }
3✔
1214

1215
                ctx.log(fmt.Sprintf("failure resolution result "+
29✔
1216
                        "outcome: %v, at accept height: %v",
29✔
1217
                        res.Outcome, res.AcceptHeight))
29✔
1218

29✔
1219
                // Some failures apply to the entire HTLC set. Break here if
29✔
1220
                // this isn't one of them.
29✔
1221
                if !res.Outcome.IsSetFailure() {
49✔
1222
                        break
20✔
1223
                }
1224

1225
                // Also cancel any HTLCs in the HTLC set that are also in the
1226
                // canceled state with the same failure result.
1227
                setID := ctx.setID()
9✔
1228
                canceledHtlcSet := invoice.HTLCSet(setID, HtlcStateCanceled)
9✔
1229
                for key, htlc := range canceledHtlcSet {
18✔
1230
                        htlcFailResolution := NewFailResolution(
9✔
1231
                                key, int32(htlc.AcceptHeight), res.Outcome,
9✔
1232
                        )
9✔
1233

9✔
1234
                        i.notifyHodlSubscribers(htlcFailResolution)
9✔
1235
                }
9✔
1236

1237
        // If the htlc was settled, we will settle any previously accepted
1238
        // htlcs and notify our peer to settle them.
1239
        case *HtlcSettleResolution:
473✔
1240
                ctx.log(fmt.Sprintf("settle resolution result "+
473✔
1241
                        "outcome: %v, at accept height: %v",
473✔
1242
                        res.Outcome, res.AcceptHeight))
473✔
1243

473✔
1244
                // Also settle any previously accepted htlcs. If a htlc is
473✔
1245
                // marked as settled, we should follow now and settle the htlc
473✔
1246
                // with our peer.
473✔
1247
                setID := ctx.setID()
473✔
1248
                settledHtlcSet := invoice.HTLCSet(setID, HtlcStateSettled)
473✔
1249
                for key, htlc := range settledHtlcSet {
1,258✔
1250
                        preimage := res.Preimage
785✔
1251
                        if htlc.AMP != nil && htlc.AMP.Preimage != nil {
797✔
1252
                                preimage = *htlc.AMP.Preimage
12✔
1253
                        }
12✔
1254

1255
                        // Notify subscribers that the htlcs should be settled
1256
                        // with our peer. Note that the outcome of the
1257
                        // resolution is set based on the outcome of the single
1258
                        // htlc that we just settled, so may not be accurate
1259
                        // for all htlcs.
1260
                        htlcSettleResolution := NewSettleResolution(
785✔
1261
                                preimage, key,
785✔
1262
                                int32(htlc.AcceptHeight), res.Outcome,
785✔
1263
                        )
785✔
1264

785✔
1265
                        // Notify subscribers that the htlc should be settled
785✔
1266
                        // with our peer.
785✔
1267
                        i.notifyHodlSubscribers(htlcSettleResolution)
785✔
1268
                }
1269

1270
                // If concurrent payments were attempted to this invoice before
1271
                // the current one was ultimately settled, cancel back any of
1272
                // the HTLCs immediately. As a result of the settle, the HTLCs
1273
                // in other HTLC sets are automatically converted to a canceled
1274
                // state when updating the invoice.
1275
                //
1276
                // TODO(roasbeef): can remove now??
1277
                canceledHtlcSet := invoice.HTLCSetCompliment(
473✔
1278
                        setID, HtlcStateCanceled,
473✔
1279
                )
473✔
1280
                for key, htlc := range canceledHtlcSet {
473✔
1281
                        htlcFailResolution := NewFailResolution(
×
1282
                                key, int32(htlc.AcceptHeight),
×
1283
                                ResultInvoiceAlreadySettled,
×
1284
                        )
×
1285

×
1286
                        i.notifyHodlSubscribers(htlcFailResolution)
×
1287
                }
×
1288

1289
        // If we accepted the htlc, subscribe to the hodl invoice and return
1290
        // an accept resolution with the htlc's accept time on it.
1291
        case *htlcAcceptResolution:
437✔
1292
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
437✔
1293
                if !ok {
437✔
1294
                        return nil, nil, fmt.Errorf("accepted htlc: %v not"+
×
1295
                                " present on invoice: %x", ctx.circuitKey,
×
1296
                                ctx.hash[:])
×
1297
                }
×
1298

1299
                // Determine accepted height of this htlc. If the htlc reached
1300
                // the invoice database (possibly in a previous call to the
1301
                // invoice registry), we'll take the original accepted height
1302
                // as it was recorded in the database.
1303
                acceptHeight := int32(invoiceHtlc.AcceptHeight)
437✔
1304

437✔
1305
                ctx.log(fmt.Sprintf("accept resolution result "+
437✔
1306
                        "outcome: %v, at accept height: %v",
437✔
1307
                        res.outcome, acceptHeight))
437✔
1308

437✔
1309
                // Auto-release the htlc if the invoice is still open. It can
437✔
1310
                // only happen for mpp payments that there are htlcs in state
437✔
1311
                // Accepted while the invoice is Open.
437✔
1312
                if invoice.State == ContractOpen {
785✔
1313
                        res.acceptTime = invoiceHtlc.AcceptTime
348✔
1314
                        res.autoRelease = true
348✔
1315
                }
348✔
1316

1317
                // If we have fully accepted the set of htlcs for this invoice,
1318
                // we can now add it to our invoice expiry watcher. We do not
1319
                // add invoices before they are fully accepted, because it is
1320
                // possible that we MppTimeout the htlcs, and then our relevant
1321
                // expiry height could change.
1322
                if res.outcome == resultAccepted {
519✔
1323
                        invoiceToExpire = makeInvoiceExpiry(ctx.hash, invoice)
82✔
1324
                }
82✔
1325

1326
                // Subscribe to the resolution if the caller specified a
1327
                // notification channel.
1328
                if hodlChan != nil {
874✔
1329
                        i.hodlSubscribe(hodlChan, ctx.circuitKey)
437✔
1330
                }
437✔
1331

1332
        default:
×
1333
                panic("unknown action")
×
1334
        }
1335

1336
        // Now that the links have been notified of any state changes to their
1337
        // HTLCs, we'll go ahead and notify any clients waiting on the invoice
1338
        // state changes.
1339
        if updateSubscribers {
1,491✔
1340
                // We'll add a setID onto the notification, but only if this is
552✔
1341
                // an AMP invoice being settled.
552✔
1342
                var setID *[32]byte
552✔
1343
                if _, ok := resolution.(*HtlcSettleResolution); ok {
1,016✔
1344
                        setID = ctx.setID()
464✔
1345
                }
464✔
1346

1347
                i.notifyClients(ctx.hash, invoice, setID)
552✔
1348
        }
1349

1350
        return resolution, invoiceToExpire, nil
939✔
1351
}
1352

1353
// SettleHodlInvoice sets the preimage of a hodl invoice.
1354
func (i *InvoiceRegistry) SettleHodlInvoice(ctx context.Context,
1355
        preimage lntypes.Preimage) error {
69✔
1356

69✔
1357
        i.Lock()
69✔
1358
        defer i.Unlock()
69✔
1359

69✔
1360
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
138✔
1361
                switch invoice.State {
69✔
1362
                case ContractOpen:
×
1363
                        return nil, ErrInvoiceStillOpen
×
1364

1365
                case ContractCanceled:
×
1366
                        return nil, ErrInvoiceAlreadyCanceled
×
1367

1368
                case ContractSettled:
3✔
1369
                        return nil, ErrInvoiceAlreadySettled
3✔
1370
                }
1371

1372
                return &InvoiceUpdateDesc{
66✔
1373
                        UpdateType: SettleHodlInvoiceUpdate,
66✔
1374
                        State: &InvoiceStateUpdateDesc{
66✔
1375
                                NewState: ContractSettled,
66✔
1376
                                Preimage: &preimage,
66✔
1377
                        },
66✔
1378
                }, nil
66✔
1379
        }
1380

1381
        hash := preimage.Hash()
69✔
1382
        invoiceRef := InvoiceRefByHash(hash)
69✔
1383

69✔
1384
        // AMP hold invoices are not supported so we set the setID to nil.
69✔
1385
        // For non-AMP invoices this parameter is ignored during the fetching
69✔
1386
        // of the database state.
69✔
1387
        setID := (*SetID)(nil)
69✔
1388

69✔
1389
        invoice, err := i.idb.UpdateInvoice(
69✔
1390
                ctx, invoiceRef, setID, updateInvoice,
69✔
1391
        )
69✔
1392
        if err != nil {
72✔
1393
                log.Errorf("SettleHodlInvoice with preimage %v: %v",
3✔
1394
                        preimage, err)
3✔
1395

3✔
1396
                return err
3✔
1397
        }
3✔
1398

1399
        log.Debugf("Invoice%v: settled with preimage %v", invoiceRef,
66✔
1400
                invoice.Terms.PaymentPreimage)
66✔
1401

66✔
1402
        // In the callback, we marked the invoice as settled. UpdateInvoice will
66✔
1403
        // have seen this and should have moved all htlcs that were accepted to
66✔
1404
        // the settled state. In the loop below, we go through all of these and
66✔
1405
        // notify links and resolvers that are waiting for resolution. Any htlcs
66✔
1406
        // that were already settled before, will be notified again. This isn't
66✔
1407
        // necessary but doesn't hurt either.
66✔
1408
        for key, htlc := range invoice.Htlcs {
135✔
1409
                if htlc.State != HtlcStateSettled {
69✔
1410
                        continue
×
1411
                }
1412

1413
                resolution := NewSettleResolution(
69✔
1414
                        preimage, key, int32(htlc.AcceptHeight), ResultSettled,
69✔
1415
                )
69✔
1416

69✔
1417
                i.notifyHodlSubscribers(resolution)
69✔
1418
        }
1419
        i.notifyClients(hash, invoice, nil)
66✔
1420

66✔
1421
        return nil
66✔
1422
}
1423

1424
// CancelInvoice attempts to cancel the invoice corresponding to the passed
1425
// payment hash.
1426
func (i *InvoiceRegistry) CancelInvoice(ctx context.Context,
1427
        payHash lntypes.Hash) error {
29✔
1428

29✔
1429
        return i.cancelInvoiceImpl(ctx, payHash, true)
29✔
1430
}
29✔
1431

1432
// shouldCancel examines the state of an invoice and whether we want to
1433
// cancel already accepted invoices, taking our force cancel boolean into
1434
// account. This is pulled out into its own function so that tests that mock
1435
// cancelInvoiceImpl can reuse this logic.
1436
func shouldCancel(state ContractState, cancelAccepted bool) bool {
104✔
1437
        if state != ContractAccepted {
180✔
1438
                return true
76✔
1439
        }
76✔
1440

1441
        // If the invoice is accepted, we should only cancel if we want to
1442
        // force cancellation of accepted invoices.
1443
        return cancelAccepted
28✔
1444
}
1445

1446
// cancelInvoice attempts to cancel the invoice corresponding to the passed
1447
// payment hash. Accepted invoices will only be canceled if explicitly
1448
// requested to do so. It notifies subscribing links and resolvers that
1449
// the associated htlcs were canceled if they change state.
1450
func (i *InvoiceRegistry) cancelInvoiceImpl(ctx context.Context,
1451
        payHash lntypes.Hash, cancelAccepted bool) error {
112✔
1452

112✔
1453
        i.Lock()
112✔
1454
        defer i.Unlock()
112✔
1455

112✔
1456
        ref := InvoiceRefByHash(payHash)
112✔
1457
        log.Debugf("Invoice%v: canceling invoice", ref)
112✔
1458

112✔
1459
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
216✔
1460
                if !shouldCancel(invoice.State, cancelAccepted) {
116✔
1461
                        return nil, nil
12✔
1462
                }
12✔
1463

1464
                // Move invoice to the canceled state. Rely on validation in
1465
                // channeldb to return an error if the invoice is already
1466
                // settled or canceled.
1467
                return &InvoiceUpdateDesc{
92✔
1468
                        UpdateType: CancelInvoiceUpdate,
92✔
1469
                        State: &InvoiceStateUpdateDesc{
92✔
1470
                                NewState: ContractCanceled,
92✔
1471
                        },
92✔
1472
                }, nil
92✔
1473
        }
1474

1475
        // If it's an AMP invoice we need to fetch all AMP HTLCs here so that
1476
        // we can cancel all of HTLCs which are in the accepted state across
1477
        // different setIDs.
1478
        setID := (*SetID)(nil)
112✔
1479
        invoiceRef := InvoiceRefByHash(payHash)
112✔
1480
        invoice, err := i.idb.UpdateInvoice(
112✔
1481
                ctx, invoiceRef, setID, updateInvoice,
112✔
1482
        )
112✔
1483

112✔
1484
        // Implement idempotency by returning success if the invoice was already
112✔
1485
        // canceled.
112✔
1486
        if errors.Is(err, ErrInvoiceAlreadyCanceled) {
115✔
1487
                log.Debugf("Invoice%v: already canceled", ref)
3✔
1488
                return nil
3✔
1489
        }
3✔
1490
        if err != nil {
129✔
1491
                return err
20✔
1492
        }
20✔
1493

1494
        // Return without cancellation if the invoice state is ContractAccepted.
1495
        if invoice.State == ContractAccepted {
101✔
1496
                log.Debugf("Invoice%v: remains accepted as cancel wasn't"+
12✔
1497
                        "explicitly requested.", ref)
12✔
1498
                return nil
12✔
1499
        }
12✔
1500

1501
        log.Debugf("Invoice%v: canceled", ref)
77✔
1502

77✔
1503
        // In the callback, some htlcs may have been moved to the canceled
77✔
1504
        // state. We now go through all of these and notify links and resolvers
77✔
1505
        // that are waiting for resolution. Any htlcs that were already canceled
77✔
1506
        // before, will be notified again. This isn't necessary but doesn't hurt
77✔
1507
        // either.
77✔
1508
        // For AMP invoices we fetched all AMP HTLCs for all sub AMP invoices
77✔
1509
        // here so we can clean up all of them.
77✔
1510
        for key, htlc := range invoice.Htlcs {
120✔
1511
                if htlc.State != HtlcStateCanceled {
43✔
1512
                        continue
×
1513
                }
1514

1515
                i.notifyHodlSubscribers(
43✔
1516
                        NewFailResolution(
43✔
1517
                                key, int32(htlc.AcceptHeight), ResultCanceled,
43✔
1518
                        ),
43✔
1519
                )
43✔
1520
        }
1521

1522
        i.notifyClients(payHash, invoice, nil)
77✔
1523

77✔
1524
        // Attempt to also delete the invoice if requested through the registry
77✔
1525
        // config.
77✔
1526
        if i.cfg.GcCanceledInvoicesOnTheFly {
80✔
1527
                // Assemble the delete reference and attempt to delete through
3✔
1528
                // the invocice from the DB.
3✔
1529
                deleteRef := InvoiceDeleteRef{
3✔
1530
                        PayHash:     payHash,
3✔
1531
                        AddIndex:    invoice.AddIndex,
3✔
1532
                        SettleIndex: invoice.SettleIndex,
3✔
1533
                }
3✔
1534
                if invoice.Terms.PaymentAddr != BlankPayAddr {
3✔
1535
                        deleteRef.PayAddr = &invoice.Terms.PaymentAddr
×
1536
                }
×
1537

1538
                err = i.idb.DeleteInvoice(ctx, []InvoiceDeleteRef{deleteRef})
3✔
1539
                // If by any chance deletion failed, then log it instead of
3✔
1540
                // returning the error, as the invoice itself has already been
3✔
1541
                // canceled.
3✔
1542
                if err != nil {
3✔
1543
                        log.Warnf("Invoice %v could not be deleted: %v", ref,
×
1544
                                err)
×
1545
                }
×
1546
        }
1547

1548
        return nil
77✔
1549
}
1550

1551
// notifyClients notifies all currently registered invoice notification clients
1552
// of a newly added/settled invoice.
1553
func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
1554
        invoice *Invoice, setID *[32]byte) {
1,409✔
1555

1,409✔
1556
        event := &invoiceEvent{
1,409✔
1557
                invoice: invoice,
1,409✔
1558
                hash:    hash,
1,409✔
1559
                setID:   setID,
1,409✔
1560
        }
1,409✔
1561

1,409✔
1562
        select {
1,409✔
1563
        case i.invoiceEvents <- event:
1,409✔
1564
        case <-i.quit:
×
1565
        }
1566
}
1567

1568
// invoiceSubscriptionKit defines that are common to both all invoice
1569
// subscribers and single invoice subscribers.
1570
type invoiceSubscriptionKit struct {
1571
        id uint32 // nolint:structcheck
1572

1573
        // quit is a chan mouted to InvoiceRegistry that signals a shutdown.
1574
        quit chan struct{}
1575

1576
        ntfnQueue *queue.ConcurrentQueue
1577

1578
        canceled   uint32 // To be used atomically.
1579
        cancelChan chan struct{}
1580

1581
        // backlogDelivered is closed when the backlog events have been
1582
        // delivered.
1583
        backlogDelivered chan struct{}
1584
}
1585

1586
// InvoiceSubscription represents an intent to receive updates for newly added
1587
// or settled invoices. For each newly added invoice, a copy of the invoice
1588
// will be sent over the NewInvoices channel. Similarly, for each newly settled
1589
// invoice, a copy of the invoice will be sent over the SettledInvoices
1590
// channel.
1591
type InvoiceSubscription struct {
1592
        invoiceSubscriptionKit
1593

1594
        // NewInvoices is a channel that we'll use to send all newly created
1595
        // invoices with an invoice index greater than the specified
1596
        // StartingInvoiceIndex field.
1597
        NewInvoices chan *Invoice
1598

1599
        // SettledInvoices is a channel that we'll use to send all settled
1600
        // invoices with an invoices index greater than the specified
1601
        // StartingInvoiceIndex field.
1602
        SettledInvoices chan *Invoice
1603

1604
        // addIndex is the highest add index the caller knows of. We'll use
1605
        // this information to send out an event backlog to the notifications
1606
        // subscriber. Any new add events with an index greater than this will
1607
        // be dispatched before any new notifications are sent out.
1608
        addIndex uint64
1609

1610
        // settleIndex is the highest settle index the caller knows of. We'll
1611
        // use this information to send out an event backlog to the
1612
        // notifications subscriber. Any new settle events with an index
1613
        // greater than this will be dispatched before any new notifications
1614
        // are sent out.
1615
        settleIndex uint64
1616
}
1617

1618
// SingleInvoiceSubscription represents an intent to receive updates for a
1619
// specific invoice.
1620
type SingleInvoiceSubscription struct {
1621
        invoiceSubscriptionKit
1622

1623
        invoiceRef InvoiceRef
1624

1625
        // Updates is a channel that we'll use to send all invoice events for
1626
        // the invoice that is subscribed to.
1627
        Updates chan *Invoice
1628
}
1629

1630
// PayHash returns the optional payment hash of the target invoice.
1631
//
1632
// TODO(positiveblue): This method is only supposed to be used in tests. It will
1633
// be deleted as soon as invoiceregistery_test is in the same module.
1634
func (s *SingleInvoiceSubscription) PayHash() *lntypes.Hash {
18✔
1635
        return s.invoiceRef.PayHash()
18✔
1636
}
18✔
1637

1638
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
1639
// resources.
1640
func (i *invoiceSubscriptionKit) Cancel() {
63✔
1641
        if !atomic.CompareAndSwapUint32(&i.canceled, 0, 1) {
63✔
1642
                return
×
1643
        }
×
1644

1645
        i.ntfnQueue.Stop()
63✔
1646
        close(i.cancelChan)
63✔
1647
}
1648

1649
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
102✔
1650
        select {
102✔
1651
        case i.ntfnQueue.ChanIn() <- event:
102✔
1652

1653
        case <-i.cancelChan:
×
1654
                // This can only be triggered by delivery of non-backlog
×
1655
                // events.
×
1656
                return ErrShuttingDown
×
1657
        case <-i.quit:
×
1658
                return ErrShuttingDown
×
1659
        }
1660

1661
        return nil
102✔
1662
}
1663

1664
// SubscribeNotifications returns an InvoiceSubscription which allows the
1665
// caller to receive async notifications when any invoices are settled or
1666
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
1667
// by first sending out all new events with an invoice index _greater_ than
1668
// this value. Afterwards, we'll send out real-time notifications.
1669
func (i *InvoiceRegistry) SubscribeNotifications(ctx context.Context,
1670
        addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
45✔
1671

45✔
1672
        client := &InvoiceSubscription{
45✔
1673
                NewInvoices:     make(chan *Invoice),
45✔
1674
                SettledInvoices: make(chan *Invoice),
45✔
1675
                addIndex:        addIndex,
45✔
1676
                settleIndex:     settleIndex,
45✔
1677
                invoiceSubscriptionKit: invoiceSubscriptionKit{
45✔
1678
                        quit:             i.quit,
45✔
1679
                        ntfnQueue:        queue.NewConcurrentQueue(20),
45✔
1680
                        cancelChan:       make(chan struct{}),
45✔
1681
                        backlogDelivered: make(chan struct{}),
45✔
1682
                },
45✔
1683
        }
45✔
1684
        client.ntfnQueue.Start()
45✔
1685

45✔
1686
        // This notifies other goroutines that the backlog phase is over.
45✔
1687
        defer close(client.backlogDelivered)
45✔
1688

45✔
1689
        // Always increment by 1 first, and our client ID will start with 1,
45✔
1690
        // not 0.
45✔
1691
        client.id = atomic.AddUint32(&i.nextClientID, 1)
45✔
1692

45✔
1693
        // Before we register this new invoice subscription, we'll launch a new
45✔
1694
        // goroutine that will proxy all notifications appended to the end of
45✔
1695
        // the concurrent queue to the two client-side channels the caller will
45✔
1696
        // feed off of.
45✔
1697
        i.wg.Add(1)
45✔
1698
        go func() {
90✔
1699
                defer i.wg.Done()
45✔
1700
                defer i.deleteClient(client.id)
45✔
1701

45✔
1702
                for {
156✔
1703
                        select {
111✔
1704
                        // A new invoice event has been sent by the
1705
                        // invoiceRegistry! We'll figure out if this is an add
1706
                        // event or a settle event, then dispatch the event to
1707
                        // the client.
1708
                        case ntfn := <-client.ntfnQueue.ChanOut():
66✔
1709
                                invoiceEvent := ntfn.(*invoiceEvent)
66✔
1710

66✔
1711
                                var targetChan chan *Invoice
66✔
1712
                                state := invoiceEvent.invoice.State
66✔
1713
                                switch {
66✔
1714
                                // AMP invoices never move to settled, but will
1715
                                // be sent with a set ID if an HTLC set is
1716
                                // being settled.
1717
                                case state == ContractOpen &&
1718
                                        invoiceEvent.setID != nil:
6✔
1719
                                        fallthrough
6✔
1720

1721
                                case state == ContractSettled:
24✔
1722
                                        targetChan = client.SettledInvoices
24✔
1723

1724
                                case state == ContractOpen:
42✔
1725
                                        targetChan = client.NewInvoices
42✔
1726

1727
                                default:
×
1728
                                        log.Errorf("unknown invoice state: %v",
×
1729
                                                state)
×
1730

×
1731
                                        continue
×
1732
                                }
1733

1734
                                select {
66✔
1735
                                case targetChan <- invoiceEvent.invoice:
66✔
1736

1737
                                case <-client.cancelChan:
×
1738
                                        return
×
1739

1740
                                case <-i.quit:
×
1741
                                        return
×
1742
                                }
1743

1744
                        case <-client.cancelChan:
45✔
1745
                                return
45✔
1746

1747
                        case <-i.quit:
×
1748
                                return
×
1749
                        }
1750
                }
1751
        }()
1752

1753
        i.notificationClientMux.Lock()
45✔
1754
        i.notificationClients[client.id] = client
45✔
1755
        i.notificationClientMux.Unlock()
45✔
1756

45✔
1757
        // Query the database to see if based on the provided addIndex and
45✔
1758
        // settledIndex we need to deliver any backlog notifications.
45✔
1759
        err := i.deliverBacklogEvents(ctx, client)
45✔
1760
        if err != nil {
45✔
1761
                return nil, err
×
1762
        }
×
1763

1764
        log.Infof("New invoice subscription client: id=%v", client.id)
45✔
1765

45✔
1766
        return client, nil
45✔
1767
}
1768

1769
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
1770
// caller to receive async notifications for a specific invoice.
1771
func (i *InvoiceRegistry) SubscribeSingleInvoice(ctx context.Context,
1772
        hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
18✔
1773

18✔
1774
        client := &SingleInvoiceSubscription{
18✔
1775
                Updates: make(chan *Invoice),
18✔
1776
                invoiceSubscriptionKit: invoiceSubscriptionKit{
18✔
1777
                        quit:             i.quit,
18✔
1778
                        ntfnQueue:        queue.NewConcurrentQueue(20),
18✔
1779
                        cancelChan:       make(chan struct{}),
18✔
1780
                        backlogDelivered: make(chan struct{}),
18✔
1781
                },
18✔
1782
                invoiceRef: InvoiceRefByHash(hash),
18✔
1783
        }
18✔
1784
        client.ntfnQueue.Start()
18✔
1785

18✔
1786
        // This notifies other goroutines that the backlog phase is done.
18✔
1787
        defer close(client.backlogDelivered)
18✔
1788

18✔
1789
        // Always increment by 1 first, and our client ID will start with 1,
18✔
1790
        // not 0.
18✔
1791
        client.id = atomic.AddUint32(&i.nextClientID, 1)
18✔
1792

18✔
1793
        // Before we register this new invoice subscription, we'll launch a new
18✔
1794
        // goroutine that will proxy all notifications appended to the end of
18✔
1795
        // the concurrent queue to the two client-side channels the caller will
18✔
1796
        // feed off of.
18✔
1797
        i.wg.Add(1)
18✔
1798
        go func() {
36✔
1799
                defer i.wg.Done()
18✔
1800
                defer i.deleteClient(client.id)
18✔
1801

18✔
1802
                for {
72✔
1803
                        select {
54✔
1804
                        // A new invoice event has been sent by the
1805
                        // invoiceRegistry. We will dispatch the event to the
1806
                        // client.
1807
                        case ntfn := <-client.ntfnQueue.ChanOut():
36✔
1808
                                invoiceEvent := ntfn.(*invoiceEvent)
36✔
1809

36✔
1810
                                select {
36✔
1811
                                case client.Updates <- invoiceEvent.invoice:
36✔
1812

1813
                                case <-client.cancelChan:
×
1814
                                        return
×
1815

1816
                                case <-i.quit:
×
1817
                                        return
×
1818
                                }
1819

1820
                        case <-client.cancelChan:
18✔
1821
                                return
18✔
1822

1823
                        case <-i.quit:
×
1824
                                return
×
1825
                        }
1826
                }
1827
        }()
1828

1829
        i.notificationClientMux.Lock()
18✔
1830
        i.singleNotificationClients[client.id] = client
18✔
1831
        i.notificationClientMux.Unlock()
18✔
1832

18✔
1833
        err := i.deliverSingleBacklogEvents(ctx, client)
18✔
1834
        if err != nil {
18✔
1835
                return nil, err
×
1836
        }
×
1837

1838
        log.Infof("New single invoice subscription client: id=%v, ref=%v",
18✔
1839
                client.id, client.invoiceRef)
18✔
1840

18✔
1841
        return client, nil
18✔
1842
}
1843

1844
// notifyHodlSubscribers sends out the htlc resolution to all current
1845
// subscribers.
1846
func (i *InvoiceRegistry) notifyHodlSubscribers(htlcResolution HtlcResolution) {
927✔
1847
        i.hodlSubscriptionsMux.Lock()
927✔
1848
        defer i.hodlSubscriptionsMux.Unlock()
927✔
1849

927✔
1850
        subscribers, ok := i.hodlSubscriptions[htlcResolution.CircuitKey()]
927✔
1851
        if !ok {
1,424✔
1852
                return
497✔
1853
        }
497✔
1854

1855
        // Notify all interested subscribers and remove subscription from both
1856
        // maps. The subscription can be removed as there only ever will be a
1857
        // single resolution for each hash.
1858
        for subscriber := range subscribers {
860✔
1859
                select {
430✔
1860
                case subscriber <- htlcResolution:
430✔
1861
                case <-i.quit:
×
1862
                        return
×
1863
                }
1864

1865
                delete(
430✔
1866
                        i.hodlReverseSubscriptions[subscriber],
430✔
1867
                        htlcResolution.CircuitKey(),
430✔
1868
                )
430✔
1869
        }
1870

1871
        delete(i.hodlSubscriptions, htlcResolution.CircuitKey())
430✔
1872
}
1873

1874
// hodlSubscribe adds a new invoice subscription.
1875
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
1876
        circuitKey CircuitKey) {
437✔
1877

437✔
1878
        i.hodlSubscriptionsMux.Lock()
437✔
1879
        defer i.hodlSubscriptionsMux.Unlock()
437✔
1880

437✔
1881
        log.Debugf("Hodl subscribe for %v", circuitKey)
437✔
1882

437✔
1883
        subscriptions, ok := i.hodlSubscriptions[circuitKey]
437✔
1884
        if !ok {
867✔
1885
                subscriptions = make(map[chan<- interface{}]struct{})
430✔
1886
                i.hodlSubscriptions[circuitKey] = subscriptions
430✔
1887
        }
430✔
1888
        subscriptions[subscriber] = struct{}{}
437✔
1889

437✔
1890
        reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
437✔
1891
        if !ok {
812✔
1892
                reverseSubscriptions = make(map[CircuitKey]struct{})
375✔
1893
                i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
375✔
1894
        }
375✔
1895
        reverseSubscriptions[circuitKey] = struct{}{}
437✔
1896
}
1897

1898
// HodlUnsubscribeAll cancels the subscription.
1899
func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
202✔
1900
        i.hodlSubscriptionsMux.Lock()
202✔
1901
        defer i.hodlSubscriptionsMux.Unlock()
202✔
1902

202✔
1903
        hashes := i.hodlReverseSubscriptions[subscriber]
202✔
1904
        for hash := range hashes {
203✔
1905
                delete(i.hodlSubscriptions[hash], subscriber)
1✔
1906
        }
1✔
1907

1908
        delete(i.hodlReverseSubscriptions, subscriber)
202✔
1909
}
1910

1911
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
1912
// useful when we need to iterate the map to send notifications.
1913
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription { //nolint:ll
1,409✔
1914
        i.notificationClientMux.RLock()
1,409✔
1915
        defer i.notificationClientMux.RUnlock()
1,409✔
1916

1,409✔
1917
        clients := make(map[uint32]*SingleInvoiceSubscription)
1,409✔
1918
        for k, v := range i.singleNotificationClients {
1,445✔
1919
                clients[k] = v
36✔
1920
        }
36✔
1921
        return clients
1,409✔
1922
}
1923

1924
// copyClients copies i.notificationClients inside a lock. This is useful when
1925
// we need to iterate the map to send notifications.
1926
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
1,244✔
1927
        i.notificationClientMux.RLock()
1,244✔
1928
        defer i.notificationClientMux.RUnlock()
1,244✔
1929

1,244✔
1930
        clients := make(map[uint32]*InvoiceSubscription)
1,244✔
1931
        for k, v := range i.notificationClients {
1,310✔
1932
                clients[k] = v
66✔
1933
        }
66✔
1934
        return clients
1,244✔
1935
}
1936

1937
// deleteClient removes a client by its ID inside a lock. Noop if the client is
1938
// not found.
1939
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
63✔
1940
        i.notificationClientMux.Lock()
63✔
1941
        defer i.notificationClientMux.Unlock()
63✔
1942

63✔
1943
        log.Infof("Cancelling invoice subscription for client=%v", clientID)
63✔
1944
        delete(i.notificationClients, clientID)
63✔
1945
        delete(i.singleNotificationClients, clientID)
63✔
1946
}
63✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc