• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13076686584

31 Jan 2025 04:23PM UTC coverage: 58.788% (-0.005%) from 58.793%
13076686584

Pull #9456

github

mohamedawnallah
docs: update release-notes-0.19.0.md

In this commit, we warn users about the removal
of RPCs `SendToRoute`, `SendToRouteSync`, `SendPayment`,
and `SendPaymentSync` in the next release 0.20.
Pull Request #9456: lnrpc+docs: deprecate warning `SendToRoute`, `SendToRouteSync`, `SendPayment`, and `SendPaymentSync` in Release 0.19

136082 of 231481 relevant lines covered (58.79%)

19284.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.73
/invoices/invoiceregistry.go
1
package invoices
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/queue"
15
        "github.com/lightningnetwork/lnd/record"
16
)
17

18
// Sentinel errors returned by the invoice registry.
var (
	// ErrInvoiceExpiryTooSoon is returned when an attempt is made to
	// accept or settle an invoice with not enough blocks remaining.
	ErrInvoiceExpiryTooSoon = errors.New("invoice expiry too soon")

	// ErrInvoiceAmountTooLow is returned when an attempt is made to accept
	// or settle an invoice with an amount that is too low.
	ErrInvoiceAmountTooLow = errors.New(
		"paid amount less than invoice amount",
	)

	// ErrShuttingDown is returned when an operation could not complete
	// because the invoice registry is shutting down.
	ErrShuttingDown = errors.New("invoice registry shutting down")
)
33

34
// DefaultHtlcHoldDuration is the default amount of time that mpp htlcs are
// held while waiting for the remaining members of the set to arrive.
const DefaultHtlcHoldDuration = 120 * time.Second
39

40
// RegistryConfig contains the configuration parameters for invoice registry.
41
type RegistryConfig struct {
42
        // FinalCltvRejectDelta defines the number of blocks before the expiry
43
        // of the htlc where we no longer settle it as an exit hop and instead
44
        // cancel it back. Normally this value should be lower than the cltv
45
        // expiry of any invoice we create and the code effectuating this should
46
        // not be hit.
47
        FinalCltvRejectDelta int32
48

49
        // HtlcHoldDuration defines for how long mpp htlcs are held while
50
        // waiting for the other set members to arrive.
51
        HtlcHoldDuration time.Duration
52

53
        // Clock holds the clock implementation that is used to provide
54
        // Now() and TickAfter() and is useful to stub out the clock functions
55
        // during testing.
56
        Clock clock.Clock
57

58
        // AcceptKeySend indicates whether we want to accept spontaneous key
59
        // send payments.
60
        AcceptKeySend bool
61

62
        // AcceptAMP indicates whether we want to accept spontaneous AMP
63
        // payments.
64
        AcceptAMP bool
65

66
        // GcCanceledInvoicesOnStartup if set, we'll attempt to garbage collect
67
        // all canceled invoices upon start.
68
        GcCanceledInvoicesOnStartup bool
69

70
        // GcCanceledInvoicesOnTheFly if set, we'll garbage collect all newly
71
        // canceled invoices on the fly.
72
        GcCanceledInvoicesOnTheFly bool
73

74
        // KeysendHoldTime indicates for how long we want to accept and hold
75
        // spontaneous keysend payments.
76
        KeysendHoldTime time.Duration
77

78
        // HtlcInterceptor is an interface that allows the invoice registry to
79
        // let clients intercept invoices before they are settled.
80
        HtlcInterceptor HtlcInterceptor
81
}
82

83
// htlcReleaseEvent describes an htlc auto-release event. It is used to release
84
// mpp htlcs for which the complete set didn't arrive in time.
85
type htlcReleaseEvent struct {
86
        // invoiceRef identifiers the invoice this htlc belongs to.
87
        invoiceRef InvoiceRef
88

89
        // key is the circuit key of the htlc to release.
90
        key CircuitKey
91

92
        // releaseTime is the time at which to release the htlc.
93
        releaseTime time.Time
94
}
95

96
// Less is used to order PriorityQueueItem's by their release time such that
97
// items with the older release time are at the top of the queue.
98
//
99
// NOTE: Part of the queue.PriorityQueueItem interface.
100
func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
101
        return r.releaseTime.Before(other.(*htlcReleaseEvent).releaseTime)
12✔
102
}
12✔
103

12✔
104
// InvoiceRegistry is a central registry of all the outstanding invoices
105
// created by the daemon. The registry is a thin wrapper around a map in order
106
// to ensure that all updates/reads are thread safe.
107
type InvoiceRegistry struct {
108
        started atomic.Bool
109
        stopped atomic.Bool
110

111
        sync.RWMutex
112

113
        nextClientID uint32 // must be used atomically
114

115
        idb InvoiceDB
116

117
        // cfg contains the registry's configuration parameters.
118
        cfg *RegistryConfig
119

120
        // notificationClientMux locks notificationClients and
121
        // singleNotificationClients. Using a separate mutex for these maps is
122
        // necessary to avoid deadlocks in the registry when processing invoice
123
        // events.
124
        notificationClientMux sync.RWMutex
125

126
        notificationClients map[uint32]*InvoiceSubscription
127

128
        // TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
129
        // performance.
130
        singleNotificationClients map[uint32]*SingleInvoiceSubscription
131

132
        // invoiceEvents is a single channel over which invoice updates are
133
        // carried.
134
        invoiceEvents chan *invoiceEvent
135

136
        // hodlSubscriptionsMux locks the hodlSubscriptions and
137
        // hodlReverseSubscriptions. Using a separate mutex for these maps is
138
        // necessary to avoid deadlocks in the registry when processing invoice
139
        // events.
140
        hodlSubscriptionsMux sync.RWMutex
141

142
        // hodlSubscriptions is a map from a circuit key to a list of
143
        // subscribers. It is used for efficient notification of links.
144
        hodlSubscriptions map[CircuitKey]map[chan<- interface{}]struct{}
145

146
        // reverseSubscriptions tracks circuit keys subscribed to per
147
        // subscriber. This is used to unsubscribe from all hashes efficiently.
148
        hodlReverseSubscriptions map[chan<- interface{}]map[CircuitKey]struct{}
149

150
        // htlcAutoReleaseChan contains the new htlcs that need to be
151
        // auto-released.
152
        htlcAutoReleaseChan chan *htlcReleaseEvent
153

154
        expiryWatcher *InvoiceExpiryWatcher
155

156
        wg   sync.WaitGroup
157
        quit chan struct{}
158
}
159

160
// NewRegistry creates a new invoice registry. The invoice registry
161
// wraps the persistent on-disk invoice storage with an additional in-memory
162
// layer. The in-memory layer is in place such that debug invoices can be added
163
// which are volatile yet available system wide within the daemon.
164
func NewRegistry(idb InvoiceDB, expiryWatcher *InvoiceExpiryWatcher,
165
        cfg *RegistryConfig) *InvoiceRegistry {
166

645✔
167
        notificationClients := make(map[uint32]*InvoiceSubscription)
645✔
168
        singleNotificationClients := make(map[uint32]*SingleInvoiceSubscription)
645✔
169
        return &InvoiceRegistry{
645✔
170
                idb:                       idb,
645✔
171
                notificationClients:       notificationClients,
645✔
172
                singleNotificationClients: singleNotificationClients,
645✔
173
                invoiceEvents:             make(chan *invoiceEvent, 100),
645✔
174
                hodlSubscriptions: make(
645✔
175
                        map[CircuitKey]map[chan<- interface{}]struct{},
645✔
176
                ),
645✔
177
                hodlReverseSubscriptions: make(
645✔
178
                        map[chan<- interface{}]map[CircuitKey]struct{},
645✔
179
                ),
645✔
180
                cfg:                 cfg,
645✔
181
                htlcAutoReleaseChan: make(chan *htlcReleaseEvent),
645✔
182
                expiryWatcher:       expiryWatcher,
645✔
183
                quit:                make(chan struct{}),
645✔
184
        }
645✔
185
}
645✔
186

645✔
187
// scanInvoicesOnStart will scan all invoices on start and add active invoices
188
// to the invoice expiry watcher while also attempting to delete all canceled
189
// invoices.
190
func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {
191
        pendingInvoices, err := i.idb.FetchPendingInvoices(ctx)
645✔
192
        if err != nil {
645✔
193
                return err
645✔
194
        }
×
195

×
196
        var pending []invoiceExpiry
197
        for paymentHash, invoice := range pendingInvoices {
645✔
198
                invoice := invoice
678✔
199
                expiryRef := makeInvoiceExpiry(paymentHash, &invoice)
33✔
200
                if expiryRef != nil {
33✔
201
                        pending = append(pending, expiryRef)
66✔
202
                }
33✔
203
        }
33✔
204

205
        log.Debugf("Adding %d pending invoices to the expiry watcher",
206
                len(pending))
645✔
207
        i.expiryWatcher.AddInvoices(pending...)
645✔
208

645✔
209
        if i.cfg.GcCanceledInvoicesOnStartup {
645✔
210
                log.Infof("Deleting canceled invoices")
648✔
211
                err = i.idb.DeleteCanceledInvoices(ctx)
3✔
212
                if err != nil {
3✔
213
                        log.Warnf("Deleting canceled invoices failed: %v", err)
3✔
214
                        return err
×
215
                }
×
216
        }
×
217

218
        return nil
219
}
645✔
220

221
// Start starts the registry and all goroutines it needs to carry out its task.
222
func (i *InvoiceRegistry) Start() error {
223
        var err error
645✔
224

645✔
225
        log.Info("InvoiceRegistry starting...")
645✔
226

645✔
227
        if i.started.Swap(true) {
645✔
228
                return fmt.Errorf("InvoiceRegistry started more than once")
645✔
229
        }
×
230
        // Start InvoiceExpiryWatcher and prepopulate it with existing
×
231
        // active invoices.
232
        err = i.expiryWatcher.Start(
233
                func(hash lntypes.Hash, force bool) error {
645✔
234
                        return i.cancelInvoiceImpl(
728✔
235
                                context.Background(), hash, force,
83✔
236
                        )
83✔
237
                })
83✔
238
        if err != nil {
83✔
239
                return err
645✔
240
        }
×
241

×
242
        i.wg.Add(1)
243
        go i.invoiceEventLoop()
645✔
244

645✔
245
        // Now scan all pending and removable invoices to the expiry
645✔
246
        // watcher or delete them.
645✔
247
        err = i.scanInvoicesOnStart(context.Background())
645✔
248
        if err != nil {
645✔
249
                _ = i.Stop()
645✔
250
        }
×
251

×
252
        log.Debug("InvoiceRegistry started")
253

645✔
254
        return err
645✔
255
}
645✔
256

257
// Stop signals the registry for a graceful shutdown.
258
func (i *InvoiceRegistry) Stop() error {
259
        log.Info("InvoiceRegistry shutting down...")
384✔
260

384✔
261
        if i.stopped.Swap(true) {
384✔
262
                return fmt.Errorf("InvoiceRegistry stopped more than once")
384✔
263
        }
×
264

×
265
        log.Info("InvoiceRegistry shutting down...")
266
        defer log.Debug("InvoiceRegistry shutdown complete")
384✔
267

384✔
268
        var err error
384✔
269
        if i.expiryWatcher == nil {
384✔
270
                err = fmt.Errorf("InvoiceRegistry expiryWatcher is not " +
384✔
271
                        "initialized")
×
272
        } else {
×
273
                i.expiryWatcher.Stop()
384✔
274
        }
384✔
275

384✔
276
        close(i.quit)
277

384✔
278
        i.wg.Wait()
384✔
279

384✔
280
        log.Debug("InvoiceRegistry shutdown complete")
384✔
281

384✔
282
        return err
384✔
283
}
384✔
284

285
// invoiceEvent represents a new event that has modified on invoice on disk.
286
// Only two event types are currently supported: newly created invoices, and
287
// instance where invoices are settled.
288
type invoiceEvent struct {
289
        hash    lntypes.Hash
290
        invoice *Invoice
291
        setID   *[32]byte
292
}
293

294
// tickAt returns a channel that ticks at the specified time. If the time has
295
// already passed, it will tick immediately.
296
func (i *InvoiceRegistry) tickAt(t time.Time) <-chan time.Time {
297
        now := i.cfg.Clock.Now()
669✔
298
        return i.cfg.Clock.TickAfter(t.Sub(now))
669✔
299
}
669✔
300

669✔
301
// invoiceEventLoop is the dedicated goroutine responsible for accepting
302
// new notification subscriptions, cancelling old subscriptions, and
303
// dispatching new invoice events.
304
func (i *InvoiceRegistry) invoiceEventLoop() {
305
        defer i.wg.Done()
645✔
306

645✔
307
        // Set up a heap for htlc auto-releases.
645✔
308
        autoReleaseHeap := &queue.PriorityQueue{}
645✔
309

645✔
310
        for {
645✔
311
                // If there is something to release, set up a release tick
3,058✔
312
                // channel.
2,413✔
313
                var nextReleaseTick <-chan time.Time
2,413✔
314
                if autoReleaseHeap.Len() > 0 {
2,413✔
315
                        head := autoReleaseHeap.Top().(*htlcReleaseEvent)
3,082✔
316
                        nextReleaseTick = i.tickAt(head.releaseTime)
669✔
317
                }
669✔
318

669✔
319
                select {
320
                // A sub-systems has just modified the invoice state, so we'll
2,413✔
321
                // dispatch notifications to all registered clients.
322
                case event := <-i.invoiceEvents:
323
                        // For backwards compatibility, do not notify all
1,405✔
324
                        // invoice subscribers of cancel and accept events.
1,405✔
325
                        state := event.invoice.State
1,405✔
326
                        if state != ContractCanceled &&
1,405✔
327
                                state != ContractAccepted {
1,405✔
328

2,648✔
329
                                i.dispatchToClients(event)
1,243✔
330
                        }
1,243✔
331
                        i.dispatchToSingleClients(event)
1,243✔
332

1,405✔
333
                // A new htlc came in for auto-release.
334
                case event := <-i.htlcAutoReleaseChan:
335
                        log.Debugf("Scheduling auto-release for htlc: "+
345✔
336
                                "ref=%v, key=%v at %v",
345✔
337
                                event.invoiceRef, event.key, event.releaseTime)
345✔
338

345✔
339
                        // We use an independent timer for every htlc rather
345✔
340
                        // than a set timer that is reset with every htlc coming
345✔
341
                        // in. Otherwise the sender could keep resetting the
345✔
342
                        // timer until the broadcast window is entered and our
345✔
343
                        // channel is force closed.
345✔
344
                        autoReleaseHeap.Push(event)
345✔
345

345✔
346
                // The htlc at the top of the heap needs to be auto-released.
347
                case <-nextReleaseTick:
348
                        event := autoReleaseHeap.Pop().(*htlcReleaseEvent)
24✔
349
                        err := i.cancelSingleHtlc(
24✔
350
                                event.invoiceRef, event.key, ResultMppTimeout,
24✔
351
                        )
24✔
352
                        if err != nil {
24✔
353
                                log.Errorf("HTLC timer: %v", err)
24✔
354
                        }
×
355

×
356
                case <-i.quit:
357
                        return
384✔
358
                }
384✔
359
        }
360
}
361

362
// dispatchToSingleClients passes the supplied event to all notification
363
// clients that subscribed to all the invoice this event applies to.
364
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
365
        // Dispatch to single invoice subscribers.
1,405✔
366
        clients := i.copySingleClients()
1,405✔
367
        for _, client := range clients {
1,405✔
368
                payHash := client.invoiceRef.PayHash()
1,444✔
369

39✔
370
                if payHash == nil || *payHash != event.hash {
39✔
371
                        continue
42✔
372
                }
3✔
373

374
                select {
375
                case <-client.backlogDelivered:
39✔
376
                        // We won't deliver any events until the backlog has
39✔
377
                        // went through first.
378
                case <-i.quit:
379
                        return
×
380
                }
×
381

382
                client.notify(event)
383
        }
39✔
384
}
385

386
// dispatchToClients passes the supplied event to all notification clients that
387
// subscribed to all invoices. Add and settle indices are used to make sure
388
// that clients don't receive duplicate or unwanted events.
389
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
390
        invoice := event.invoice
1,243✔
391

1,243✔
392
        clients := i.copyClients()
1,243✔
393
        for clientID, client := range clients {
1,243✔
394
                // Before we dispatch this event, we'll check
1,312✔
395
                // to ensure that this client hasn't already
69✔
396
                // received this notification in order to
69✔
397
                // ensure we don't duplicate any events.
69✔
398

69✔
399
                // TODO(joostjager): Refactor switches.
69✔
400
                state := event.invoice.State
69✔
401
                switch {
69✔
402
                // If we've already sent this settle event to
69✔
403
                // the client, then we can skip this.
404
                case state == ContractSettled &&
405
                        client.settleIndex >= invoice.SettleIndex:
406
                        continue
×
407

×
408
                // Similarly, if we've already sent this add to
409
                // the client then we can skip this one, but only if this isn't
410
                // an AMP invoice. AMP invoices always remain in the settle
411
                // state as a base invoice.
412
                case event.setID == nil && state == ContractOpen &&
413
                        client.addIndex >= invoice.AddIndex:
414
                        continue
×
415

×
416
                // These two states should never happen, but we
417
                // log them just in case so we can detect this
418
                // instance.
419
                case state == ContractOpen &&
420
                        client.addIndex+1 != invoice.AddIndex:
421
                        log.Warnf("client=%v for invoice "+
10✔
422
                                "notifications missed an update, "+
10✔
423
                                "add_index=%v, new add event index=%v",
10✔
424
                                clientID, client.addIndex,
10✔
425
                                invoice.AddIndex)
10✔
426

10✔
427
                case state == ContractSettled &&
428
                        client.settleIndex+1 != invoice.SettleIndex:
429
                        log.Warnf("client=%v for invoice "+
3✔
430
                                "notifications missed an update, "+
3✔
431
                                "settle_index=%v, new settle event index=%v",
3✔
432
                                clientID, client.settleIndex,
3✔
433
                                invoice.SettleIndex)
3✔
434
                }
3✔
435

436
                select {
437
                case <-client.backlogDelivered:
69✔
438
                        // We won't deliver any events until the backlog has
69✔
439
                        // been processed.
440
                case <-i.quit:
441
                        return
×
442
                }
×
443

444
                err := client.notify(&invoiceEvent{
445
                        invoice: invoice,
69✔
446
                        setID:   event.setID,
69✔
447
                })
69✔
448
                if err != nil {
69✔
449
                        log.Errorf("Failed dispatching to client: %v", err)
69✔
450
                        return
×
451
                }
×
452

×
453
                // Each time we send a notification to a client, we'll record
454
                // the latest add/settle index it has. We'll use this to ensure
455
                // we don't send a notification twice, which can happen if a new
456
                // event is added while we're catching up a new client.
457
                invState := event.invoice.State
458
                switch {
69✔
459
                case invState == ContractSettled:
69✔
460
                        client.settleIndex = invoice.SettleIndex
21✔
461

21✔
462
                case invState == ContractOpen && event.setID == nil:
463
                        client.addIndex = invoice.AddIndex
45✔
464

45✔
465
                // If this is an AMP invoice, then we'll need to use the set ID
466
                // to keep track of the settle index of the client. AMP
467
                // invoices never go to the open state, but if a setID is
468
                // passed, then we know it was just settled and will track the
469
                // highest settle index so far.
470
                case invState == ContractOpen && event.setID != nil:
471
                        setID := *event.setID
9✔
472
                        client.settleIndex = invoice.AMPState[setID].SettleIndex
9✔
473

9✔
474
                default:
475
                        log.Errorf("unexpected invoice state: %v",
×
476
                                event.invoice.State)
×
477
                }
×
478
        }
479
}
480

481
// deliverBacklogEvents will attempts to query the invoice database for any
482
// notifications that the client has missed since it reconnected last.
483
func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context,
484
        client *InvoiceSubscription) error {
485

48✔
486
        addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex)
48✔
487
        if err != nil {
48✔
488
                return err
48✔
489
        }
×
490

×
491
        settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex)
492
        if err != nil {
48✔
493
                return err
48✔
494
        }
×
495

×
496
        // If we have any to deliver, then we'll append them to the end of the
497
        // notification queue in order to catch up the client before delivering
498
        // any new notifications.
499
        for _, addEvent := range addEvents {
500
                // We re-bind the loop variable to ensure we don't hold onto
51✔
501
                // the loop reference causing is to point to the same item.
3✔
502
                addEvent := addEvent
3✔
503

3✔
504
                select {
3✔
505
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
3✔
506
                        invoice: &addEvent,
507
                }:
508
                case <-i.quit:
3✔
509
                        return ErrShuttingDown
×
510
                }
×
511
        }
512

513
        for _, settleEvent := range settleEvents {
514
                // We re-bind the loop variable to ensure we don't hold onto
51✔
515
                // the loop reference causing is to point to the same item.
3✔
516
                settleEvent := settleEvent
3✔
517

3✔
518
                select {
3✔
519
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
3✔
520
                        invoice: &settleEvent,
521
                }:
522
                case <-i.quit:
3✔
523
                        return ErrShuttingDown
×
524
                }
×
525
        }
526

527
        return nil
528
}
48✔
529

530
// deliverSingleBacklogEvents will attempt to query the invoice database to
531
// retrieve the current invoice state and deliver this to the subscriber. Single
532
// invoice subscribers will always receive the current state right after
533
// subscribing. Only in case the invoice does not yet exist, nothing is sent
534
// yet.
535
func (i *InvoiceRegistry) deliverSingleBacklogEvents(ctx context.Context,
536
        client *SingleInvoiceSubscription) error {
537

21✔
538
        invoice, err := i.idb.LookupInvoice(ctx, client.invoiceRef)
21✔
539

21✔
540
        // It is possible that the invoice does not exist yet, but the client is
21✔
541
        // already watching it in anticipation.
21✔
542
        isNotFound := errors.Is(err, ErrInvoiceNotFound)
21✔
543
        isNotCreated := errors.Is(err, ErrNoInvoicesCreated)
21✔
544
        if isNotFound || isNotCreated {
21✔
545
                return nil
42✔
546
        }
21✔
547
        if err != nil {
21✔
548
                return err
3✔
549
        }
×
550

×
551
        payHash := client.invoiceRef.PayHash()
552
        if payHash == nil {
3✔
553
                return nil
3✔
554
        }
×
555

×
556
        err = client.notify(&invoiceEvent{
557
                hash:    *payHash,
3✔
558
                invoice: &invoice,
3✔
559
        })
3✔
560
        if err != nil {
3✔
561
                return err
3✔
562
        }
×
563

×
564
        log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
565
                client.id, payHash)
3✔
566

3✔
567
        return nil
3✔
568
}
3✔
569

570
// AddInvoice adds a regular invoice for the specified amount, identified by
571
// the passed preimage. Additionally, any memo or receipt data provided will
572
// also be stored on-disk. Once this invoice is added, subsystems within the
573
// daemon add/forward HTLCs are able to obtain the proper preimage required for
574
// redemption in the case that we're the final destination. We also return the
575
// addIndex of the newly created invoice which monotonically increases for each
576
// new invoice added.  A side effect of this function is that it also sets
577
// AddIndex on the invoice argument.
578
func (i *InvoiceRegistry) AddInvoice(ctx context.Context, invoice *Invoice,
579
        paymentHash lntypes.Hash) (uint64, error) {
580

729✔
581
        i.Lock()
729✔
582

729✔
583
        ref := InvoiceRefByHash(paymentHash)
729✔
584
        log.Debugf("Invoice%v: added with terms %v", ref, invoice.Terms)
729✔
585

729✔
586
        addIndex, err := i.idb.AddInvoice(ctx, invoice, paymentHash)
729✔
587
        if err != nil {
729✔
588
                i.Unlock()
747✔
589
                return 0, err
18✔
590
        }
18✔
591

18✔
592
        // Now that we've added the invoice, we'll send dispatch a message to
593
        // notify the clients of this new invoice.
594
        i.notifyClients(paymentHash, invoice, nil)
595
        i.Unlock()
714✔
596

714✔
597
        // InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry
714✔
598
        // to avoid deadlock when a new invoice is added while an other is being
714✔
599
        // canceled.
714✔
600
        invoiceExpiryRef := makeInvoiceExpiry(paymentHash, invoice)
714✔
601
        if invoiceExpiryRef != nil {
714✔
602
                i.expiryWatcher.AddInvoices(invoiceExpiryRef)
1,428✔
603
        }
714✔
604

714✔
605
        return addIndex, nil
606
}
714✔
607

608
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
609
// then we're able to pull the funds pending within an HTLC.
610
//
611
// TODO(roasbeef): ignore if settled?
612
func (i *InvoiceRegistry) LookupInvoice(ctx context.Context,
613
        rHash lntypes.Hash) (Invoice, error) {
614

419✔
615
        // We'll check the database to see if there's an existing matching
419✔
616
        // invoice.
419✔
617
        ref := InvoiceRefByHash(rHash)
419✔
618
        return i.idb.LookupInvoice(ctx, ref)
419✔
619
}
419✔
620

419✔
621
// LookupInvoiceByRef looks up an invoice by the given reference, if found
622
// then we're able to pull the funds pending within an HTLC.
623
func (i *InvoiceRegistry) LookupInvoiceByRef(ctx context.Context,
624
        ref InvoiceRef) (Invoice, error) {
625

3✔
626
        return i.idb.LookupInvoice(ctx, ref)
3✔
627
}
3✔
628

3✔
629
// startHtlcTimer starts a new timer via the invoice registry main loop that
630
// cancels a single htlc on an invoice when the htlc hold duration has passed.
631
func (i *InvoiceRegistry) startHtlcTimer(invoiceRef InvoiceRef,
632
        key CircuitKey, acceptTime time.Time) error {
633

345✔
634
        releaseTime := acceptTime.Add(i.cfg.HtlcHoldDuration)
345✔
635
        event := &htlcReleaseEvent{
345✔
636
                invoiceRef:  invoiceRef,
345✔
637
                key:         key,
345✔
638
                releaseTime: releaseTime,
345✔
639
        }
345✔
640

345✔
641
        select {
345✔
642
        case i.htlcAutoReleaseChan <- event:
345✔
643
                return nil
345✔
644

345✔
645
        case <-i.quit:
646
                return ErrShuttingDown
×
647
        }
×
648
}
649

650
// cancelSingleHtlc cancels a single accepted htlc on an invoice. It takes
651
// a resolution result which will be used to notify subscribed links and
652
// resolvers of the details of the htlc cancellation.
653
func (i *InvoiceRegistry) cancelSingleHtlc(invoiceRef InvoiceRef,
654
        key CircuitKey, result FailResolutionResult) error {
655

24✔
656
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
24✔
657
                // Only allow individual htlc cancellation on open invoices.
24✔
658
                if invoice.State != ContractOpen {
48✔
659
                        log.Debugf("cancelSingleHtlc: invoice %v no longer "+
24✔
660
                                "open", invoiceRef)
24✔
661

31✔
662
                        return nil, nil
7✔
663
                }
7✔
664

7✔
665
                // Lookup the current status of the htlc in the database.
7✔
666
                var (
7✔
667
                        htlcState HtlcState
7✔
668
                        setID     *SetID
669
                )
670
                htlc, ok := invoice.Htlcs[key]
671
                if !ok {
17✔
672
                        // If this is an AMP invoice, then all the HTLCs won't
17✔
673
                        // be read out, so we'll consult the other mapping to
×
674
                        // try to find the HTLC state in question here.
×
675
                        var found bool
×
676
                        for ampSetID, htlcSet := range invoice.AMPState {
677
                                ampSetID := ampSetID
17✔
678
                                for htlcKey := range htlcSet.InvoiceKeys {
17✔
679
                                        if htlcKey == key {
17✔
680
                                                htlcState = htlcSet.State
17✔
681
                                                setID = &ampSetID
19✔
682

2✔
683
                                                found = true
2✔
684
                                                break
2✔
685
                                        }
2✔
686
                                }
2✔
687
                        }
688

15✔
689
                        if !found {
15✔
690
                                return nil, fmt.Errorf("htlc %v not found", key)
15✔
691
                        }
15✔
692
                } else {
15✔
693
                        htlcState = htlc.State
15✔
694
                }
15✔
695

15✔
696
                // Cancellation is only possible if the htlc wasn't already
15✔
697
                // resolved.
15✔
698
                if htlcState != HtlcStateAccepted {
15✔
699
                        log.Debugf("cancelSingleHtlc: htlc %v on invoice %v "+
15✔
700
                                "is already resolved", key, invoiceRef)
15✔
701

15✔
702
                        return nil, nil
703
                }
704

705
                log.Debugf("cancelSingleHtlc: cancelling htlc %v on invoice %v",
706
                        key, invoiceRef)
707

24✔
708
                // Return an update descriptor that cancels htlc and keeps
24✔
709
                // invoice open.
24✔
710
                canceledHtlcs := map[CircuitKey]struct{}{
24✔
711
                        key: {},
24✔
712
                }
48✔
713

24✔
714
                return &InvoiceUpdateDesc{
24✔
715
                        UpdateType:  CancelHTLCsUpdate,
24✔
716
                        CancelHtlcs: canceledHtlcs,
×
717
                        SetID:       setID,
×
718
                }, nil
24✔
719
        }
24✔
720

24✔
721
        // Try to mark the specified htlc as canceled in the invoice database.
722
        // Intercept the update descriptor to set the local updated variable. If
723
        // no invoice update is performed, we can return early.
24✔
724
        setID := (*SetID)(invoiceRef.SetID())
×
725
        var updated bool
×
726
        invoice, err := i.idb.UpdateInvoice(
33✔
727
                context.Background(), invoiceRef, setID,
9✔
728
                func(invoice *Invoice) (
9✔
729
                        *InvoiceUpdateDesc, error) {
730

731
                        updateDesc, err := updateInvoice(invoice)
732
                        if err != nil {
15✔
733
                                return nil, err
15✔
734
                        }
×
735
                        updated = updateDesc != nil
×
736

30✔
737
                        return updateDesc, err
15✔
738
                },
15✔
739
        )
15✔
740
        if err != nil {
15✔
741
                return err
15✔
742
        }
15✔
743
        if !updated {
15✔
744
                return nil
15✔
745
        }
15✔
746

15✔
747
        // The invoice has been updated. Notify subscribers of the htlc
748
        // resolution.
15✔
749
        htlc, ok := invoice.Htlcs[key]
750
        if !ok {
751
                return fmt.Errorf("htlc %v not found", key)
752
        }
753
        if htlc.State == HtlcStateCanceled {
21✔
754
                resolution := NewFailResolution(
21✔
755
                        key, int32(htlc.AcceptHeight), result,
21✔
756
                )
24✔
757

3✔
758
                i.notifyHodlSubscribers(resolution)
3✔
759
        }
760
        return nil
761
}
21✔
762

24✔
763
// processKeySend just-in-time inserts an invoice if this htlc is a keysend
3✔
764
// htlc.
3✔
765
func (i *InvoiceRegistry) processKeySend(ctx invoiceUpdateCtx) error {
18✔
766
        // Retrieve keysend record if present.
×
767
        preimageSlice, ok := ctx.customRecords[record.KeySendType]
×
768
        if !ok {
×
769
                return nil
770
        }
771

18✔
772
        // Cancel htlc is preimage is invalid.
×
773
        preimage, err := lntypes.MakePreimage(preimageSlice)
×
774
        if err != nil {
775
                return err
776
        }
18✔
777
        if preimage.Hash() != ctx.hash {
18✔
778
                return fmt.Errorf("invalid keysend preimage %v for hash %v",
18✔
779
                        preimage, ctx.hash)
18✔
780
        }
18✔
781

18✔
782
        // Only allow keysend for non-mpp payments.
18✔
783
        if ctx.mpp != nil {
18✔
784
                return errors.New("no mpp keysend supported")
18✔
785
        }
18✔
786

18✔
787
        // Create an invoice for the htlc amount.
18✔
788
        amt := ctx.amtPaid
18✔
789

18✔
790
        // Set tlv required feature vector on the invoice. Otherwise we wouldn't
18✔
791
        // be able to pay to it with keysend.
×
792
        rawFeatures := lnwire.NewRawFeatureVector(
×
793
                lnwire.TLVOnionPayloadRequired,
794
        )
795
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
796

797
        // Use the minimum block delta that we require for settling htlcs.
798
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
799

800
        // Pre-check expiry here to prevent inserting an invoice that will not
801
        // be settled.
18✔
802
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
18✔
803
                return errors.New("final expiry too soon")
18✔
804
        }
18✔
805

18✔
806
        // The invoice database indexes all invoices by payment address, however
18✔
807
        // legacy keysend payment do not have one. In order to avoid a new
18✔
808
        // payment type on-disk wrt. to indexing, we'll continue to insert a
18✔
809
        // blank payment address which is special cased in the insertion logic
18✔
810
        // to not be indexed. In the future, once AMP is merged, this should be
18✔
811
        // replaced by generating a random payment address on the behalf of the
18✔
812
        // sender.
18✔
813
        payAddr := BlankPayAddr
18✔
814

18✔
815
        // Create placeholder invoice.
24✔
816
        invoice := &Invoice{
6✔
817
                CreationDate: i.cfg.Clock.Now(),
6✔
818
                Terms: ContractTerm{
6✔
819
                        FinalCltvDelta:  finalCltvDelta,
820
                        Value:           amt,
821
                        PaymentPreimage: &preimage,
822
                        PaymentAddr:     payAddr,
18✔
823
                        Features:        features,
18✔
824
                },
×
825
        }
×
826

827
        if i.cfg.KeysendHoldTime != 0 {
18✔
828
                invoice.HodlInvoice = true
829
                invoice.Terms.Expiry = i.cfg.KeysendHoldTime
830
        }
831

832
        // Insert invoice into database. Ignore duplicates, because this
30✔
833
        // may be a replay.
30✔
834
        _, err = i.AddInvoice(context.Background(), invoice, ctx.hash)
33✔
835
        if err != nil && !errors.Is(err, ErrDuplicateInvoice) {
3✔
836
                return err
3✔
837
        }
838

839
        return nil
840
}
27✔
841

27✔
842
// processAMP just-in-time inserts an invoice if this htlc is a keysend
27✔
843
// htlc.
27✔
844
func (i *InvoiceRegistry) processAMP(ctx invoiceUpdateCtx) error {
27✔
845
        // AMP payments MUST also include an MPP record.
27✔
846
        if ctx.mpp == nil {
27✔
847
                return errors.New("no MPP record for AMP")
27✔
848
        }
27✔
849

27✔
850
        // Create an invoice for the total amount expected, provided in the MPP
27✔
851
        // record.
27✔
852
        amt := ctx.mpp.TotalMsat()
27✔
853

27✔
854
        // Set the TLV required and MPP optional features on the invoice. We'll
27✔
855
        // also make the AMP features required so that it can't be paid by
27✔
856
        // legacy or MPP htlcs.
27✔
857
        rawFeatures := lnwire.NewRawFeatureVector(
27✔
858
                lnwire.TLVOnionPayloadRequired,
×
859
                lnwire.PaymentAddrOptional,
×
860
                lnwire.AMPRequired,
861
        )
862
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
863

27✔
864
        // Use the minimum block delta that we require for settling htlcs.
27✔
865
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
27✔
866

27✔
867
        // Pre-check expiry here to prevent inserting an invoice that will not
27✔
868
        // be settled.
27✔
869
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
27✔
870
                return errors.New("final expiry too soon")
27✔
871
        }
27✔
872

27✔
873
        // We'll use the sender-generated payment address provided in the HTLC
27✔
874
        // to create our AMP invoice.
27✔
875
        payAddr := ctx.mpp.PaymentAddr()
27✔
876

27✔
877
        // Create placeholder invoice.
27✔
878
        invoice := &Invoice{
27✔
879
                CreationDate: i.cfg.Clock.Now(),
27✔
880
                Terms: ContractTerm{
27✔
881
                        FinalCltvDelta:  finalCltvDelta,
27✔
882
                        Value:           amt,
27✔
883
                        PaymentPreimage: nil,
27✔
884
                        PaymentAddr:     payAddr,
15✔
885
                        Features:        features,
15✔
886
                },
15✔
887
        }
15✔
888

889
        // Insert invoice into database. Ignore duplicates payment hashes and
890
        // payment addrs, this may be a replay or a different HTLC for the AMP
891
        // invoice.
892
        _, err := i.AddInvoice(context.Background(), invoice, ctx.hash)
893
        isDuplicatedInvoice := errors.Is(err, ErrDuplicateInvoice)
894
        isDuplicatedPayAddr := errors.Is(err, ErrDuplicatePayAddr)
895
        switch {
896
        case isDuplicatedInvoice || isDuplicatedPayAddr:
897
                return nil
898
        default:
899
                return err
900
        }
901
}
902

903
// NotifyExitHopHtlc attempts to mark an invoice as settled. The return value
904
// describes how the htlc should be resolved.
905
//
906
// When the preimage of the invoice is not yet known (hodl invoice), this
907
// function moves the invoice to the accepted state. When SettleHoldInvoice is
908
// called later, a resolution message will be send back to the caller via the
909
// provided hodlChan. Invoice registry sends on this channel what action needs
910
// to be taken on the htlc (settle or cancel). The caller needs to ensure that
964✔
911
// the channel is either buffered or received on from another goroutine to
964✔
912
// prevent deadlock.
964✔
913
//
964✔
914
// In the case that the htlc is part of a larger set of htlcs that pay to the
964✔
915
// same invoice (multi-path payment), the htlc is held until the set is
964✔
916
// complete. If the set doesn't fully arrive in time, a timer will cancel the
964✔
917
// held htlc.
964✔
918
func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
964✔
919
        amtPaid lnwire.MilliSatoshi, expiry uint32, currentHeight int32,
964✔
920
        circuitKey CircuitKey, hodlChan chan<- interface{},
964✔
921
        wireCustomRecords lnwire.CustomRecords,
964✔
922
        payload Payload) (HtlcResolution, error) {
964✔
923

964✔
924
        // Create the update context containing the relevant details of the
964✔
925
        // incoming htlc.
964✔
926
        ctx := invoiceUpdateCtx{
964✔
927
                hash:                 rHash,
964✔
928
                circuitKey:           circuitKey,
964✔
929
                amtPaid:              amtPaid,
964✔
930
                expiry:               expiry,
964✔
931
                currentHeight:        currentHeight,
932
                finalCltvRejectDelta: i.cfg.FinalCltvRejectDelta,
933
                wireCustomRecords:    wireCustomRecords,
934
                customRecords:        payload.CustomRecords(),
30✔
935
                mpp:                  payload.MultiPath(),
30✔
936
                amp:                  payload.AMPRecord(),
33✔
937
                metadata:             payload.Metadata(),
3✔
938
                pathID:               payload.PathID(),
3✔
939
                totalAmtMsat:         payload.TotalAmtMsat(),
3✔
940
        }
3✔
941

3✔
942
        switch {
3✔
943
        // If we are accepting spontaneous AMP payments and this payload
944
        // contains an AMP record, create an AMP invoice that will be settled
945
        // below.
946
        case i.cfg.AcceptAMP && ctx.amp != nil:
947
                err := i.processAMP(ctx)
948
                if err != nil {
21✔
949
                        ctx.log(fmt.Sprintf("amp error: %v", err))
21✔
950

24✔
951
                        return NewFailResolution(
3✔
952
                                circuitKey, currentHeight, ResultAmpError,
3✔
953
                        ), nil
3✔
954
                }
3✔
955

3✔
956
        // If we are accepting spontaneous keysend payments, create a regular
3✔
957
        // invoice that will be settled below. We also enforce that this is only
958
        // done when no AMP payload is present since it will only be settle-able
959
        // by regular HTLCs.
960
        case i.cfg.AcceptKeySend && ctx.amp == nil:
958✔
961
                err := i.processKeySend(ctx)
958✔
962
                if err != nil {
958✔
963
                        ctx.log(fmt.Sprintf("keysend error: %v", err))
958✔
964

958✔
965
                        return NewFailResolution(
958✔
966
                                circuitKey, currentHeight, ResultKeySendError,
×
967
                        ), nil
×
968
                }
969
        }
1,043✔
970

85✔
971
        // Execute locked notify exit hop logic.
85✔
972
        i.Lock()
973
        resolution, invoiceToExpire, err := i.notifyExitHopHtlcLocked(
958✔
974
                &ctx, hodlChan,
975
        )
976
        i.Unlock()
977
        if err != nil {
434✔
978
                return nil, err
779✔
979
        }
345✔
980

369✔
981
        if invoiceToExpire != nil {
24✔
982
                i.expiryWatcher.AddInvoices(invoiceToExpire)
348✔
983
        }
324✔
984

324✔
985
        switch r := resolution.(type) {
986
        // The htlc is held. Start a timer outside the lock if the htlc should
345✔
987
        // be auto-released, because otherwise a deadlock may happen with the
345✔
988
        // main event loop.
345✔
989
        case *htlcAcceptResolution:
345✔
990
                if r.autoRelease {
×
991
                        var invRef InvoiceRef
×
992
                        if ctx.amp != nil {
993
                                invRef = InvoiceRefBySetID(*ctx.setID())
994
                        } else {
995
                                invRef = ctx.invoiceRef()
996
                        }
997

434✔
998
                        err := i.startHtlcTimer(
999
                                invRef, circuitKey, r.acceptTime,
1000
                        )
527✔
1001
                        if err != nil {
527✔
1002
                                return nil, err
1003
                        }
1004
                }
×
1005

×
1006
                // We return a nil resolution because htlc acceptances are
1007
                // represented as nil resolutions externally.
1008
                // TODO(carla) update calling code to handle accept resolutions.
1009
                return nil, nil
1010

1011
        // A direct resolution was received for this htlc.
1012
        case HtlcResolution:
1013
                return r, nil
1014

958✔
1015
        // Fail if an unknown resolution type was received.
958✔
1016
        default:
958✔
1017
                return nil, errors.New("invalid resolution type")
958✔
1018
        }
958✔
1019
}
958✔
1020

958✔
1021
// notifyExitHopHtlcLocked is the internal implementation of NotifyExitHopHtlc
958✔
1022
// that should be executed inside the registry lock. The returned invoiceExpiry
958✔
1023
// (if not nil) needs to be added to the expiry watcher outside of the lock.
958✔
1024
func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
958✔
1025
        ctx *invoiceUpdateCtx, hodlChan chan<- interface{}) (
1026
        HtlcResolution, invoiceExpiry, error) {
25✔
1027

25✔
1028
        invoiceRef := ctx.invoiceRef()
25✔
1029
        setID := (*SetID)(ctx.setID())
25✔
1030

25✔
1031
        // We need to look up the current state of the invoice in order to send
25✔
1032
        // the previously accepted/settled HTLCs to the interceptor.
25✔
1033
        existingInvoice, err := i.idb.LookupInvoice(
25✔
1034
                context.Background(), invoiceRef,
1035
        )
×
1036
        switch {
×
1037
        case errors.Is(err, ErrInvoiceNotFound) ||
×
1038
                errors.Is(err, ErrNoInvoicesCreated):
1039

1040
                // If the invoice was not found, return a failure resolution
936✔
1041
                // with an invoice not found result.
936✔
1042
                return NewFailResolution(
936✔
1043
                        ctx.circuitKey, ctx.currentHeight,
936✔
1044
                        ResultInvoiceNotFound,
936✔
1045
                ), nil, nil
936✔
1046

936✔
1047
        case err != nil:
936✔
1048
                ctx.log(err.Error())
936✔
1049
                return nil, nil, err
936✔
1050
        }
936✔
1051

936✔
1052
        var cancelSet bool
942✔
1053

6✔
1054
        // Provide the invoice to the settlement interceptor to allow
6✔
1055
        // the interceptor's client an opportunity to manipulate the
6✔
1056
        // settlement process.
9✔
1057
        err = i.cfg.HtlcInterceptor.Intercept(HtlcModifyRequest{
3✔
1058
                WireCustomRecords:  ctx.wireCustomRecords,
3✔
1059
                ExitHtlcCircuitKey: ctx.circuitKey,
1060
                ExitHtlcAmt:        ctx.amtPaid,
6✔
1061
                ExitHtlcExpiry:     ctx.expiry,
1062
                CurrentHeight:      uint32(ctx.currentHeight),
936✔
1063
                Invoice:            existingInvoice,
×
1064
        }, func(resp HtlcModifyResponse) {
×
1065
                log.Debugf("Received invoice HTLC interceptor response: %v",
×
1066
                        resp)
×
1067

×
1068
                if resp.AmountPaid != 0 {
×
1069
                        ctx.amtPaid = resp.AmountPaid
1070
                }
1071

1072
                cancelSet = resp.CancelSet
936✔
1073
        })
936✔
1074
        if err != nil {
936✔
1075
                err := fmt.Errorf("error during invoice HTLC interception: %w",
936✔
1076
                        err)
1,872✔
1077
                ctx.log(err.Error())
936✔
1078

936✔
1079
                return nil, nil, err
936✔
1080
        }
936✔
1081

936✔
1082
        // We'll attempt to settle an invoice matching this rHash on disk (if
×
1083
        // one exists). The callback will update the invoice state and/or htlcs.
×
1084
        var (
955✔
1085
                resolution        HtlcResolution
19✔
1086
                updateSubscribers bool
19✔
1087
        )
19✔
1088
        callback := func(inv *Invoice) (*InvoiceUpdateDesc, error) {
1089
                updateDesc, res, err := updateInvoice(ctx, inv)
1090
                if err != nil {
1091
                        return nil, err
926✔
1092
                }
6✔
1093

6✔
1094
                // Only send an update if the invoice state was changed.
6✔
1095
                updateSubscribers = updateDesc != nil &&
×
1096
                        updateDesc.State != nil
×
1097

×
1098
                // Assign resolution to outer scope variable.
×
1099
                if cancelSet {
×
1100
                        // If a cancel signal was set for the htlc set, we set
×
1101
                        // the resolution as a failure with an underpayment
×
1102
                        // indication. Something was wrong with this htlc, so
×
1103
                        // we probably can't settle the invoice at all.
×
1104
                        resolution = NewFailResolution(
×
1105
                                ctx.circuitKey, ctx.currentHeight,
×
1106
                                ResultAmountTooLow,
1107
                        )
1108
                } else {
1109
                        resolution = res
1110
                }
1111

1112
                return updateDesc, nil
1113
        }
6✔
1114

6✔
1115
        invoice, err := i.idb.UpdateInvoice(
6✔
1116
                context.Background(), invoiceRef, setID, callback,
6✔
1117
        )
6✔
1118

6✔
1119
        var duplicateSetIDErr ErrDuplicateSetID
6✔
1120
        if errors.As(err, &duplicateSetIDErr) {
6✔
1121
                return NewFailResolution(
6✔
1122
                        ctx.circuitKey, ctx.currentHeight,
6✔
1123
                        ResultInvoiceNotFound,
6✔
1124
                ), nil, nil
6✔
1125
        }
6✔
1126

6✔
1127
        switch {
6✔
1128
        case errors.Is(err, ErrInvoiceNotFound):
6✔
1129
                // If the invoice was not found, return a failure resolution
6✔
1130
                // with an invoice not found result.
6✔
1131
                return NewFailResolution(
6✔
1132
                        ctx.circuitKey, ctx.currentHeight,
6✔
1133
                        ResultInvoiceNotFound,
6✔
1134
                ), nil, nil
6✔
1135

6✔
1136
        case errors.Is(err, ErrInvRefEquivocation):
6✔
1137
                return NewFailResolution(
6✔
1138
                        ctx.circuitKey, ctx.currentHeight,
6✔
1139
                        ResultInvoiceNotFound,
6✔
1140
                ), nil, nil
1141

1142
        case err == nil:
917✔
1143

917✔
1144
        default:
×
1145
                ctx.log(err.Error())
×
1146
                return nil, nil, err
1147
        }
1148

917✔
1149
        var invoiceToExpire invoiceExpiry
917✔
1150

917✔
1151
        log.Tracef("Settlement resolution: %T %v", resolution, resolution)
917✔
1152

917✔
1153
        switch res := resolution.(type) {
917✔
1154
        case *HtlcFailResolution:
917✔
1155
                // Inspect latest htlc state on the invoice. If it is found,
1156
                // we will update the accept height as it was recorded in the
1157
                // invoice database (which occurs in the case where the htlc
936✔
1158
                // reached the database in a previous call). If the htlc was
936✔
1159
                // not found on the invoice, it was immediately failed so we
936✔
1160
                // send the failure resolution as is, which has the current
936✔
1161
                // height set as the accept height.
936✔
1162
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
936✔
1163
                if ok {
×
1164
                        res.AcceptHeight = int32(invoiceHtlc.AcceptHeight)
×
1165
                }
×
1166

×
1167
                ctx.log(fmt.Sprintf("failure resolution result "+
×
1168
                        "outcome: %v, at accept height: %v",
1169
                        res.Outcome, res.AcceptHeight))
936✔
1170

×
1171
                // Some failures apply to the entire HTLC set. Break here if
×
1172
                // this isn't one of them.
×
1173
                if !res.Outcome.IsSetFailure() {
×
1174
                        break
×
1175
                }
×
1176

×
1177
                // Also cancel any HTLCs in the HTLC set that are also in the
1178
                // canceled state with the same failure result.
×
1179
                setID := ctx.setID()
×
1180
                canceledHtlcSet := invoice.HTLCSet(setID, HtlcStateCanceled)
×
1181
                for key, htlc := range canceledHtlcSet {
×
1182
                        htlcFailResolution := NewFailResolution(
×
1183
                                key, int32(htlc.AcceptHeight), res.Outcome,
1184
                        )
936✔
1185

1186
                        i.notifyHodlSubscribers(htlcFailResolution)
×
1187
                }
×
1188

×
1189
        // If the htlc was settled, we will settle any previously accepted
1190
        // htlcs and notify our peer to settle them.
1191
        case *HtlcSettleResolution:
936✔
1192
                ctx.log(fmt.Sprintf("settle resolution result "+
936✔
1193
                        "outcome: %v, at accept height: %v",
936✔
1194
                        res.Outcome, res.AcceptHeight))
936✔
1195

936✔
1196
                // Also settle any previously accepted htlcs. If a htlc is
32✔
1197
                // marked as settled, we should follow now and settle the htlc
32✔
1198
                // with our peer.
32✔
1199
                setID := ctx.setID()
32✔
1200
                settledHtlcSet := invoice.HTLCSet(setID, HtlcStateSettled)
32✔
1201
                for key, htlc := range settledHtlcSet {
32✔
1202
                        preimage := res.Preimage
32✔
1203
                        if htlc.AMP != nil && htlc.AMP.Preimage != nil {
32✔
1204
                                preimage = *htlc.AMP.Preimage
32✔
1205
                        }
38✔
1206

6✔
1207
                        // Notify subscribers that the htlcs should be settled
6✔
1208
                        // with our peer. Note that the outcome of the
1209
                        // resolution is set based on the outcome of the single
32✔
1210
                        // htlc that we just settled, so may not be accurate
32✔
1211
                        // for all htlcs.
32✔
1212
                        htlcSettleResolution := NewSettleResolution(
32✔
1213
                                preimage, key,
32✔
1214
                                int32(htlc.AcceptHeight), res.Outcome,
32✔
1215
                        )
55✔
1216

23✔
1217
                        // Notify subscribers that the htlc should be settled
1218
                        // with our peer.
1219
                        i.notifyHodlSubscribers(htlcSettleResolution)
1220
                }
1221

12✔
1222
                // If concurrent payments were attempted to this invoice before
12✔
1223
                // the current one was ultimately settled, cancel back any of
21✔
1224
                // the HTLCs immediately. As a result of the settle, the HTLCs
9✔
1225
                // in other HTLC sets are automatically converted to a canceled
9✔
1226
                // state when updating the invoice.
9✔
1227
                //
9✔
1228
                // TODO(roasbeef): can remove now??
9✔
1229
                canceledHtlcSet := invoice.HTLCSetCompliment(
9✔
1230
                        setID, HtlcStateCanceled,
1231
                )
1232
                for key, htlc := range canceledHtlcSet {
1233
                        htlcFailResolution := NewFailResolution(
476✔
1234
                                key, int32(htlc.AcceptHeight),
476✔
1235
                                ResultInvoiceAlreadySettled,
476✔
1236
                        )
476✔
1237

476✔
1238
                        i.notifyHodlSubscribers(htlcFailResolution)
476✔
1239
                }
476✔
1240

476✔
1241
        // If we accepted the htlc, subscribe to the hodl invoice and return
476✔
1242
        // an accept resolution with the htlc's accept time on it.
476✔
1243
        case *htlcAcceptResolution:
1,264✔
1244
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
788✔
1245
                if !ok {
803✔
1246
                        return nil, nil, fmt.Errorf("accepted htlc: %v not"+
15✔
1247
                                " present on invoice: %x", ctx.circuitKey,
15✔
1248
                                ctx.hash[:])
1249
                }
1250

1251
                // Determine accepted height of this htlc. If the htlc reached
1252
                // the invoice database (possibly in a previous call to the
1253
                // invoice registry), we'll take the original accepted height
1254
                // as it was recorded in the database.
788✔
1255
                acceptHeight := int32(invoiceHtlc.AcceptHeight)
788✔
1256

788✔
1257
                ctx.log(fmt.Sprintf("accept resolution result "+
788✔
1258
                        "outcome: %v, at accept height: %v",
788✔
1259
                        res.outcome, acceptHeight))
788✔
1260

788✔
1261
                // Auto-release the htlc if the invoice is still open. It can
788✔
1262
                // only happen for mpp payments that there are htlcs in state
1263
                // Accepted while the invoice is Open.
1264
                if invoice.State == ContractOpen {
1265
                        res.acceptTime = invoiceHtlc.AcceptTime
1266
                        res.autoRelease = true
1267
                }
1268

1269
                // If we have fully accepted the set of htlcs for this invoice,
1270
                // we can now add it to our invoice expiry watcher. We do not
1271
                // add invoices before they are fully accepted, because it is
476✔
1272
                // possible that we MppTimeout the htlcs, and then our relevant
476✔
1273
                // expiry height could change.
476✔
1274
                if res.outcome == resultAccepted {
476✔
1275
                        invoiceToExpire = makeInvoiceExpiry(ctx.hash, invoice)
×
1276
                }
×
1277

×
1278
                // Subscribe to the resolution if the caller specified a
×
1279
                // notification channel.
×
1280
                if hodlChan != nil {
×
1281
                        i.hodlSubscribe(hodlChan, ctx.circuitKey)
×
1282
                }
1283

1284
        default:
1285
                panic("unknown action")
434✔
1286
        }
434✔
1287

434✔
1288
        // Now that the links have been notified of any state changes to their
×
1289
        // HTLCs, we'll go ahead and notify any clients waiting on the invoice
×
1290
        // state changes.
×
1291
        if updateSubscribers {
×
1292
                // We'll add a setID onto the notification, but only if this is
1293
                // an AMP invoice being settled.
1294
                var setID *[32]byte
1295
                if _, ok := resolution.(*HtlcSettleResolution); ok {
1296
                        setID = ctx.setID()
1297
                }
434✔
1298

434✔
1299
                i.notifyClients(ctx.hash, invoice, setID)
434✔
1300
        }
434✔
1301

434✔
1302
        return resolution, invoiceToExpire, nil
434✔
1303
}
434✔
1304

434✔
1305
// SettleHodlInvoice sets the preimage of a hodl invoice.
434✔
1306
func (i *InvoiceRegistry) SettleHodlInvoice(ctx context.Context,
779✔
1307
        preimage lntypes.Preimage) error {
345✔
1308

345✔
1309
        i.Lock()
345✔
1310
        defer i.Unlock()
1311

1312
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
1313
                switch invoice.State {
1314
                case ContractOpen:
1315
                        return nil, ErrInvoiceStillOpen
1316

519✔
1317
                case ContractCanceled:
85✔
1318
                        return nil, ErrInvoiceAlreadyCanceled
85✔
1319

1320
                case ContractSettled:
1321
                        return nil, ErrInvoiceAlreadySettled
1322
                }
868✔
1323

434✔
1324
                return &InvoiceUpdateDesc{
434✔
1325
                        UpdateType: SettleHodlInvoiceUpdate,
1326
                        State: &InvoiceStateUpdateDesc{
×
1327
                                NewState: ContractSettled,
×
1328
                                Preimage: &preimage,
1329
                        },
1330
                }, nil
1331
        }
1332

1333
        hash := preimage.Hash()
1,491✔
1334
        invoiceRef := InvoiceRefByHash(hash)
555✔
1335
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
555✔
1336
        if err != nil {
555✔
1337
                log.Errorf("SettleHodlInvoice with preimage %v: %v",
1,022✔
1338
                        preimage, err)
467✔
1339

467✔
1340
                return err
1341
        }
555✔
1342

1343
        log.Debugf("Invoice%v: settled with preimage %v", invoiceRef,
1344
                invoice.Terms.PaymentPreimage)
936✔
1345

1346
        // In the callback, we marked the invoice as settled. UpdateInvoice will
1347
        // have seen this and should have moved all htlcs that were accepted to
1348
        // the settled state. In the loop below, we go through all of these and
1349
        // notify links and resolvers that are waiting for resolution. Any htlcs
72✔
1350
        // that were already settled before, will be notified again. This isn't
72✔
1351
        // necessary but doesn't hurt either.
72✔
1352
        for key, htlc := range invoice.Htlcs {
72✔
1353
                if htlc.State != HtlcStateSettled {
72✔
1354
                        continue
144✔
1355
                }
72✔
1356

×
1357
                resolution := NewSettleResolution(
×
1358
                        preimage, key, int32(htlc.AcceptHeight), ResultSettled,
1359
                )
×
1360

×
1361
                i.notifyHodlSubscribers(resolution)
1362
        }
3✔
1363
        i.notifyClients(hash, invoice, nil)
3✔
1364

1365
        return nil
1366
}
69✔
1367

69✔
1368
// CancelInvoice attempts to cancel the invoice corresponding to the passed
69✔
1369
// payment hash.
69✔
1370
func (i *InvoiceRegistry) CancelInvoice(ctx context.Context,
69✔
1371
        payHash lntypes.Hash) error {
69✔
1372

69✔
1373
        return i.cancelInvoiceImpl(ctx, payHash, true)
1374
}
1375

72✔
1376
// shouldCancel examines the state of an invoice and whether we want to
72✔
1377
// cancel already accepted invoices, taking our force cancel boolean into
72✔
1378
// account. This is pulled out into its own function so that tests that mock
75✔
1379
// cancelInvoiceImpl can reuse this logic.
3✔
1380
func shouldCancel(state ContractState, cancelAccepted bool) bool {
3✔
1381
        if state != ContractAccepted {
3✔
1382
                return true
3✔
1383
        }
3✔
1384

1385
        // If the invoice is accepted, we should only cancel if we want to
69✔
1386
        // force cancellation of accepted invoices.
69✔
1387
        return cancelAccepted
69✔
1388
}
69✔
1389

69✔
1390
// cancelInvoice attempts to cancel the invoice corresponding to the passed
69✔
1391
// payment hash. Accepted invoices will only be canceled if explicitly
69✔
1392
// requested to do so. It notifies subscribing links and resolvers that
69✔
1393
// the associated htlcs were canceled if they change state.
69✔
1394
func (i *InvoiceRegistry) cancelInvoiceImpl(ctx context.Context,
141✔
1395
        payHash lntypes.Hash, cancelAccepted bool) error {
72✔
1396

×
1397
        i.Lock()
1398
        defer i.Unlock()
1399

72✔
1400
        ref := InvoiceRefByHash(payHash)
72✔
1401
        log.Debugf("Invoice%v: canceling invoice", ref)
72✔
1402

72✔
1403
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
72✔
1404
                if !shouldCancel(invoice.State, cancelAccepted) {
1405
                        return nil, nil
69✔
1406
                }
69✔
1407

69✔
1408
                // Move invoice to the canceled state. Rely on validation in
1409
                // channeldb to return an error if the invoice is already
1410
                // settled or canceled.
1411
                return &InvoiceUpdateDesc{
1412
                        UpdateType: CancelInvoiceUpdate,
1413
                        State: &InvoiceStateUpdateDesc{
32✔
1414
                                NewState: ContractCanceled,
32✔
1415
                        },
32✔
1416
                }, nil
32✔
1417
        }
1418

1419
        invoiceRef := InvoiceRefByHash(payHash)
1420
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
1421

1422
        // Implement idempotency by returning success if the invoice was already
103✔
1423
        // canceled.
178✔
1424
        if errors.Is(err, ErrInvoiceAlreadyCanceled) {
75✔
1425
                log.Debugf("Invoice%v: already canceled", ref)
75✔
1426
                return nil
1427
        }
1428
        if err != nil {
1429
                return err
31✔
1430
        }
1431

1432
        // Return without cancellation if the invoice state is ContractAccepted.
1433
        if invoice.State == ContractAccepted {
1434
                log.Debugf("Invoice%v: remains accepted as cancel wasn't"+
1435
                        "explicitly requested.", ref)
1436
                return nil
1437
        }
112✔
1438

112✔
1439
        log.Debugf("Invoice%v: canceled", ref)
112✔
1440

112✔
1441
        // In the callback, some htlcs may have been moved to the canceled
112✔
1442
        // state. We now go through all of these and notify links and resolvers
112✔
1443
        // that are waiting for resolution. Any htlcs that were already canceled
112✔
1444
        // before, will be notified again. This isn't necessary but doesn't hurt
112✔
1445
        // either.
215✔
1446
        for key, htlc := range invoice.Htlcs {
115✔
1447
                if htlc.State != HtlcStateCanceled {
12✔
1448
                        continue
12✔
1449
                }
1450

1451
                i.notifyHodlSubscribers(
1452
                        NewFailResolution(
1453
                                key, int32(htlc.AcceptHeight), ResultCanceled,
91✔
1454
                        ),
91✔
1455
                )
91✔
1456
        }
91✔
1457
        i.notifyClients(payHash, invoice, nil)
91✔
1458

91✔
1459
        // Attempt to also delete the invoice if requested through the registry
1460
        // config.
1461
        if i.cfg.GcCanceledInvoicesOnTheFly {
112✔
1462
                // Assemble the delete reference and attempt to delete through
112✔
1463
                // the invocice from the DB.
112✔
1464
                deleteRef := InvoiceDeleteRef{
112✔
1465
                        PayHash:     payHash,
112✔
1466
                        AddIndex:    invoice.AddIndex,
112✔
1467
                        SettleIndex: invoice.SettleIndex,
112✔
1468
                }
115✔
1469
                if invoice.Terms.PaymentAddr != BlankPayAddr {
3✔
1470
                        deleteRef.PayAddr = &invoice.Terms.PaymentAddr
3✔
1471
                }
3✔
1472

132✔
1473
                err = i.idb.DeleteInvoice(ctx, []InvoiceDeleteRef{deleteRef})
23✔
1474
                // If by any chance deletion failed, then log it instead of
23✔
1475
                // returning the error, as the invoice itself has already been
1476
                // canceled.
1477
                if err != nil {
101✔
1478
                        log.Warnf("Invoice %v could not be deleted: %v", ref,
12✔
1479
                                err)
12✔
1480
                }
12✔
1481
        }
12✔
1482

1483
        return nil
77✔
1484
}
77✔
1485

77✔
1486
// notifyClients notifies all currently registered invoice notification clients
77✔
1487
// of a newly added/settled invoice.
77✔
1488
func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
77✔
1489
        invoice *Invoice, setID *[32]byte) {
77✔
1490

77✔
1491
        event := &invoiceEvent{
77✔
1492
                invoice: invoice,
114✔
1493
                hash:    hash,
37✔
1494
                setID:   setID,
×
1495
        }
1496

1497
        select {
37✔
1498
        case i.invoiceEvents <- event:
37✔
1499
        case <-i.quit:
37✔
1500
        }
37✔
1501
}
37✔
1502

1503
// invoiceSubscriptionKit defines that are common to both all invoice
77✔
1504
// subscribers and single invoice subscribers.
77✔
1505
type invoiceSubscriptionKit struct {
77✔
1506
        id uint32 // nolint:structcheck
77✔
1507

80✔
1508
        // quit is a chan mouted to InvoiceRegistry that signals a shutdown.
3✔
1509
        quit chan struct{}
3✔
1510

3✔
1511
        ntfnQueue *queue.ConcurrentQueue
3✔
1512

3✔
1513
        canceled   uint32 // To be used atomically.
3✔
1514
        cancelChan chan struct{}
3✔
1515

3✔
1516
        // backlogDelivered is closed when the backlog events have been
×
1517
        // delivered.
×
1518
        backlogDelivered chan struct{}
1519
}
3✔
1520

3✔
1521
// InvoiceSubscription represents an intent to receive updates for newly added
3✔
1522
// or settled invoices. For each newly added invoice, a copy of the invoice
3✔
1523
// will be sent over the NewInvoices channel. Similarly, for each newly settled
3✔
1524
// invoice, a copy of the invoice will be sent over the SettledInvoices
×
1525
// channel.
×
1526
type InvoiceSubscription struct {
×
1527
        invoiceSubscriptionKit
1528

1529
        // NewInvoices is a channel that we'll use to send all newly created
77✔
1530
        // invoices with an invoice index greater than the specified
1531
        // StartingInvoiceIndex field.
1532
        NewInvoices chan *Invoice
1533

1534
        // SettledInvoices is a channel that we'll use to send all settled
1535
        // invoices with an invoices index greater than the specified
1,406✔
1536
        // StartingInvoiceIndex field.
1,406✔
1537
        SettledInvoices chan *Invoice
1,406✔
1538

1,406✔
1539
        // addIndex is the highest add index the caller knows of. We'll use
1,406✔
1540
        // this information to send out an event backlog to the notifications
1,406✔
1541
        // subscriber. Any new add events with an index greater than this will
1,406✔
1542
        // be dispatched before any new notifications are sent out.
1,406✔
1543
        addIndex uint64
1,406✔
1544

1,406✔
1545
        // settleIndex is the highest settle index the caller knows of. We'll
×
1546
        // use this information to send out an event backlog to the
1547
        // notifications subscriber. Any new settle events with an index
1548
        // greater than this will be dispatched before any new notifications
1549
        // are sent out.
1550
        settleIndex uint64
1551
}
1552

1553
// SingleInvoiceSubscription represents an intent to receive updates for a
1554
// specific invoice.
1555
type SingleInvoiceSubscription struct {
1556
        invoiceSubscriptionKit
1557

1558
        invoiceRef InvoiceRef
1559

1560
        // Updates is a channel that we'll use to send all invoice events for
1561
        // the invoice that is subscribed to.
1562
        Updates chan *Invoice
1563
}
1564

1565
// PayHash returns the optional payment hash of the target invoice.
1566
//
1567
// TODO(positiveblue): This method is only supposed to be used in tests. It will
1568
// be deleted as soon as invoiceregistery_test is in the same module.
1569
func (s *SingleInvoiceSubscription) PayHash() *lntypes.Hash {
1570
        return s.invoiceRef.PayHash()
1571
}
1572

1573
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
1574
// resources.
1575
func (i *invoiceSubscriptionKit) Cancel() {
1576
        if !atomic.CompareAndSwapUint32(&i.canceled, 0, 1) {
1577
                return
1578
        }
1579

1580
        i.ntfnQueue.Stop()
1581
        close(i.cancelChan)
1582
}
1583

1584
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
1585
        select {
1586
        case i.ntfnQueue.ChanIn() <- event:
1587

1588
        case <-i.cancelChan:
1589
                // This can only be triggered by delivery of non-backlog
1590
                // events.
1591
                return ErrShuttingDown
1592
        case <-i.quit:
1593
                return ErrShuttingDown
1594
        }
1595

1596
        return nil
1597
}
1598

1599
// SubscribeNotifications returns an InvoiceSubscription which allows the
1600
// caller to receive async notifications when any invoices are settled or
1601
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
1602
// by first sending out all new events with an invoice index _greater_ than
1603
// this value. Afterwards, we'll send out real-time notifications.
1604
func (i *InvoiceRegistry) SubscribeNotifications(ctx context.Context,
1605
        addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
1606

1607
        client := &InvoiceSubscription{
1608
                NewInvoices:     make(chan *Invoice),
1609
                SettledInvoices: make(chan *Invoice),
1610
                addIndex:        addIndex,
1611
                settleIndex:     settleIndex,
1612
                invoiceSubscriptionKit: invoiceSubscriptionKit{
1613
                        quit:             i.quit,
1614
                        ntfnQueue:        queue.NewConcurrentQueue(20),
1615
                        cancelChan:       make(chan struct{}),
18✔
1616
                        backlogDelivered: make(chan struct{}),
18✔
1617
                },
18✔
1618
        }
1619
        client.ntfnQueue.Start()
1620

1621
        // This notifies other goroutines that the backlog phase is over.
66✔
1622
        defer close(client.backlogDelivered)
66✔
1623

×
1624
        // Always increment by 1 first, and our client ID will start with 1,
×
1625
        // not 0.
1626
        client.id = atomic.AddUint32(&i.nextClientID, 1)
66✔
1627

66✔
1628
        // Before we register this new invoice subscription, we'll launch a new
1629
        // goroutine that will proxy all notifications appended to the end of
1630
        // the concurrent queue to the two client-side channels the caller will
105✔
1631
        // feed off of.
105✔
1632
        i.wg.Add(1)
105✔
1633
        go func() {
1634
                defer i.wg.Done()
×
1635
                defer i.deleteClient(client.id)
×
1636

×
1637
                for {
×
1638
                        select {
×
1639
                        // A new invoice event has been sent by the
×
1640
                        // invoiceRegistry! We'll figure out if this is an add
1641
                        // event or a settle event, then dispatch the event to
1642
                        // the client.
105✔
1643
                        case ntfn := <-client.ntfnQueue.ChanOut():
1644
                                invoiceEvent := ntfn.(*invoiceEvent)
1645

1646
                                var targetChan chan *Invoice
1647
                                state := invoiceEvent.invoice.State
1648
                                switch {
1649
                                // AMP invoices never move to settled, but will
1650
                                // be sent with a set ID if an HTLC set is
1651
                                // being settled.
48✔
1652
                                case state == ContractOpen &&
48✔
1653
                                        invoiceEvent.setID != nil:
48✔
1654
                                        fallthrough
48✔
1655

48✔
1656
                                case state == ContractSettled:
48✔
1657
                                        targetChan = client.SettledInvoices
48✔
1658

48✔
1659
                                case state == ContractOpen:
48✔
1660
                                        targetChan = client.NewInvoices
48✔
1661

48✔
1662
                                default:
48✔
1663
                                        log.Errorf("unknown invoice state: %v",
48✔
1664
                                                state)
48✔
1665

48✔
1666
                                        continue
48✔
1667
                                }
48✔
1668

48✔
1669
                                select {
48✔
1670
                                case targetChan <- invoiceEvent.invoice:
48✔
1671

48✔
1672
                                case <-client.cancelChan:
48✔
1673
                                        return
48✔
1674

48✔
1675
                                case <-i.quit:
48✔
1676
                                        return
48✔
1677
                                }
48✔
1678

48✔
1679
                        case <-client.cancelChan:
96✔
1680
                                return
48✔
1681

48✔
1682
                        case <-i.quit:
48✔
1683
                                return
162✔
1684
                        }
114✔
1685
                }
1686
        }()
1687

1688
        i.notificationClientMux.Lock()
1689
        i.notificationClients[client.id] = client
69✔
1690
        i.notificationClientMux.Unlock()
69✔
1691

69✔
1692
        // Query the database to see if based on the provided addIndex and
69✔
1693
        // settledIndex we need to deliver any backlog notifications.
69✔
1694
        err := i.deliverBacklogEvents(ctx, client)
69✔
1695
        if err != nil {
1696
                return nil, err
1697
        }
1698

1699
        log.Infof("New invoice subscription client: id=%v", client.id)
9✔
1700

9✔
1701
        return client, nil
1702
}
27✔
1703

27✔
1704
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
1705
// caller to receive async notifications for a specific invoice.
45✔
1706
func (i *InvoiceRegistry) SubscribeSingleInvoice(ctx context.Context,
45✔
1707
        hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
1708

×
1709
        client := &SingleInvoiceSubscription{
×
1710
                Updates: make(chan *Invoice),
×
1711
                invoiceSubscriptionKit: invoiceSubscriptionKit{
×
1712
                        quit:             i.quit,
×
1713
                        ntfnQueue:        queue.NewConcurrentQueue(20),
1714
                        cancelChan:       make(chan struct{}),
1715
                        backlogDelivered: make(chan struct{}),
69✔
1716
                },
69✔
1717
                invoiceRef: InvoiceRefByHash(hash),
1718
        }
×
1719
        client.ntfnQueue.Start()
×
1720

1721
        // This notifies other goroutines that the backlog phase is done.
×
1722
        defer close(client.backlogDelivered)
×
1723

1724
        // Always increment by 1 first, and our client ID will start with 1,
1725
        // not 0.
48✔
1726
        client.id = atomic.AddUint32(&i.nextClientID, 1)
48✔
1727

1728
        // Before we register this new invoice subscription, we'll launch a new
×
1729
        // goroutine that will proxy all notifications appended to the end of
×
1730
        // the concurrent queue to the two client-side channels the caller will
1731
        // feed off of.
1732
        i.wg.Add(1)
1733
        go func() {
1734
                defer i.wg.Done()
48✔
1735
                defer i.deleteClient(client.id)
48✔
1736

48✔
1737
                for {
48✔
1738
                        select {
48✔
1739
                        // A new invoice event has been sent by the
48✔
1740
                        // invoiceRegistry. We will dispatch the event to the
48✔
1741
                        // client.
48✔
1742
                        case ntfn := <-client.ntfnQueue.ChanOut():
×
1743
                                invoiceEvent := ntfn.(*invoiceEvent)
×
1744

1745
                                select {
48✔
1746
                                case client.Updates <- invoiceEvent.invoice:
48✔
1747

48✔
1748
                                case <-client.cancelChan:
1749
                                        return
1750

1751
                                case <-i.quit:
1752
                                        return
1753
                                }
21✔
1754

21✔
1755
                        case <-client.cancelChan:
21✔
1756
                                return
21✔
1757

21✔
1758
                        case <-i.quit:
21✔
1759
                                return
21✔
1760
                        }
21✔
1761
                }
21✔
1762
        }()
21✔
1763

21✔
1764
        i.notificationClientMux.Lock()
21✔
1765
        i.singleNotificationClients[client.id] = client
21✔
1766
        i.notificationClientMux.Unlock()
21✔
1767

21✔
1768
        err := i.deliverSingleBacklogEvents(ctx, client)
21✔
1769
        if err != nil {
21✔
1770
                return nil, err
21✔
1771
        }
21✔
1772

21✔
1773
        log.Infof("New single invoice subscription client: id=%v, ref=%v",
21✔
1774
                client.id, client.invoiceRef)
21✔
1775

21✔
1776
        return client, nil
21✔
1777
}
21✔
1778

21✔
1779
// notifyHodlSubscribers sends out the htlc resolution to all current
42✔
1780
// subscribers.
21✔
1781
func (i *InvoiceRegistry) notifyHodlSubscribers(htlcResolution HtlcResolution) {
21✔
1782
        i.hodlSubscriptionsMux.Lock()
21✔
1783
        defer i.hodlSubscriptionsMux.Unlock()
78✔
1784

57✔
1785
        subscribers, ok := i.hodlSubscriptions[htlcResolution.CircuitKey()]
1786
        if !ok {
1787
                return
1788
        }
39✔
1789

39✔
1790
        // Notify all interested subscribers and remove subscription from both
39✔
1791
        // maps. The subscription can be removed as there only ever will be a
39✔
1792
        // single resolution for each hash.
39✔
1793
        for subscriber := range subscribers {
1794
                select {
×
1795
                case subscriber <- htlcResolution:
×
1796
                case <-i.quit:
1797
                        return
×
1798
                }
×
1799

1800
                delete(
1801
                        i.hodlReverseSubscriptions[subscriber],
19✔
1802
                        htlcResolution.CircuitKey(),
19✔
1803
                )
1804
        }
2✔
1805

2✔
1806
        delete(i.hodlSubscriptions, htlcResolution.CircuitKey())
1807
}
1808

1809
// hodlSubscribe adds a new invoice subscription.
1810
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
21✔
1811
        circuitKey CircuitKey) {
21✔
1812

21✔
1813
        i.hodlSubscriptionsMux.Lock()
21✔
1814
        defer i.hodlSubscriptionsMux.Unlock()
21✔
1815

21✔
1816
        log.Debugf("Hodl subscribe for %v", circuitKey)
×
1817

×
1818
        subscriptions, ok := i.hodlSubscriptions[circuitKey]
1819
        if !ok {
21✔
1820
                subscriptions = make(map[chan<- interface{}]struct{})
21✔
1821
                i.hodlSubscriptions[circuitKey] = subscriptions
21✔
1822
        }
21✔
1823
        subscriptions[subscriber] = struct{}{}
1824

1825
        reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
1826
        if !ok {
1827
                reverseSubscriptions = make(map[CircuitKey]struct{})
915✔
1828
                i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
915✔
1829
        }
915✔
1830
        reverseSubscriptions[circuitKey] = struct{}{}
915✔
1831
}
915✔
1832

1,406✔
1833
// HodlUnsubscribeAll cancels the subscription.
491✔
1834
func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
491✔
1835
        i.hodlSubscriptionsMux.Lock()
1836
        defer i.hodlSubscriptionsMux.Unlock()
1837

1838
        hashes := i.hodlReverseSubscriptions[subscriber]
1839
        for hash := range hashes {
854✔
1840
                delete(i.hodlSubscriptions[hash], subscriber)
427✔
1841
        }
427✔
1842

×
1843
        delete(i.hodlReverseSubscriptions, subscriber)
×
1844
}
1845

1846
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
427✔
1847
// useful when we need to iterate the map to send notifications.
427✔
1848
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription { //nolint:ll
427✔
1849
        i.notificationClientMux.RLock()
427✔
1850
        defer i.notificationClientMux.RUnlock()
1851

1852
        clients := make(map[uint32]*SingleInvoiceSubscription)
427✔
1853
        for k, v := range i.singleNotificationClients {
1854
                clients[k] = v
1855
        }
1856
        return clients
1857
}
434✔
1858

434✔
1859
// copyClients copies i.notificationClients inside a lock. This is useful when
434✔
1860
// we need to iterate the map to send notifications.
434✔
1861
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
434✔
1862
        i.notificationClientMux.RLock()
434✔
1863
        defer i.notificationClientMux.RUnlock()
434✔
1864

434✔
1865
        clients := make(map[uint32]*InvoiceSubscription)
861✔
1866
        for k, v := range i.notificationClients {
427✔
1867
                clients[k] = v
427✔
1868
        }
427✔
1869
        return clients
434✔
1870
}
434✔
1871

434✔
1872
// deleteClient removes a client by its ID inside a lock. Noop if the client is
806✔
1873
// not found.
372✔
1874
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
372✔
1875
        i.notificationClientMux.Lock()
372✔
1876
        defer i.notificationClientMux.Unlock()
434✔
1877

1878
        log.Infof("Cancelling invoice subscription for client=%v", clientID)
1879
        delete(i.notificationClients, clientID)
1880
        delete(i.singleNotificationClients, clientID)
205✔
1881
}
205✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc