• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13043384202

30 Jan 2025 12:49AM UTC coverage: 48.841% (-9.9%) from 58.777%
13043384202

Pull #9459

github

ziggie1984
docs: add release notes.
Pull Request #9459: invoices: amp invoices bugfix.

28 of 45 new or added lines in 3 files covered. (62.22%)

28177 existing lines in 437 files now uncovered.

99712 of 204157 relevant lines covered (48.84%)

1.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.51
/invoices/invoiceregistry.go
1
package invoices
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/queue"
15
        "github.com/lightningnetwork/lnd/record"
16
)
17

18
var (
	// ErrInvoiceExpiryTooSoon is returned when an attempt is made to
	// accept or settle an invoice with not enough blocks remaining.
	ErrInvoiceExpiryTooSoon = errors.New("invoice expiry too soon")

	// ErrInvoiceAmountTooLow is returned when an attempt is made to accept
	// or settle an invoice with an amount that is too low.
	ErrInvoiceAmountTooLow = errors.New(
		"paid amount less than invoice amount",
	)

	// ErrShuttingDown is returned when an operation could not complete
	// because the invoice registry is shutting down.
	ErrShuttingDown = errors.New("invoice registry shutting down")
)
33

34
const (
	// DefaultHtlcHoldDuration is the default amount of time (two minutes)
	// that mpp htlcs are held while waiting for the other set members to
	// arrive.
	DefaultHtlcHoldDuration = 2 * time.Minute
)
39

40
// RegistryConfig contains the configuration parameters for invoice registry.
41
type RegistryConfig struct {
42
        // FinalCltvRejectDelta defines the number of blocks before the expiry
43
        // of the htlc where we no longer settle it as an exit hop and instead
44
        // cancel it back. Normally this value should be lower than the cltv
45
        // expiry of any invoice we create and the code effectuating this should
46
        // not be hit.
47
        FinalCltvRejectDelta int32
48

49
        // HtlcHoldDuration defines for how long mpp htlcs are held while
50
        // waiting for the other set members to arrive.
51
        HtlcHoldDuration time.Duration
52

53
        // Clock holds the clock implementation that is used to provide
54
        // Now() and TickAfter() and is useful to stub out the clock functions
55
        // during testing.
56
        Clock clock.Clock
57

58
        // AcceptKeySend indicates whether we want to accept spontaneous key
59
        // send payments.
60
        AcceptKeySend bool
61

62
        // AcceptAMP indicates whether we want to accept spontaneous AMP
63
        // payments.
64
        AcceptAMP bool
65

66
        // GcCanceledInvoicesOnStartup if set, we'll attempt to garbage collect
67
        // all canceled invoices upon start.
68
        GcCanceledInvoicesOnStartup bool
69

70
        // GcCanceledInvoicesOnTheFly if set, we'll garbage collect all newly
71
        // canceled invoices on the fly.
72
        GcCanceledInvoicesOnTheFly bool
73

74
        // KeysendHoldTime indicates for how long we want to accept and hold
75
        // spontaneous keysend payments.
76
        KeysendHoldTime time.Duration
77

78
        // HtlcInterceptor is an interface that allows the invoice registry to
79
        // let clients intercept invoices before they are settled.
80
        HtlcInterceptor HtlcInterceptor
81
}
82

83
// htlcReleaseEvent describes an htlc auto-release event. It is used to release
84
// mpp htlcs for which the complete set didn't arrive in time.
85
type htlcReleaseEvent struct {
86
        // invoiceRef identifiers the invoice this htlc belongs to.
87
        invoiceRef InvoiceRef
88

89
        // key is the circuit key of the htlc to release.
90
        key CircuitKey
91

92
        // releaseTime is the time at which to release the htlc.
93
        releaseTime time.Time
94
}
95

96
// Less is used to order PriorityQueueItem's by their release time such that
97
// items with the older release time are at the top of the queue.
98
//
99
// NOTE: Part of the queue.PriorityQueueItem interface.
100
func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
2✔
101
        return r.releaseTime.Before(other.(*htlcReleaseEvent).releaseTime)
2✔
102
}
2✔
103

104
// InvoiceRegistry is a central registry of all the outstanding invoices
105
// created by the daemon. The registry is a thin wrapper around a map in order
106
// to ensure that all updates/reads are thread safe.
107
type InvoiceRegistry struct {
108
        started atomic.Bool
109
        stopped atomic.Bool
110

111
        sync.RWMutex
112

113
        nextClientID uint32 // must be used atomically
114

115
        idb InvoiceDB
116

117
        // cfg contains the registry's configuration parameters.
118
        cfg *RegistryConfig
119

120
        // notificationClientMux locks notificationClients and
121
        // singleNotificationClients. Using a separate mutex for these maps is
122
        // necessary to avoid deadlocks in the registry when processing invoice
123
        // events.
124
        notificationClientMux sync.RWMutex
125

126
        notificationClients map[uint32]*InvoiceSubscription
127

128
        // TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
129
        // performance.
130
        singleNotificationClients map[uint32]*SingleInvoiceSubscription
131

132
        // invoiceEvents is a single channel over which invoice updates are
133
        // carried.
134
        invoiceEvents chan *invoiceEvent
135

136
        // hodlSubscriptionsMux locks the hodlSubscriptions and
137
        // hodlReverseSubscriptions. Using a separate mutex for these maps is
138
        // necessary to avoid deadlocks in the registry when processing invoice
139
        // events.
140
        hodlSubscriptionsMux sync.RWMutex
141

142
        // hodlSubscriptions is a map from a circuit key to a list of
143
        // subscribers. It is used for efficient notification of links.
144
        hodlSubscriptions map[CircuitKey]map[chan<- interface{}]struct{}
145

146
        // reverseSubscriptions tracks circuit keys subscribed to per
147
        // subscriber. This is used to unsubscribe from all hashes efficiently.
148
        hodlReverseSubscriptions map[chan<- interface{}]map[CircuitKey]struct{}
149

150
        // htlcAutoReleaseChan contains the new htlcs that need to be
151
        // auto-released.
152
        htlcAutoReleaseChan chan *htlcReleaseEvent
153

154
        expiryWatcher *InvoiceExpiryWatcher
155

156
        wg   sync.WaitGroup
157
        quit chan struct{}
158
}
159

160
// NewRegistry creates a new invoice registry. The invoice registry
161
// wraps the persistent on-disk invoice storage with an additional in-memory
162
// layer. The in-memory layer is in place such that debug invoices can be added
163
// which are volatile yet available system wide within the daemon.
164
func NewRegistry(idb InvoiceDB, expiryWatcher *InvoiceExpiryWatcher,
165
        cfg *RegistryConfig) *InvoiceRegistry {
2✔
166

2✔
167
        notificationClients := make(map[uint32]*InvoiceSubscription)
2✔
168
        singleNotificationClients := make(map[uint32]*SingleInvoiceSubscription)
2✔
169
        return &InvoiceRegistry{
2✔
170
                idb:                       idb,
2✔
171
                notificationClients:       notificationClients,
2✔
172
                singleNotificationClients: singleNotificationClients,
2✔
173
                invoiceEvents:             make(chan *invoiceEvent, 100),
2✔
174
                hodlSubscriptions: make(
2✔
175
                        map[CircuitKey]map[chan<- interface{}]struct{},
2✔
176
                ),
2✔
177
                hodlReverseSubscriptions: make(
2✔
178
                        map[chan<- interface{}]map[CircuitKey]struct{},
2✔
179
                ),
2✔
180
                cfg:                 cfg,
2✔
181
                htlcAutoReleaseChan: make(chan *htlcReleaseEvent),
2✔
182
                expiryWatcher:       expiryWatcher,
2✔
183
                quit:                make(chan struct{}),
2✔
184
        }
2✔
185
}
2✔
186

187
// scanInvoicesOnStart will scan all invoices on start and add active invoices
188
// to the invoice expiry watcher while also attempting to delete all canceled
189
// invoices.
190
func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {
2✔
191
        pendingInvoices, err := i.idb.FetchPendingInvoices(ctx)
2✔
192
        if err != nil {
2✔
193
                return err
×
194
        }
×
195

196
        var pending []invoiceExpiry
2✔
197
        for paymentHash, invoice := range pendingInvoices {
4✔
198
                invoice := invoice
2✔
199
                expiryRef := makeInvoiceExpiry(paymentHash, &invoice)
2✔
200
                if expiryRef != nil {
4✔
201
                        pending = append(pending, expiryRef)
2✔
202
                }
2✔
203
        }
204

205
        log.Debugf("Adding %d pending invoices to the expiry watcher",
2✔
206
                len(pending))
2✔
207
        i.expiryWatcher.AddInvoices(pending...)
2✔
208

2✔
209
        if i.cfg.GcCanceledInvoicesOnStartup {
2✔
UNCOV
210
                log.Infof("Deleting canceled invoices")
×
UNCOV
211
                err = i.idb.DeleteCanceledInvoices(ctx)
×
UNCOV
212
                if err != nil {
×
213
                        log.Warnf("Deleting canceled invoices failed: %v", err)
×
214
                        return err
×
215
                }
×
216
        }
217

218
        return nil
2✔
219
}
220

221
// Start starts the registry and all goroutines it needs to carry out its task.
222
func (i *InvoiceRegistry) Start() error {
2✔
223
        var err error
2✔
224

2✔
225
        log.Info("InvoiceRegistry starting...")
2✔
226

2✔
227
        if i.started.Swap(true) {
2✔
228
                return fmt.Errorf("InvoiceRegistry started more than once")
×
229
        }
×
230
        // Start InvoiceExpiryWatcher and prepopulate it with existing
231
        // active invoices.
232
        err = i.expiryWatcher.Start(
2✔
233
                func(hash lntypes.Hash, force bool) error {
4✔
234
                        return i.cancelInvoiceImpl(
2✔
235
                                context.Background(), hash, force,
2✔
236
                        )
2✔
237
                })
2✔
238
        if err != nil {
2✔
239
                return err
×
240
        }
×
241

242
        i.wg.Add(1)
2✔
243
        go i.invoiceEventLoop()
2✔
244

2✔
245
        // Now scan all pending and removable invoices to the expiry
2✔
246
        // watcher or delete them.
2✔
247
        err = i.scanInvoicesOnStart(context.Background())
2✔
248
        if err != nil {
2✔
249
                _ = i.Stop()
×
250
        }
×
251

252
        log.Debug("InvoiceRegistry started")
2✔
253

2✔
254
        return err
2✔
255
}
256

257
// Stop signals the registry for a graceful shutdown.
258
func (i *InvoiceRegistry) Stop() error {
2✔
259
        log.Info("InvoiceRegistry shutting down...")
2✔
260

2✔
261
        if i.stopped.Swap(true) {
2✔
262
                return fmt.Errorf("InvoiceRegistry stopped more than once")
×
263
        }
×
264

265
        log.Info("InvoiceRegistry shutting down...")
2✔
266
        defer log.Debug("InvoiceRegistry shutdown complete")
2✔
267

2✔
268
        var err error
2✔
269
        if i.expiryWatcher == nil {
2✔
270
                err = fmt.Errorf("InvoiceRegistry expiryWatcher is not " +
×
271
                        "initialized")
×
272
        } else {
2✔
273
                i.expiryWatcher.Stop()
2✔
274
        }
2✔
275

276
        close(i.quit)
2✔
277

2✔
278
        i.wg.Wait()
2✔
279

2✔
280
        log.Debug("InvoiceRegistry shutdown complete")
2✔
281

2✔
282
        return err
2✔
283
}
284

285
// invoiceEvent represents a new event that has modified on invoice on disk.
286
// Only two event types are currently supported: newly created invoices, and
287
// instance where invoices are settled.
288
type invoiceEvent struct {
289
        hash    lntypes.Hash
290
        invoice *Invoice
291
        setID   *[32]byte
292
}
293

294
// tickAt returns a channel that ticks at the specified time. If the time has
295
// already passed, it will tick immediately.
296
func (i *InvoiceRegistry) tickAt(t time.Time) <-chan time.Time {
2✔
297
        now := i.cfg.Clock.Now()
2✔
298
        return i.cfg.Clock.TickAfter(t.Sub(now))
2✔
299
}
2✔
300

301
// invoiceEventLoop is the dedicated goroutine responsible for accepting
302
// new notification subscriptions, cancelling old subscriptions, and
303
// dispatching new invoice events.
304
func (i *InvoiceRegistry) invoiceEventLoop() {
2✔
305
        defer i.wg.Done()
2✔
306

2✔
307
        // Set up a heap for htlc auto-releases.
2✔
308
        autoReleaseHeap := &queue.PriorityQueue{}
2✔
309

2✔
310
        for {
4✔
311
                // If there is something to release, set up a release tick
2✔
312
                // channel.
2✔
313
                var nextReleaseTick <-chan time.Time
2✔
314
                if autoReleaseHeap.Len() > 0 {
4✔
315
                        head := autoReleaseHeap.Top().(*htlcReleaseEvent)
2✔
316
                        nextReleaseTick = i.tickAt(head.releaseTime)
2✔
317
                }
2✔
318

319
                select {
2✔
320
                // A sub-systems has just modified the invoice state, so we'll
321
                // dispatch notifications to all registered clients.
322
                case event := <-i.invoiceEvents:
2✔
323
                        // For backwards compatibility, do not notify all
2✔
324
                        // invoice subscribers of cancel and accept events.
2✔
325
                        state := event.invoice.State
2✔
326
                        if state != ContractCanceled &&
2✔
327
                                state != ContractAccepted {
4✔
328

2✔
329
                                i.dispatchToClients(event)
2✔
330
                        }
2✔
331
                        i.dispatchToSingleClients(event)
2✔
332

333
                // A new htlc came in for auto-release.
334
                case event := <-i.htlcAutoReleaseChan:
2✔
335
                        log.Debugf("Scheduling auto-release for htlc: "+
2✔
336
                                "ref=%v, key=%v at %v",
2✔
337
                                event.invoiceRef, event.key, event.releaseTime)
2✔
338

2✔
339
                        // We use an independent timer for every htlc rather
2✔
340
                        // than a set timer that is reset with every htlc coming
2✔
341
                        // in. Otherwise the sender could keep resetting the
2✔
342
                        // timer until the broadcast window is entered and our
2✔
343
                        // channel is force closed.
2✔
344
                        autoReleaseHeap.Push(event)
2✔
345

346
                // The htlc at the top of the heap needs to be auto-released.
UNCOV
347
                case <-nextReleaseTick:
×
UNCOV
348
                        event := autoReleaseHeap.Pop().(*htlcReleaseEvent)
×
UNCOV
349
                        err := i.cancelSingleHtlc(
×
UNCOV
350
                                event.invoiceRef, event.key, ResultMppTimeout,
×
UNCOV
351
                        )
×
UNCOV
352
                        if err != nil {
×
353
                                log.Errorf("HTLC timer: %v", err)
×
354
                        }
×
355

356
                case <-i.quit:
2✔
357
                        return
2✔
358
                }
359
        }
360
}
361

362
// dispatchToSingleClients passes the supplied event to all notification
363
// clients that subscribed to all the invoice this event applies to.
364
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
2✔
365
        // Dispatch to single invoice subscribers.
2✔
366
        clients := i.copySingleClients()
2✔
367
        for _, client := range clients {
4✔
368
                payHash := client.invoiceRef.PayHash()
2✔
369

2✔
370
                if payHash == nil || *payHash != event.hash {
4✔
371
                        continue
2✔
372
                }
373

374
                select {
2✔
375
                case <-client.backlogDelivered:
2✔
376
                        // We won't deliver any events until the backlog has
377
                        // went through first.
378
                case <-i.quit:
×
379
                        return
×
380
                }
381

382
                client.notify(event)
2✔
383
        }
384
}
385

386
// dispatchToClients passes the supplied event to all notification clients that
387
// subscribed to all invoices. Add and settle indices are used to make sure
388
// that clients don't receive duplicate or unwanted events.
389
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
2✔
390
        invoice := event.invoice
2✔
391

2✔
392
        clients := i.copyClients()
2✔
393
        for clientID, client := range clients {
4✔
394
                // Before we dispatch this event, we'll check
2✔
395
                // to ensure that this client hasn't already
2✔
396
                // received this notification in order to
2✔
397
                // ensure we don't duplicate any events.
2✔
398

2✔
399
                // TODO(joostjager): Refactor switches.
2✔
400
                state := event.invoice.State
2✔
401
                switch {
2✔
402
                // If we've already sent this settle event to
403
                // the client, then we can skip this.
404
                case state == ContractSettled &&
405
                        client.settleIndex >= invoice.SettleIndex:
×
406
                        continue
×
407

408
                // Similarly, if we've already sent this add to
409
                // the client then we can skip this one, but only if this isn't
410
                // an AMP invoice. AMP invoices always remain in the settle
411
                // state as a base invoice.
412
                case event.setID == nil && state == ContractOpen &&
413
                        client.addIndex >= invoice.AddIndex:
×
414
                        continue
×
415

416
                // These two states should never happen, but we
417
                // log them just in case so we can detect this
418
                // instance.
419
                case state == ContractOpen &&
420
                        client.addIndex+1 != invoice.AddIndex:
2✔
421
                        log.Warnf("client=%v for invoice "+
2✔
422
                                "notifications missed an update, "+
2✔
423
                                "add_index=%v, new add event index=%v",
2✔
424
                                clientID, client.addIndex,
2✔
425
                                invoice.AddIndex)
2✔
426

427
                case state == ContractSettled &&
428
                        client.settleIndex+1 != invoice.SettleIndex:
2✔
429
                        log.Warnf("client=%v for invoice "+
2✔
430
                                "notifications missed an update, "+
2✔
431
                                "settle_index=%v, new settle event index=%v",
2✔
432
                                clientID, client.settleIndex,
2✔
433
                                invoice.SettleIndex)
2✔
434
                }
435

436
                select {
2✔
437
                case <-client.backlogDelivered:
2✔
438
                        // We won't deliver any events until the backlog has
439
                        // been processed.
440
                case <-i.quit:
×
441
                        return
×
442
                }
443

444
                err := client.notify(&invoiceEvent{
2✔
445
                        invoice: invoice,
2✔
446
                        setID:   event.setID,
2✔
447
                })
2✔
448
                if err != nil {
2✔
449
                        log.Errorf("Failed dispatching to client: %v", err)
×
450
                        return
×
451
                }
×
452

453
                // Each time we send a notification to a client, we'll record
454
                // the latest add/settle index it has. We'll use this to ensure
455
                // we don't send a notification twice, which can happen if a new
456
                // event is added while we're catching up a new client.
457
                invState := event.invoice.State
2✔
458
                switch {
2✔
459
                case invState == ContractSettled:
2✔
460
                        client.settleIndex = invoice.SettleIndex
2✔
461

462
                case invState == ContractOpen && event.setID == nil:
2✔
463
                        client.addIndex = invoice.AddIndex
2✔
464

465
                // If this is an AMP invoice, then we'll need to use the set ID
466
                // to keep track of the settle index of the client. AMP
467
                // invoices never go to the open state, but if a setID is
468
                // passed, then we know it was just settled and will track the
469
                // highest settle index so far.
470
                case invState == ContractOpen && event.setID != nil:
2✔
471
                        setID := *event.setID
2✔
472
                        client.settleIndex = invoice.AMPState[setID].SettleIndex
2✔
473

474
                default:
×
475
                        log.Errorf("unexpected invoice state: %v",
×
476
                                event.invoice.State)
×
477
                }
478
        }
479
}
480

481
// deliverBacklogEvents will attempts to query the invoice database for any
482
// notifications that the client has missed since it reconnected last.
483
func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context,
484
        client *InvoiceSubscription) error {
2✔
485

2✔
486
        addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex)
2✔
487
        if err != nil {
2✔
488
                return err
×
489
        }
×
490

491
        settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex)
2✔
492
        if err != nil {
2✔
493
                return err
×
494
        }
×
495

496
        // If we have any to deliver, then we'll append them to the end of the
497
        // notification queue in order to catch up the client before delivering
498
        // any new notifications.
499
        for _, addEvent := range addEvents {
4✔
500
                // We re-bind the loop variable to ensure we don't hold onto
2✔
501
                // the loop reference causing is to point to the same item.
2✔
502
                addEvent := addEvent
2✔
503

2✔
504
                select {
2✔
505
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
506
                        invoice: &addEvent,
507
                }:
2✔
508
                case <-i.quit:
×
509
                        return ErrShuttingDown
×
510
                }
511
        }
512

513
        for _, settleEvent := range settleEvents {
4✔
514
                // We re-bind the loop variable to ensure we don't hold onto
2✔
515
                // the loop reference causing is to point to the same item.
2✔
516
                settleEvent := settleEvent
2✔
517

2✔
518
                select {
2✔
519
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
520
                        invoice: &settleEvent,
521
                }:
2✔
522
                case <-i.quit:
×
523
                        return ErrShuttingDown
×
524
                }
525
        }
526

527
        return nil
2✔
528
}
529

530
// deliverSingleBacklogEvents will attempt to query the invoice database to
531
// retrieve the current invoice state and deliver this to the subscriber. Single
532
// invoice subscribers will always receive the current state right after
533
// subscribing. Only in case the invoice does not yet exist, nothing is sent
534
// yet.
535
func (i *InvoiceRegistry) deliverSingleBacklogEvents(ctx context.Context,
536
        client *SingleInvoiceSubscription) error {
2✔
537

2✔
538
        invoice, err := i.idb.LookupInvoice(ctx, client.invoiceRef)
2✔
539

2✔
540
        // It is possible that the invoice does not exist yet, but the client is
2✔
541
        // already watching it in anticipation.
2✔
542
        isNotFound := errors.Is(err, ErrInvoiceNotFound)
2✔
543
        isNotCreated := errors.Is(err, ErrNoInvoicesCreated)
2✔
544
        if isNotFound || isNotCreated {
4✔
545
                return nil
2✔
546
        }
2✔
547
        if err != nil {
2✔
548
                return err
×
549
        }
×
550

551
        payHash := client.invoiceRef.PayHash()
2✔
552
        if payHash == nil {
2✔
553
                return nil
×
554
        }
×
555

556
        err = client.notify(&invoiceEvent{
2✔
557
                hash:    *payHash,
2✔
558
                invoice: &invoice,
2✔
559
        })
2✔
560
        if err != nil {
2✔
561
                return err
×
562
        }
×
563

564
        log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
2✔
565
                client.id, payHash)
2✔
566

2✔
567
        return nil
2✔
568
}
569

570
// AddInvoice adds a regular invoice for the specified amount, identified by
571
// the passed preimage. Additionally, any memo or receipt data provided will
572
// also be stored on-disk. Once this invoice is added, subsystems within the
573
// daemon add/forward HTLCs are able to obtain the proper preimage required for
574
// redemption in the case that we're the final destination. We also return the
575
// addIndex of the newly created invoice which monotonically increases for each
576
// new invoice added.  A side effect of this function is that it also sets
577
// AddIndex on the invoice argument.
578
func (i *InvoiceRegistry) AddInvoice(ctx context.Context, invoice *Invoice,
579
        paymentHash lntypes.Hash) (uint64, error) {
2✔
580

2✔
581
        i.Lock()
2✔
582

2✔
583
        ref := InvoiceRefByHash(paymentHash)
2✔
584
        log.Debugf("Invoice%v: added with terms %v", ref, invoice.Terms)
2✔
585

2✔
586
        addIndex, err := i.idb.AddInvoice(ctx, invoice, paymentHash)
2✔
587
        if err != nil {
4✔
588
                i.Unlock()
2✔
589
                return 0, err
2✔
590
        }
2✔
591

592
        // Now that we've added the invoice, we'll send dispatch a message to
593
        // notify the clients of this new invoice.
594
        i.notifyClients(paymentHash, invoice, nil)
2✔
595
        i.Unlock()
2✔
596

2✔
597
        // InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry
2✔
598
        // to avoid deadlock when a new invoice is added while an other is being
2✔
599
        // canceled.
2✔
600
        invoiceExpiryRef := makeInvoiceExpiry(paymentHash, invoice)
2✔
601
        if invoiceExpiryRef != nil {
4✔
602
                i.expiryWatcher.AddInvoices(invoiceExpiryRef)
2✔
603
        }
2✔
604

605
        return addIndex, nil
2✔
606
}
607

608
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
609
// then we're able to pull the funds pending within an HTLC.
610
//
611
// TODO(roasbeef): ignore if settled?
612
func (i *InvoiceRegistry) LookupInvoice(ctx context.Context,
613
        rHash lntypes.Hash) (Invoice, error) {
2✔
614

2✔
615
        // We'll check the database to see if there's an existing matching
2✔
616
        // invoice.
2✔
617
        ref := InvoiceRefByHash(rHash)
2✔
618
        return i.idb.LookupInvoice(ctx, ref)
2✔
619
}
2✔
620

621
// LookupInvoiceByRef looks up an invoice by the given reference, if found
622
// then we're able to pull the funds pending within an HTLC.
623
func (i *InvoiceRegistry) LookupInvoiceByRef(ctx context.Context,
624
        ref InvoiceRef) (Invoice, error) {
2✔
625

2✔
626
        return i.idb.LookupInvoice(ctx, ref)
2✔
627
}
2✔
628

629
// startHtlcTimer starts a new timer via the invoice registry main loop that
630
// cancels a single htlc on an invoice when the htlc hold duration has passed.
631
func (i *InvoiceRegistry) startHtlcTimer(invoiceRef InvoiceRef,
632
        key CircuitKey, acceptTime time.Time) error {
2✔
633

2✔
634
        releaseTime := acceptTime.Add(i.cfg.HtlcHoldDuration)
2✔
635
        event := &htlcReleaseEvent{
2✔
636
                invoiceRef:  invoiceRef,
2✔
637
                key:         key,
2✔
638
                releaseTime: releaseTime,
2✔
639
        }
2✔
640

2✔
641
        select {
2✔
642
        case i.htlcAutoReleaseChan <- event:
2✔
643
                return nil
2✔
644

645
        case <-i.quit:
×
646
                return ErrShuttingDown
×
647
        }
648
}
649

650
// cancelSingleHtlc cancels a single accepted htlc on an invoice. It takes
651
// a resolution result which will be used to notify subscribed links and
652
// resolvers of the details of the htlc cancellation.
653
func (i *InvoiceRegistry) cancelSingleHtlc(invoiceRef InvoiceRef,
UNCOV
654
        key CircuitKey, result FailResolutionResult) error {
×
UNCOV
655

×
UNCOV
656
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
×
UNCOV
657
                // Only allow individual htlc cancellation on open invoices.
×
UNCOV
658
                if invoice.State != ContractOpen {
×
UNCOV
659
                        log.Debugf("cancelSingleHtlc: invoice %v no longer "+
×
UNCOV
660
                                "open", invoiceRef)
×
UNCOV
661

×
UNCOV
662
                        return nil, nil
×
UNCOV
663
                }
×
664

665
                // Lookup the current status of the htlc in the database.
UNCOV
666
                var (
×
UNCOV
667
                        htlcState HtlcState
×
UNCOV
668
                        setID     *SetID
×
UNCOV
669
                )
×
UNCOV
670
                htlc, ok := invoice.Htlcs[key]
×
UNCOV
671
                if !ok {
×
672
                        // If this is an AMP invoice, then all the HTLCs won't
×
673
                        // be read out, so we'll consult the other mapping to
×
674
                        // try to find the HTLC state in question here.
×
675
                        var found bool
×
676
                        for ampSetID, htlcSet := range invoice.AMPState {
×
677
                                ampSetID := ampSetID
×
678
                                for htlcKey := range htlcSet.InvoiceKeys {
×
679
                                        if htlcKey == key {
×
680
                                                htlcState = htlcSet.State
×
681
                                                setID = &ampSetID
×
682

×
683
                                                found = true
×
684
                                                break
×
685
                                        }
686
                                }
687
                        }
688

689
                        if !found {
×
690
                                return nil, fmt.Errorf("htlc %v not found", key)
×
691
                        }
×
UNCOV
692
                } else {
×
UNCOV
693
                        htlcState = htlc.State
×
UNCOV
694
                }
×
695

696
                // Cancellation is only possible if the htlc wasn't already
697
                // resolved.
UNCOV
698
                if htlcState != HtlcStateAccepted {
×
699
                        log.Debugf("cancelSingleHtlc: htlc %v on invoice %v "+
×
700
                                "is already resolved", key, invoiceRef)
×
701

×
702
                        return nil, nil
×
703
                }
×
704

UNCOV
705
                log.Debugf("cancelSingleHtlc: cancelling htlc %v on invoice %v",
×
UNCOV
706
                        key, invoiceRef)
×
UNCOV
707

×
UNCOV
708
                // Return an update descriptor that cancels htlc and keeps
×
UNCOV
709
                // invoice open.
×
UNCOV
710
                canceledHtlcs := map[CircuitKey]struct{}{
×
UNCOV
711
                        key: {},
×
UNCOV
712
                }
×
UNCOV
713

×
UNCOV
714
                return &InvoiceUpdateDesc{
×
UNCOV
715
                        UpdateType:  CancelHTLCsUpdate,
×
UNCOV
716
                        CancelHtlcs: canceledHtlcs,
×
UNCOV
717
                        SetID:       setID,
×
UNCOV
718
                }, nil
×
719
        }
720

721
        // Try to mark the specified htlc as canceled in the invoice database.
722
        // Intercept the update descriptor to set the local updated variable. If
723
        // no invoice update is performed, we can return early.
UNCOV
724
        setID := (*SetID)(invoiceRef.SetID())
×
NEW
725
        // If the setID is nil, we make sure we do NOT fetch any AMP HTLCs.
×
NEW
726
        if setID == nil {
×
NEW
727
                set := SetID(BlankPayAddr)
×
NEW
728
                setID = &set
×
NEW
729
        }
×
730

UNCOV
731
        var updated bool
×
UNCOV
732
        invoice, err := i.idb.UpdateInvoice(
×
UNCOV
733
                context.Background(), invoiceRef, setID,
×
UNCOV
734
                func(invoice *Invoice) (
×
UNCOV
735
                        *InvoiceUpdateDesc, error) {
×
UNCOV
736

×
UNCOV
737
                        updateDesc, err := updateInvoice(invoice)
×
UNCOV
738
                        if err != nil {
×
739
                                return nil, err
×
740
                        }
×
UNCOV
741
                        updated = updateDesc != nil
×
UNCOV
742

×
UNCOV
743
                        return updateDesc, err
×
744
                },
745
        )
UNCOV
746
        if err != nil {
×
747
                return err
×
748
        }
×
UNCOV
749
        if !updated {
×
UNCOV
750
                return nil
×
UNCOV
751
        }
×
752

753
        // The invoice has been updated. Notify subscribers of the htlc
754
        // resolution.
UNCOV
755
        htlc, ok := invoice.Htlcs[key]
×
UNCOV
756
        if !ok {
×
757
                return fmt.Errorf("htlc %v not found", key)
×
758
        }
×
UNCOV
759
        if htlc.State == HtlcStateCanceled {
×
UNCOV
760
                resolution := NewFailResolution(
×
UNCOV
761
                        key, int32(htlc.AcceptHeight), result,
×
UNCOV
762
                )
×
UNCOV
763

×
UNCOV
764
                i.notifyHodlSubscribers(resolution)
×
UNCOV
765
        }
×
UNCOV
766
        return nil
×
767
}
768

769
// processKeySend just-in-time inserts an invoice if this htlc is a keysend
770
// htlc.
771
func (i *InvoiceRegistry) processKeySend(ctx invoiceUpdateCtx) error {
2✔
772
        // Retrieve keysend record if present.
2✔
773
        preimageSlice, ok := ctx.customRecords[record.KeySendType]
2✔
774
        if !ok {
4✔
775
                return nil
2✔
776
        }
2✔
777

778
        // Cancel htlc is preimage is invalid.
779
        preimage, err := lntypes.MakePreimage(preimageSlice)
2✔
780
        if err != nil {
2✔
UNCOV
781
                return err
×
UNCOV
782
        }
×
783
        if preimage.Hash() != ctx.hash {
2✔
784
                return fmt.Errorf("invalid keysend preimage %v for hash %v",
×
785
                        preimage, ctx.hash)
×
786
        }
×
787

788
        // Only allow keysend for non-mpp payments.
789
        if ctx.mpp != nil {
2✔
790
                return errors.New("no mpp keysend supported")
×
791
        }
×
792

793
        // Create an invoice for the htlc amount.
794
        amt := ctx.amtPaid
2✔
795

2✔
796
        // Set tlv required feature vector on the invoice. Otherwise we wouldn't
2✔
797
        // be able to pay to it with keysend.
2✔
798
        rawFeatures := lnwire.NewRawFeatureVector(
2✔
799
                lnwire.TLVOnionPayloadRequired,
2✔
800
        )
2✔
801
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
2✔
802

2✔
803
        // Use the minimum block delta that we require for settling htlcs.
2✔
804
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
2✔
805

2✔
806
        // Pre-check expiry here to prevent inserting an invoice that will not
2✔
807
        // be settled.
2✔
808
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
2✔
809
                return errors.New("final expiry too soon")
×
810
        }
×
811

812
        // The invoice database indexes all invoices by payment address, however
813
        // legacy keysend payment do not have one. In order to avoid a new
814
        // payment type on-disk wrt. to indexing, we'll continue to insert a
815
        // blank payment address which is special cased in the insertion logic
816
        // to not be indexed. In the future, once AMP is merged, this should be
817
        // replaced by generating a random payment address on the behalf of the
818
        // sender.
819
        payAddr := BlankPayAddr
2✔
820

2✔
821
        // Create placeholder invoice.
2✔
822
        invoice := &Invoice{
2✔
823
                CreationDate: i.cfg.Clock.Now(),
2✔
824
                Terms: ContractTerm{
2✔
825
                        FinalCltvDelta:  finalCltvDelta,
2✔
826
                        Value:           amt,
2✔
827
                        PaymentPreimage: &preimage,
2✔
828
                        PaymentAddr:     payAddr,
2✔
829
                        Features:        features,
2✔
830
                },
2✔
831
        }
2✔
832

2✔
833
        if i.cfg.KeysendHoldTime != 0 {
2✔
UNCOV
834
                invoice.HodlInvoice = true
×
UNCOV
835
                invoice.Terms.Expiry = i.cfg.KeysendHoldTime
×
UNCOV
836
        }
×
837

838
        // Insert invoice into database. Ignore duplicates, because this
839
        // may be a replay.
840
        _, err = i.AddInvoice(context.Background(), invoice, ctx.hash)
2✔
841
        if err != nil && !errors.Is(err, ErrDuplicateInvoice) {
2✔
842
                return err
×
843
        }
×
844

845
        return nil
2✔
846
}
847

848
// processAMP just-in-time inserts an invoice if this htlc is a keysend
849
// htlc.
850
func (i *InvoiceRegistry) processAMP(ctx invoiceUpdateCtx) error {
2✔
851
        // AMP payments MUST also include an MPP record.
2✔
852
        if ctx.mpp == nil {
2✔
UNCOV
853
                return errors.New("no MPP record for AMP")
×
UNCOV
854
        }
×
855

856
        // Create an invoice for the total amount expected, provided in the MPP
857
        // record.
858
        amt := ctx.mpp.TotalMsat()
2✔
859

2✔
860
        // Set the TLV required and MPP optional features on the invoice. We'll
2✔
861
        // also make the AMP features required so that it can't be paid by
2✔
862
        // legacy or MPP htlcs.
2✔
863
        rawFeatures := lnwire.NewRawFeatureVector(
2✔
864
                lnwire.TLVOnionPayloadRequired,
2✔
865
                lnwire.PaymentAddrOptional,
2✔
866
                lnwire.AMPRequired,
2✔
867
        )
2✔
868
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
2✔
869

2✔
870
        // Use the minimum block delta that we require for settling htlcs.
2✔
871
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
2✔
872

2✔
873
        // Pre-check expiry here to prevent inserting an invoice that will not
2✔
874
        // be settled.
2✔
875
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
2✔
876
                return errors.New("final expiry too soon")
×
877
        }
×
878

879
        // We'll use the sender-generated payment address provided in the HTLC
880
        // to create our AMP invoice.
881
        payAddr := ctx.mpp.PaymentAddr()
2✔
882

2✔
883
        // Create placeholder invoice.
2✔
884
        invoice := &Invoice{
2✔
885
                CreationDate: i.cfg.Clock.Now(),
2✔
886
                Terms: ContractTerm{
2✔
887
                        FinalCltvDelta:  finalCltvDelta,
2✔
888
                        Value:           amt,
2✔
889
                        PaymentPreimage: nil,
2✔
890
                        PaymentAddr:     payAddr,
2✔
891
                        Features:        features,
2✔
892
                },
2✔
893
        }
2✔
894

2✔
895
        // Insert invoice into database. Ignore duplicates payment hashes and
2✔
896
        // payment addrs, this may be a replay or a different HTLC for the AMP
2✔
897
        // invoice.
2✔
898
        _, err := i.AddInvoice(context.Background(), invoice, ctx.hash)
2✔
899
        isDuplicatedInvoice := errors.Is(err, ErrDuplicateInvoice)
2✔
900
        isDuplicatedPayAddr := errors.Is(err, ErrDuplicatePayAddr)
2✔
901
        switch {
2✔
902
        case isDuplicatedInvoice || isDuplicatedPayAddr:
2✔
903
                return nil
2✔
904
        default:
2✔
905
                return err
2✔
906
        }
907
}
908

909
// NotifyExitHopHtlc attempts to mark an invoice as settled. The return value
910
// describes how the htlc should be resolved.
911
//
912
// When the preimage of the invoice is not yet known (hodl invoice), this
913
// function moves the invoice to the accepted state. When SettleHoldInvoice is
914
// called later, a resolution message will be send back to the caller via the
915
// provided hodlChan. Invoice registry sends on this channel what action needs
916
// to be taken on the htlc (settle or cancel). The caller needs to ensure that
917
// the channel is either buffered or received on from another goroutine to
918
// prevent deadlock.
919
//
920
// In the case that the htlc is part of a larger set of htlcs that pay to the
921
// same invoice (multi-path payment), the htlc is held until the set is
922
// complete. If the set doesn't fully arrive in time, a timer will cancel the
923
// held htlc.
924
func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
925
        amtPaid lnwire.MilliSatoshi, expiry uint32, currentHeight int32,
926
        circuitKey CircuitKey, hodlChan chan<- interface{},
927
        wireCustomRecords lnwire.CustomRecords,
928
        payload Payload) (HtlcResolution, error) {
2✔
929

2✔
930
        // Create the update context containing the relevant details of the
2✔
931
        // incoming htlc.
2✔
932
        ctx := invoiceUpdateCtx{
2✔
933
                hash:                 rHash,
2✔
934
                circuitKey:           circuitKey,
2✔
935
                amtPaid:              amtPaid,
2✔
936
                expiry:               expiry,
2✔
937
                currentHeight:        currentHeight,
2✔
938
                finalCltvRejectDelta: i.cfg.FinalCltvRejectDelta,
2✔
939
                wireCustomRecords:    wireCustomRecords,
2✔
940
                customRecords:        payload.CustomRecords(),
2✔
941
                mpp:                  payload.MultiPath(),
2✔
942
                amp:                  payload.AMPRecord(),
2✔
943
                metadata:             payload.Metadata(),
2✔
944
                pathID:               payload.PathID(),
2✔
945
                totalAmtMsat:         payload.TotalAmtMsat(),
2✔
946
        }
2✔
947

2✔
948
        switch {
2✔
949
        // If we are accepting spontaneous AMP payments and this payload
950
        // contains an AMP record, create an AMP invoice that will be settled
951
        // below.
952
        case i.cfg.AcceptAMP && ctx.amp != nil:
2✔
953
                err := i.processAMP(ctx)
2✔
954
                if err != nil {
2✔
UNCOV
955
                        ctx.log(fmt.Sprintf("amp error: %v", err))
×
UNCOV
956

×
UNCOV
957
                        return NewFailResolution(
×
UNCOV
958
                                circuitKey, currentHeight, ResultAmpError,
×
UNCOV
959
                        ), nil
×
UNCOV
960
                }
×
961

962
        // If we are accepting spontaneous keysend payments, create a regular
963
        // invoice that will be settled below. We also enforce that this is only
964
        // done when no AMP payload is present since it will only be settle-able
965
        // by regular HTLCs.
966
        case i.cfg.AcceptKeySend && ctx.amp == nil:
2✔
967
                err := i.processKeySend(ctx)
2✔
968
                if err != nil {
2✔
UNCOV
969
                        ctx.log(fmt.Sprintf("keysend error: %v", err))
×
UNCOV
970

×
UNCOV
971
                        return NewFailResolution(
×
UNCOV
972
                                circuitKey, currentHeight, ResultKeySendError,
×
UNCOV
973
                        ), nil
×
UNCOV
974
                }
×
975
        }
976

977
        // Execute locked notify exit hop logic.
978
        i.Lock()
2✔
979
        resolution, invoiceToExpire, err := i.notifyExitHopHtlcLocked(
2✔
980
                &ctx, hodlChan,
2✔
981
        )
2✔
982
        i.Unlock()
2✔
983
        if err != nil {
4✔
984
                return nil, err
2✔
985
        }
2✔
986

987
        if invoiceToExpire != nil {
4✔
988
                i.expiryWatcher.AddInvoices(invoiceToExpire)
2✔
989
        }
2✔
990

991
        switch r := resolution.(type) {
2✔
992
        // The htlc is held. Start a timer outside the lock if the htlc should
993
        // be auto-released, because otherwise a deadlock may happen with the
994
        // main event loop.
995
        case *htlcAcceptResolution:
2✔
996
                if r.autoRelease {
4✔
997
                        var invRef InvoiceRef
2✔
998
                        if ctx.amp != nil {
4✔
999
                                invRef = InvoiceRefBySetID(*ctx.setID())
2✔
1000
                        } else {
4✔
1001
                                invRef = ctx.invoiceRef()
2✔
1002
                        }
2✔
1003

1004
                        err := i.startHtlcTimer(
2✔
1005
                                invRef, circuitKey, r.acceptTime,
2✔
1006
                        )
2✔
1007
                        if err != nil {
2✔
1008
                                return nil, err
×
1009
                        }
×
1010
                }
1011

1012
                // We return a nil resolution because htlc acceptances are
1013
                // represented as nil resolutions externally.
1014
                // TODO(carla) update calling code to handle accept resolutions.
1015
                return nil, nil
2✔
1016

1017
        // A direct resolution was received for this htlc.
1018
        case HtlcResolution:
2✔
1019
                return r, nil
2✔
1020

1021
        // Fail if an unknown resolution type was received.
1022
        default:
×
1023
                return nil, errors.New("invalid resolution type")
×
1024
        }
1025
}
1026

1027
// notifyExitHopHtlcLocked is the internal implementation of NotifyExitHopHtlc
1028
// that should be executed inside the registry lock. The returned invoiceExpiry
1029
// (if not nil) needs to be added to the expiry watcher outside of the lock.
1030
func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
1031
        ctx *invoiceUpdateCtx, hodlChan chan<- interface{}) (
1032
        HtlcResolution, invoiceExpiry, error) {
2✔
1033

2✔
1034
        invoiceRef := ctx.invoiceRef()
2✔
1035

2✔
1036
        // If the setID is nil, we make sure we do NOT fetch any AMP HTLCs.
2✔
1037
        setID := (*SetID)(ctx.setID())
2✔
1038
        if setID == nil {
4✔
1039
                set := SetID(BlankPayAddr)
2✔
1040
                setID = &set
2✔
1041
        }
2✔
1042

1043
        // We need to look up the current state of the invoice in order to send
1044
        // the previously accepted/settled HTLCs to the interceptor.
1045
        existingInvoice, err := i.idb.LookupInvoice(
2✔
1046
                context.Background(), invoiceRef,
2✔
1047
        )
2✔
1048
        switch {
2✔
1049
        case errors.Is(err, ErrInvoiceNotFound) ||
1050
                errors.Is(err, ErrNoInvoicesCreated):
2✔
1051

2✔
1052
                // If the invoice was not found, return a failure resolution
2✔
1053
                // with an invoice not found result.
2✔
1054
                return NewFailResolution(
2✔
1055
                        ctx.circuitKey, ctx.currentHeight,
2✔
1056
                        ResultInvoiceNotFound,
2✔
1057
                ), nil, nil
2✔
1058

1059
        case err != nil:
×
1060
                ctx.log(err.Error())
×
1061
                return nil, nil, err
×
1062
        }
1063

1064
        var cancelSet bool
2✔
1065

2✔
1066
        // Provide the invoice to the settlement interceptor to allow
2✔
1067
        // the interceptor's client an opportunity to manipulate the
2✔
1068
        // settlement process.
2✔
1069
        err = i.cfg.HtlcInterceptor.Intercept(HtlcModifyRequest{
2✔
1070
                WireCustomRecords:  ctx.wireCustomRecords,
2✔
1071
                ExitHtlcCircuitKey: ctx.circuitKey,
2✔
1072
                ExitHtlcAmt:        ctx.amtPaid,
2✔
1073
                ExitHtlcExpiry:     ctx.expiry,
2✔
1074
                CurrentHeight:      uint32(ctx.currentHeight),
2✔
1075
                Invoice:            existingInvoice,
2✔
1076
        }, func(resp HtlcModifyResponse) {
4✔
1077
                log.Debugf("Received invoice HTLC interceptor response: %v",
2✔
1078
                        resp)
2✔
1079

2✔
1080
                if resp.AmountPaid != 0 {
4✔
1081
                        ctx.amtPaid = resp.AmountPaid
2✔
1082
                }
2✔
1083

1084
                cancelSet = resp.CancelSet
2✔
1085
        })
1086
        if err != nil {
4✔
1087
                err := fmt.Errorf("error during invoice HTLC interception: %w",
2✔
1088
                        err)
2✔
1089
                ctx.log(err.Error())
2✔
1090

2✔
1091
                return nil, nil, err
2✔
1092
        }
2✔
1093

1094
        // We'll attempt to settle an invoice matching this rHash on disk (if
1095
        // one exists). The callback will update the invoice state and/or htlcs.
1096
        var (
2✔
1097
                resolution        HtlcResolution
2✔
1098
                updateSubscribers bool
2✔
1099
        )
2✔
1100
        callback := func(inv *Invoice) (*InvoiceUpdateDesc, error) {
4✔
1101
                updateDesc, res, err := updateInvoice(ctx, inv)
2✔
1102
                if err != nil {
2✔
1103
                        return nil, err
×
1104
                }
×
1105

1106
                // Only send an update if the invoice state was changed.
1107
                updateSubscribers = updateDesc != nil &&
2✔
1108
                        updateDesc.State != nil
2✔
1109

2✔
1110
                // Assign resolution to outer scope variable.
2✔
1111
                if cancelSet {
2✔
1112
                        // If a cancel signal was set for the htlc set, we set
×
1113
                        // the resolution as a failure with an underpayment
×
1114
                        // indication. Something was wrong with this htlc, so
×
1115
                        // we probably can't settle the invoice at all.
×
1116
                        resolution = NewFailResolution(
×
1117
                                ctx.circuitKey, ctx.currentHeight,
×
1118
                                ResultAmountTooLow,
×
1119
                        )
×
1120
                } else {
2✔
1121
                        resolution = res
2✔
1122
                }
2✔
1123

1124
                return updateDesc, nil
2✔
1125
        }
1126

1127
        invoice, err := i.idb.UpdateInvoice(
2✔
1128
                context.Background(), invoiceRef, setID, callback,
2✔
1129
        )
2✔
1130

2✔
1131
        var duplicateSetIDErr ErrDuplicateSetID
2✔
1132
        if errors.As(err, &duplicateSetIDErr) {
2✔
1133
                return NewFailResolution(
×
1134
                        ctx.circuitKey, ctx.currentHeight,
×
1135
                        ResultInvoiceNotFound,
×
1136
                ), nil, nil
×
1137
        }
×
1138

1139
        switch {
2✔
1140
        case errors.Is(err, ErrInvoiceNotFound):
×
1141
                // If the invoice was not found, return a failure resolution
×
1142
                // with an invoice not found result.
×
1143
                return NewFailResolution(
×
1144
                        ctx.circuitKey, ctx.currentHeight,
×
1145
                        ResultInvoiceNotFound,
×
1146
                ), nil, nil
×
1147

1148
        case errors.Is(err, ErrInvRefEquivocation):
×
1149
                return NewFailResolution(
×
1150
                        ctx.circuitKey, ctx.currentHeight,
×
1151
                        ResultInvoiceNotFound,
×
1152
                ), nil, nil
×
1153

1154
        case err == nil:
2✔
1155

1156
        default:
×
1157
                ctx.log(err.Error())
×
1158
                return nil, nil, err
×
1159
        }
1160

1161
        var invoiceToExpire invoiceExpiry
2✔
1162

2✔
1163
        log.Tracef("Settlement resolution: %T %v", resolution, resolution)
2✔
1164

2✔
1165
        switch res := resolution.(type) {
2✔
1166
        case *HtlcFailResolution:
2✔
1167
                // Inspect latest htlc state on the invoice. If it is found,
2✔
1168
                // we will update the accept height as it was recorded in the
2✔
1169
                // invoice database (which occurs in the case where the htlc
2✔
1170
                // reached the database in a previous call). If the htlc was
2✔
1171
                // not found on the invoice, it was immediately failed so we
2✔
1172
                // send the failure resolution as is, which has the current
2✔
1173
                // height set as the accept height.
2✔
1174
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
2✔
1175
                if ok {
4✔
1176
                        res.AcceptHeight = int32(invoiceHtlc.AcceptHeight)
2✔
1177
                }
2✔
1178

1179
                ctx.log(fmt.Sprintf("failure resolution result "+
2✔
1180
                        "outcome: %v, at accept height: %v",
2✔
1181
                        res.Outcome, res.AcceptHeight))
2✔
1182

2✔
1183
                // Some failures apply to the entire HTLC set. Break here if
2✔
1184
                // this isn't one of them.
2✔
1185
                if !res.Outcome.IsSetFailure() {
4✔
1186
                        break
2✔
1187
                }
1188

1189
                // Also cancel any HTLCs in the HTLC set that are also in the
1190
                // canceled state with the same failure result.
UNCOV
1191
                setID := ctx.setID()
×
UNCOV
1192
                canceledHtlcSet := invoice.HTLCSet(setID, HtlcStateCanceled)
×
UNCOV
1193
                for key, htlc := range canceledHtlcSet {
×
UNCOV
1194
                        htlcFailResolution := NewFailResolution(
×
UNCOV
1195
                                key, int32(htlc.AcceptHeight), res.Outcome,
×
UNCOV
1196
                        )
×
UNCOV
1197

×
UNCOV
1198
                        i.notifyHodlSubscribers(htlcFailResolution)
×
UNCOV
1199
                }
×
1200

1201
        // If the htlc was settled, we will settle any previously accepted
1202
        // htlcs and notify our peer to settle them.
1203
        case *HtlcSettleResolution:
2✔
1204
                ctx.log(fmt.Sprintf("settle resolution result "+
2✔
1205
                        "outcome: %v, at accept height: %v",
2✔
1206
                        res.Outcome, res.AcceptHeight))
2✔
1207

2✔
1208
                // Also settle any previously accepted htlcs. If a htlc is
2✔
1209
                // marked as settled, we should follow now and settle the htlc
2✔
1210
                // with our peer.
2✔
1211
                setID := ctx.setID()
2✔
1212
                settledHtlcSet := invoice.HTLCSet(setID, HtlcStateSettled)
2✔
1213
                for key, htlc := range settledHtlcSet {
4✔
1214
                        preimage := res.Preimage
2✔
1215
                        if htlc.AMP != nil && htlc.AMP.Preimage != nil {
4✔
1216
                                preimage = *htlc.AMP.Preimage
2✔
1217
                        }
2✔
1218

1219
                        // Notify subscribers that the htlcs should be settled
1220
                        // with our peer. Note that the outcome of the
1221
                        // resolution is set based on the outcome of the single
1222
                        // htlc that we just settled, so may not be accurate
1223
                        // for all htlcs.
1224
                        htlcSettleResolution := NewSettleResolution(
2✔
1225
                                preimage, key,
2✔
1226
                                int32(htlc.AcceptHeight), res.Outcome,
2✔
1227
                        )
2✔
1228

2✔
1229
                        // Notify subscribers that the htlc should be settled
2✔
1230
                        // with our peer.
2✔
1231
                        i.notifyHodlSubscribers(htlcSettleResolution)
2✔
1232
                }
1233

1234
                // If concurrent payments were attempted to this invoice before
1235
                // the current one was ultimately settled, cancel back any of
1236
                // the HTLCs immediately. As a result of the settle, the HTLCs
1237
                // in other HTLC sets are automatically converted to a canceled
1238
                // state when updating the invoice.
1239
                //
1240
                // TODO(roasbeef): can remove now??
1241
                canceledHtlcSet := invoice.HTLCSetCompliment(
2✔
1242
                        setID, HtlcStateCanceled,
2✔
1243
                )
2✔
1244
                for key, htlc := range canceledHtlcSet {
2✔
1245
                        htlcFailResolution := NewFailResolution(
×
1246
                                key, int32(htlc.AcceptHeight),
×
1247
                                ResultInvoiceAlreadySettled,
×
1248
                        )
×
1249

×
1250
                        i.notifyHodlSubscribers(htlcFailResolution)
×
1251
                }
×
1252

1253
        // If we accepted the htlc, subscribe to the hodl invoice and return
1254
        // an accept resolution with the htlc's accept time on it.
1255
        case *htlcAcceptResolution:
2✔
1256
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
2✔
1257
                if !ok {
2✔
1258
                        return nil, nil, fmt.Errorf("accepted htlc: %v not"+
×
1259
                                " present on invoice: %x", ctx.circuitKey,
×
1260
                                ctx.hash[:])
×
1261
                }
×
1262

1263
                // Determine accepted height of this htlc. If the htlc reached
1264
                // the invoice database (possibly in a previous call to the
1265
                // invoice registry), we'll take the original accepted height
1266
                // as it was recorded in the database.
1267
                acceptHeight := int32(invoiceHtlc.AcceptHeight)
2✔
1268

2✔
1269
                ctx.log(fmt.Sprintf("accept resolution result "+
2✔
1270
                        "outcome: %v, at accept height: %v",
2✔
1271
                        res.outcome, acceptHeight))
2✔
1272

2✔
1273
                // Auto-release the htlc if the invoice is still open. It can
2✔
1274
                // only happen for mpp payments that there are htlcs in state
2✔
1275
                // Accepted while the invoice is Open.
2✔
1276
                if invoice.State == ContractOpen {
4✔
1277
                        res.acceptTime = invoiceHtlc.AcceptTime
2✔
1278
                        res.autoRelease = true
2✔
1279
                }
2✔
1280

1281
                // If we have fully accepted the set of htlcs for this invoice,
1282
                // we can now add it to our invoice expiry watcher. We do not
1283
                // add invoices before they are fully accepted, because it is
1284
                // possible that we MppTimeout the htlcs, and then our relevant
1285
                // expiry height could change.
1286
                if res.outcome == resultAccepted {
4✔
1287
                        invoiceToExpire = makeInvoiceExpiry(ctx.hash, invoice)
2✔
1288
                }
2✔
1289

1290
                // Subscribe to the resolution if the caller specified a
1291
                // notification channel.
1292
                if hodlChan != nil {
4✔
1293
                        i.hodlSubscribe(hodlChan, ctx.circuitKey)
2✔
1294
                }
2✔
1295

1296
        default:
×
1297
                panic("unknown action")
×
1298
        }
1299

1300
        // Now that the links have been notified of any state changes to their
1301
        // HTLCs, we'll go ahead and notify any clients waiting on the invoice
1302
        // state changes.
1303
        if updateSubscribers {
4✔
1304
                // We'll add a setID onto the notification, but only if this is
2✔
1305
                // an AMP invoice being settled.
2✔
1306
                var setID *[32]byte
2✔
1307
                if _, ok := resolution.(*HtlcSettleResolution); ok {
4✔
1308
                        setID = ctx.setID()
2✔
1309
                }
2✔
1310

1311
                i.notifyClients(ctx.hash, invoice, setID)
2✔
1312
        }
1313

1314
        return resolution, invoiceToExpire, nil
2✔
1315
}
1316

1317
// SettleHodlInvoice sets the preimage of a hodl invoice.
1318
func (i *InvoiceRegistry) SettleHodlInvoice(ctx context.Context,
1319
        preimage lntypes.Preimage) error {
2✔
1320

2✔
1321
        i.Lock()
2✔
1322
        defer i.Unlock()
2✔
1323

2✔
1324
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
4✔
1325
                switch invoice.State {
2✔
1326
                case ContractOpen:
×
1327
                        return nil, ErrInvoiceStillOpen
×
1328

1329
                case ContractCanceled:
×
1330
                        return nil, ErrInvoiceAlreadyCanceled
×
1331

UNCOV
1332
                case ContractSettled:
×
UNCOV
1333
                        return nil, ErrInvoiceAlreadySettled
×
1334
                }
1335

1336
                return &InvoiceUpdateDesc{
2✔
1337
                        UpdateType: SettleHodlInvoiceUpdate,
2✔
1338
                        State: &InvoiceStateUpdateDesc{
2✔
1339
                                NewState: ContractSettled,
2✔
1340
                                Preimage: &preimage,
2✔
1341
                        },
2✔
1342
                }, nil
2✔
1343
        }
1344

1345
        hash := preimage.Hash()
2✔
1346
        invoiceRef := InvoiceRefByHash(hash)
2✔
1347

2✔
1348
        // AMP hold invoices are not supported however if we would support them
2✔
1349
        // we would need to fetch all AMP HTLCs here for the invoice.
2✔
1350
        setID := (*SetID)(nil)
2✔
1351

2✔
1352
        invoice, err := i.idb.UpdateInvoice(
2✔
1353
                ctx, invoiceRef, setID, updateInvoice,
2✔
1354
        )
2✔
1355
        if err != nil {
2✔
UNCOV
1356
                log.Errorf("SettleHodlInvoice with preimage %v: %v",
×
UNCOV
1357
                        preimage, err)
×
UNCOV
1358

×
UNCOV
1359
                return err
×
UNCOV
1360
        }
×
1361

1362
        log.Debugf("Invoice%v: settled with preimage %v", invoiceRef,
2✔
1363
                invoice.Terms.PaymentPreimage)
2✔
1364

2✔
1365
        // In the callback, we marked the invoice as settled. UpdateInvoice will
2✔
1366
        // have seen this and should have moved all htlcs that were accepted to
2✔
1367
        // the settled state. In the loop below, we go through all of these and
2✔
1368
        // notify links and resolvers that are waiting for resolution. Any htlcs
2✔
1369
        // that were already settled before, will be notified again. This isn't
2✔
1370
        // necessary but doesn't hurt either.
2✔
1371
        for key, htlc := range invoice.Htlcs {
4✔
1372
                if htlc.State != HtlcStateSettled {
2✔
1373
                        continue
×
1374
                }
1375

1376
                resolution := NewSettleResolution(
2✔
1377
                        preimage, key, int32(htlc.AcceptHeight), ResultSettled,
2✔
1378
                )
2✔
1379

2✔
1380
                i.notifyHodlSubscribers(resolution)
2✔
1381
        }
1382
        i.notifyClients(hash, invoice, nil)
2✔
1383

2✔
1384
        return nil
2✔
1385
}
1386

1387
// CancelInvoice attempts to cancel the invoice corresponding to the passed
1388
// payment hash.
1389
func (i *InvoiceRegistry) CancelInvoice(ctx context.Context,
1390
        payHash lntypes.Hash) error {
2✔
1391

2✔
1392
        return i.cancelInvoiceImpl(ctx, payHash, true)
2✔
1393
}
2✔
1394

1395
// shouldCancel examines the state of an invoice and whether we want to
1396
// cancel already accepted invoices, taking our force cancel boolean into
1397
// account. This is pulled out into its own function so that tests that mock
1398
// cancelInvoiceImpl can reuse this logic.
1399
func shouldCancel(state ContractState, cancelAccepted bool) bool {
2✔
1400
        if state != ContractAccepted {
4✔
1401
                return true
2✔
1402
        }
2✔
1403

1404
        // If the invoice is accepted, we should only cancel if we want to
1405
        // force cancellation of accepted invoices.
1406
        return cancelAccepted
2✔
1407
}
1408

1409
// cancelInvoice attempts to cancel the invoice corresponding to the passed
1410
// payment hash. Accepted invoices will only be canceled if explicitly
1411
// requested to do so. It notifies subscribing links and resolvers that
1412
// the associated htlcs were canceled if they change state.
1413
func (i *InvoiceRegistry) cancelInvoiceImpl(ctx context.Context,
1414
        payHash lntypes.Hash, cancelAccepted bool) error {
2✔
1415

2✔
1416
        i.Lock()
2✔
1417
        defer i.Unlock()
2✔
1418

2✔
1419
        ref := InvoiceRefByHash(payHash)
2✔
1420
        log.Debugf("Invoice%v: canceling invoice", ref)
2✔
1421

2✔
1422
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
4✔
1423
                if !shouldCancel(invoice.State, cancelAccepted) {
2✔
UNCOV
1424
                        return nil, nil
×
UNCOV
1425
                }
×
1426

1427
                // Move invoice to the canceled state. Rely on validation in
1428
                // channeldb to return an error if the invoice is already
1429
                // settled or canceled.
1430
                return &InvoiceUpdateDesc{
2✔
1431
                        UpdateType: CancelInvoiceUpdate,
2✔
1432
                        State: &InvoiceStateUpdateDesc{
2✔
1433
                                NewState: ContractCanceled,
2✔
1434
                        },
2✔
1435
                }, nil
2✔
1436
        }
1437

1438
        // If it's an AMP invoice we need to fetch all AMP HTLCs here so that
1439
        // we can cancel all of them which are in the accepted state.
1440
        setID := (*SetID)(nil)
2✔
1441
        invoiceRef := InvoiceRefByHash(payHash)
2✔
1442
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, setID, updateInvoice)
2✔
1443

2✔
1444
        // Implement idempotency by returning success if the invoice was already
2✔
1445
        // canceled.
2✔
1446
        if errors.Is(err, ErrInvoiceAlreadyCanceled) {
2✔
UNCOV
1447
                log.Debugf("Invoice%v: already canceled", ref)
×
UNCOV
1448
                return nil
×
UNCOV
1449
        }
×
1450
        if err != nil {
4✔
1451
                return err
2✔
1452
        }
2✔
1453

1454
        // Return without cancellation if the invoice state is ContractAccepted.
1455
        if invoice.State == ContractAccepted {
2✔
UNCOV
1456
                log.Debugf("Invoice%v: remains accepted as cancel wasn't"+
×
UNCOV
1457
                        "explicitly requested.", ref)
×
UNCOV
1458
                return nil
×
UNCOV
1459
        }
×
1460

1461
        log.Debugf("Invoice%v: canceled", ref)
2✔
1462

2✔
1463
        // In the callback, some htlcs may have been moved to the canceled
2✔
1464
        // state. We now go through all of these and notify links and resolvers
2✔
1465
        // that are waiting for resolution. Any htlcs that were already canceled
2✔
1466
        // before, will be notified again. This isn't necessary but doesn't hurt
2✔
1467
        // either.
2✔
1468
        // For AMP invoices we fetched all AMP HTLCs for all sub AMP invoices
2✔
1469
        // here so we can clean up all of them.
2✔
1470
        for key, htlc := range invoice.Htlcs {
4✔
1471
                if htlc.State != HtlcStateCanceled {
2✔
1472
                        continue
×
1473
                }
1474

1475
                i.notifyHodlSubscribers(
2✔
1476
                        NewFailResolution(
2✔
1477
                                key, int32(htlc.AcceptHeight), ResultCanceled,
2✔
1478
                        ),
2✔
1479
                )
2✔
1480
        }
1481

1482
        i.notifyClients(payHash, invoice, nil)
2✔
1483

2✔
1484
        // Attempt to also delete the invoice if requested through the registry
2✔
1485
        // config.
2✔
1486
        if i.cfg.GcCanceledInvoicesOnTheFly {
2✔
UNCOV
1487
                // Assemble the delete reference and attempt to delete through
×
UNCOV
1488
                // the invocice from the DB.
×
UNCOV
1489
                deleteRef := InvoiceDeleteRef{
×
UNCOV
1490
                        PayHash:     payHash,
×
UNCOV
1491
                        AddIndex:    invoice.AddIndex,
×
UNCOV
1492
                        SettleIndex: invoice.SettleIndex,
×
UNCOV
1493
                }
×
UNCOV
1494
                if invoice.Terms.PaymentAddr != BlankPayAddr {
×
1495
                        deleteRef.PayAddr = &invoice.Terms.PaymentAddr
×
1496
                }
×
1497

UNCOV
1498
                err = i.idb.DeleteInvoice(ctx, []InvoiceDeleteRef{deleteRef})
×
UNCOV
1499
                // If by any chance deletion failed, then log it instead of
×
UNCOV
1500
                // returning the error, as the invoice itself has already been
×
UNCOV
1501
                // canceled.
×
UNCOV
1502
                if err != nil {
×
1503
                        log.Warnf("Invoice %v could not be deleted: %v", ref,
×
1504
                                err)
×
1505
                }
×
1506
        }
1507

1508
        return nil
2✔
1509
}
1510

1511
// notifyClients notifies all currently registered invoice notification clients
1512
// of a newly added/settled invoice.
1513
func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
1514
        invoice *Invoice, setID *[32]byte) {
2✔
1515

2✔
1516
        event := &invoiceEvent{
2✔
1517
                invoice: invoice,
2✔
1518
                hash:    hash,
2✔
1519
                setID:   setID,
2✔
1520
        }
2✔
1521

2✔
1522
        select {
2✔
1523
        case i.invoiceEvents <- event:
2✔
1524
        case <-i.quit:
×
1525
        }
1526
}
1527

1528
// invoiceSubscriptionKit defines that are common to both all invoice
1529
// subscribers and single invoice subscribers.
1530
type invoiceSubscriptionKit struct {
1531
        id uint32 // nolint:structcheck
1532

1533
        // quit is a chan mouted to InvoiceRegistry that signals a shutdown.
1534
        quit chan struct{}
1535

1536
        ntfnQueue *queue.ConcurrentQueue
1537

1538
        canceled   uint32 // To be used atomically.
1539
        cancelChan chan struct{}
1540

1541
        // backlogDelivered is closed when the backlog events have been
1542
        // delivered.
1543
        backlogDelivered chan struct{}
1544
}
1545

1546
// InvoiceSubscription represents an intent to receive updates for newly added
1547
// or settled invoices. For each newly added invoice, a copy of the invoice
1548
// will be sent over the NewInvoices channel. Similarly, for each newly settled
1549
// invoice, a copy of the invoice will be sent over the SettledInvoices
1550
// channel.
1551
type InvoiceSubscription struct {
1552
        invoiceSubscriptionKit
1553

1554
        // NewInvoices is a channel that we'll use to send all newly created
1555
        // invoices with an invoice index greater than the specified
1556
        // StartingInvoiceIndex field.
1557
        NewInvoices chan *Invoice
1558

1559
        // SettledInvoices is a channel that we'll use to send all settled
1560
        // invoices with an invoices index greater than the specified
1561
        // StartingInvoiceIndex field.
1562
        SettledInvoices chan *Invoice
1563

1564
        // addIndex is the highest add index the caller knows of. We'll use
1565
        // this information to send out an event backlog to the notifications
1566
        // subscriber. Any new add events with an index greater than this will
1567
        // be dispatched before any new notifications are sent out.
1568
        addIndex uint64
1569

1570
        // settleIndex is the highest settle index the caller knows of. We'll
1571
        // use this information to send out an event backlog to the
1572
        // notifications subscriber. Any new settle events with an index
1573
        // greater than this will be dispatched before any new notifications
1574
        // are sent out.
1575
        settleIndex uint64
1576
}
1577

1578
// SingleInvoiceSubscription represents an intent to receive updates for a
1579
// specific invoice.
1580
type SingleInvoiceSubscription struct {
1581
        invoiceSubscriptionKit
1582

1583
        invoiceRef InvoiceRef
1584

1585
        // Updates is a channel that we'll use to send all invoice events for
1586
        // the invoice that is subscribed to.
1587
        Updates chan *Invoice
1588
}
1589

1590
// PayHash returns the optional payment hash of the target invoice.
1591
//
1592
// TODO(positiveblue): This method is only supposed to be used in tests. It will
1593
// be deleted as soon as invoiceregistery_test is in the same module.
UNCOV
1594
func (s *SingleInvoiceSubscription) PayHash() *lntypes.Hash {
×
UNCOV
1595
        return s.invoiceRef.PayHash()
×
UNCOV
1596
}
×
1597

1598
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
1599
// resources.
1600
func (i *invoiceSubscriptionKit) Cancel() {
2✔
1601
        if !atomic.CompareAndSwapUint32(&i.canceled, 0, 1) {
2✔
1602
                return
×
1603
        }
×
1604

1605
        i.ntfnQueue.Stop()
2✔
1606
        close(i.cancelChan)
2✔
1607
}
1608

1609
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
2✔
1610
        select {
2✔
1611
        case i.ntfnQueue.ChanIn() <- event:
2✔
1612

1613
        case <-i.cancelChan:
×
1614
                // This can only be triggered by delivery of non-backlog
×
1615
                // events.
×
1616
                return ErrShuttingDown
×
1617
        case <-i.quit:
×
1618
                return ErrShuttingDown
×
1619
        }
1620

1621
        return nil
2✔
1622
}
1623

1624
// SubscribeNotifications returns an InvoiceSubscription which allows the
1625
// caller to receive async notifications when any invoices are settled or
1626
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
1627
// by first sending out all new events with an invoice index _greater_ than
1628
// this value. Afterwards, we'll send out real-time notifications.
1629
func (i *InvoiceRegistry) SubscribeNotifications(ctx context.Context,
1630
        addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
2✔
1631

2✔
1632
        client := &InvoiceSubscription{
2✔
1633
                NewInvoices:     make(chan *Invoice),
2✔
1634
                SettledInvoices: make(chan *Invoice),
2✔
1635
                addIndex:        addIndex,
2✔
1636
                settleIndex:     settleIndex,
2✔
1637
                invoiceSubscriptionKit: invoiceSubscriptionKit{
2✔
1638
                        quit:             i.quit,
2✔
1639
                        ntfnQueue:        queue.NewConcurrentQueue(20),
2✔
1640
                        cancelChan:       make(chan struct{}),
2✔
1641
                        backlogDelivered: make(chan struct{}),
2✔
1642
                },
2✔
1643
        }
2✔
1644
        client.ntfnQueue.Start()
2✔
1645

2✔
1646
        // This notifies other goroutines that the backlog phase is over.
2✔
1647
        defer close(client.backlogDelivered)
2✔
1648

2✔
1649
        // Always increment by 1 first, and our client ID will start with 1,
2✔
1650
        // not 0.
2✔
1651
        client.id = atomic.AddUint32(&i.nextClientID, 1)
2✔
1652

2✔
1653
        // Before we register this new invoice subscription, we'll launch a new
2✔
1654
        // goroutine that will proxy all notifications appended to the end of
2✔
1655
        // the concurrent queue to the two client-side channels the caller will
2✔
1656
        // feed off of.
2✔
1657
        i.wg.Add(1)
2✔
1658
        go func() {
4✔
1659
                defer i.wg.Done()
2✔
1660
                defer i.deleteClient(client.id)
2✔
1661

2✔
1662
                for {
4✔
1663
                        select {
2✔
1664
                        // A new invoice event has been sent by the
1665
                        // invoiceRegistry! We'll figure out if this is an add
1666
                        // event or a settle event, then dispatch the event to
1667
                        // the client.
1668
                        case ntfn := <-client.ntfnQueue.ChanOut():
2✔
1669
                                invoiceEvent := ntfn.(*invoiceEvent)
2✔
1670

2✔
1671
                                var targetChan chan *Invoice
2✔
1672
                                state := invoiceEvent.invoice.State
2✔
1673
                                switch {
2✔
1674
                                // AMP invoices never move to settled, but will
1675
                                // be sent with a set ID if an HTLC set is
1676
                                // being settled.
1677
                                case state == ContractOpen &&
1678
                                        invoiceEvent.setID != nil:
2✔
1679
                                        fallthrough
2✔
1680

1681
                                case state == ContractSettled:
2✔
1682
                                        targetChan = client.SettledInvoices
2✔
1683

1684
                                case state == ContractOpen:
2✔
1685
                                        targetChan = client.NewInvoices
2✔
1686

1687
                                default:
×
1688
                                        log.Errorf("unknown invoice state: %v",
×
1689
                                                state)
×
1690

×
1691
                                        continue
×
1692
                                }
1693

1694
                                select {
2✔
1695
                                case targetChan <- invoiceEvent.invoice:
2✔
1696

1697
                                case <-client.cancelChan:
×
1698
                                        return
×
1699

1700
                                case <-i.quit:
×
1701
                                        return
×
1702
                                }
1703

1704
                        case <-client.cancelChan:
2✔
1705
                                return
2✔
1706

UNCOV
1707
                        case <-i.quit:
×
UNCOV
1708
                                return
×
1709
                        }
1710
                }
1711
        }()
1712

1713
        i.notificationClientMux.Lock()
2✔
1714
        i.notificationClients[client.id] = client
2✔
1715
        i.notificationClientMux.Unlock()
2✔
1716

2✔
1717
        // Query the database to see if based on the provided addIndex and
2✔
1718
        // settledIndex we need to deliver any backlog notifications.
2✔
1719
        err := i.deliverBacklogEvents(ctx, client)
2✔
1720
        if err != nil {
2✔
1721
                return nil, err
×
1722
        }
×
1723

1724
        log.Infof("New invoice subscription client: id=%v", client.id)
2✔
1725

2✔
1726
        return client, nil
2✔
1727
}
1728

1729
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
1730
// caller to receive async notifications for a specific invoice.
1731
func (i *InvoiceRegistry) SubscribeSingleInvoice(ctx context.Context,
1732
        hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
2✔
1733

2✔
1734
        client := &SingleInvoiceSubscription{
2✔
1735
                Updates: make(chan *Invoice),
2✔
1736
                invoiceSubscriptionKit: invoiceSubscriptionKit{
2✔
1737
                        quit:             i.quit,
2✔
1738
                        ntfnQueue:        queue.NewConcurrentQueue(20),
2✔
1739
                        cancelChan:       make(chan struct{}),
2✔
1740
                        backlogDelivered: make(chan struct{}),
2✔
1741
                },
2✔
1742
                invoiceRef: InvoiceRefByHash(hash),
2✔
1743
        }
2✔
1744
        client.ntfnQueue.Start()
2✔
1745

2✔
1746
        // This notifies other goroutines that the backlog phase is done.
2✔
1747
        defer close(client.backlogDelivered)
2✔
1748

2✔
1749
        // Always increment by 1 first, and our client ID will start with 1,
2✔
1750
        // not 0.
2✔
1751
        client.id = atomic.AddUint32(&i.nextClientID, 1)
2✔
1752

2✔
1753
        // Before we register this new invoice subscription, we'll launch a new
2✔
1754
        // goroutine that will proxy all notifications appended to the end of
2✔
1755
        // the concurrent queue to the two client-side channels the caller will
2✔
1756
        // feed off of.
2✔
1757
        i.wg.Add(1)
2✔
1758
        go func() {
4✔
1759
                defer i.wg.Done()
2✔
1760
                defer i.deleteClient(client.id)
2✔
1761

2✔
1762
                for {
4✔
1763
                        select {
2✔
1764
                        // A new invoice event has been sent by the
1765
                        // invoiceRegistry. We will dispatch the event to the
1766
                        // client.
1767
                        case ntfn := <-client.ntfnQueue.ChanOut():
2✔
1768
                                invoiceEvent := ntfn.(*invoiceEvent)
2✔
1769

2✔
1770
                                select {
2✔
1771
                                case client.Updates <- invoiceEvent.invoice:
2✔
1772

1773
                                case <-client.cancelChan:
×
1774
                                        return
×
1775

1776
                                case <-i.quit:
×
1777
                                        return
×
1778
                                }
1779

1780
                        case <-client.cancelChan:
2✔
1781
                                return
2✔
1782

UNCOV
1783
                        case <-i.quit:
×
UNCOV
1784
                                return
×
1785
                        }
1786
                }
1787
        }()
1788

1789
        i.notificationClientMux.Lock()
2✔
1790
        i.singleNotificationClients[client.id] = client
2✔
1791
        i.notificationClientMux.Unlock()
2✔
1792

2✔
1793
        err := i.deliverSingleBacklogEvents(ctx, client)
2✔
1794
        if err != nil {
2✔
1795
                return nil, err
×
1796
        }
×
1797

1798
        log.Infof("New single invoice subscription client: id=%v, ref=%v",
2✔
1799
                client.id, client.invoiceRef)
2✔
1800

2✔
1801
        return client, nil
2✔
1802
}
1803

1804
// notifyHodlSubscribers sends out the htlc resolution to all current
1805
// subscribers.
1806
func (i *InvoiceRegistry) notifyHodlSubscribers(htlcResolution HtlcResolution) {
2✔
1807
        i.hodlSubscriptionsMux.Lock()
2✔
1808
        defer i.hodlSubscriptionsMux.Unlock()
2✔
1809

2✔
1810
        subscribers, ok := i.hodlSubscriptions[htlcResolution.CircuitKey()]
2✔
1811
        if !ok {
4✔
1812
                return
2✔
1813
        }
2✔
1814

1815
        // Notify all interested subscribers and remove subscription from both
1816
        // maps. The subscription can be removed as there only ever will be a
1817
        // single resolution for each hash.
1818
        for subscriber := range subscribers {
4✔
1819
                select {
2✔
1820
                case subscriber <- htlcResolution:
2✔
1821
                case <-i.quit:
×
1822
                        return
×
1823
                }
1824

1825
                delete(
2✔
1826
                        i.hodlReverseSubscriptions[subscriber],
2✔
1827
                        htlcResolution.CircuitKey(),
2✔
1828
                )
2✔
1829
        }
1830

1831
        delete(i.hodlSubscriptions, htlcResolution.CircuitKey())
2✔
1832
}
1833

1834
// hodlSubscribe adds a new invoice subscription.
1835
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
1836
        circuitKey CircuitKey) {
2✔
1837

2✔
1838
        i.hodlSubscriptionsMux.Lock()
2✔
1839
        defer i.hodlSubscriptionsMux.Unlock()
2✔
1840

2✔
1841
        log.Debugf("Hodl subscribe for %v", circuitKey)
2✔
1842

2✔
1843
        subscriptions, ok := i.hodlSubscriptions[circuitKey]
2✔
1844
        if !ok {
4✔
1845
                subscriptions = make(map[chan<- interface{}]struct{})
2✔
1846
                i.hodlSubscriptions[circuitKey] = subscriptions
2✔
1847
        }
2✔
1848
        subscriptions[subscriber] = struct{}{}
2✔
1849

2✔
1850
        reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
2✔
1851
        if !ok {
4✔
1852
                reverseSubscriptions = make(map[CircuitKey]struct{})
2✔
1853
                i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
2✔
1854
        }
2✔
1855
        reverseSubscriptions[circuitKey] = struct{}{}
2✔
1856
}
1857

1858
// HodlUnsubscribeAll cancels the subscription.
1859
func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
2✔
1860
        i.hodlSubscriptionsMux.Lock()
2✔
1861
        defer i.hodlSubscriptionsMux.Unlock()
2✔
1862

2✔
1863
        hashes := i.hodlReverseSubscriptions[subscriber]
2✔
1864
        for hash := range hashes {
4✔
1865
                delete(i.hodlSubscriptions[hash], subscriber)
2✔
1866
        }
2✔
1867

1868
        delete(i.hodlReverseSubscriptions, subscriber)
2✔
1869
}
1870

1871
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
1872
// useful when we need to iterate the map to send notifications.
1873
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription { //nolint:ll
2✔
1874
        i.notificationClientMux.RLock()
2✔
1875
        defer i.notificationClientMux.RUnlock()
2✔
1876

2✔
1877
        clients := make(map[uint32]*SingleInvoiceSubscription)
2✔
1878
        for k, v := range i.singleNotificationClients {
4✔
1879
                clients[k] = v
2✔
1880
        }
2✔
1881
        return clients
2✔
1882
}
1883

1884
// copyClients copies i.notificationClients inside a lock. This is useful when
1885
// we need to iterate the map to send notifications.
1886
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
2✔
1887
        i.notificationClientMux.RLock()
2✔
1888
        defer i.notificationClientMux.RUnlock()
2✔
1889

2✔
1890
        clients := make(map[uint32]*InvoiceSubscription)
2✔
1891
        for k, v := range i.notificationClients {
4✔
1892
                clients[k] = v
2✔
1893
        }
2✔
1894
        return clients
2✔
1895
}
1896

1897
// deleteClient removes a client by its ID inside a lock. Noop if the client is
1898
// not found.
1899
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
2✔
1900
        i.notificationClientMux.Lock()
2✔
1901
        defer i.notificationClientMux.Unlock()
2✔
1902

2✔
1903
        log.Infof("Cancelling invoice subscription for client=%v", clientID)
2✔
1904
        delete(i.notificationClients, clientID)
2✔
1905
        delete(i.singleNotificationClients, clientID)
2✔
1906
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc