• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15736109134

18 Jun 2025 02:46PM UTC coverage: 58.197% (-10.1%) from 68.248%
15736109134

Pull #9752

github

web-flow
Merge d2634a68c into 31c74f20f
Pull Request #9752: routerrpc: reject payment to invoice that don't have payment secret or blinded paths

6 of 13 new or added lines in 2 files covered. (46.15%)

28331 existing lines in 455 files now uncovered.

97860 of 168153 relevant lines covered (58.2%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.73
/invoices/invoiceregistry.go
1
package invoices
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/fn/v2"
13
        "github.com/lightningnetwork/lnd/lntypes"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
        "github.com/lightningnetwork/lnd/queue"
16
        "github.com/lightningnetwork/lnd/record"
17
)
18

19
var (
20
        // ErrInvoiceExpiryTooSoon is returned when an invoice is attempted to
21
        // be accepted or settled with not enough blocks remaining.
22
        ErrInvoiceExpiryTooSoon = errors.New("invoice expiry too soon")
23

24
        // ErrInvoiceAmountTooLow is returned when an invoice is attempted to
25
        // be accepted or settled with an amount that is too low.
26
        ErrInvoiceAmountTooLow = errors.New(
27
                "paid amount less than invoice amount",
28
        )
29

30
        // ErrShuttingDown is returned when an operation failed because the
31
        // invoice registry is shutting down.
32
        ErrShuttingDown = errors.New("invoice registry shutting down")
33
)
34

35
const (
36
        // DefaultHtlcHoldDuration defines the default for how long mpp htlcs
37
        // are held while waiting for the other set members to arrive.
38
        DefaultHtlcHoldDuration = 120 * time.Second
39
)
40

41
// RegistryConfig contains the configuration parameters for invoice registry.
42
type RegistryConfig struct {
43
        // FinalCltvRejectDelta defines the number of blocks before the expiry
44
        // of the htlc where we no longer settle it as an exit hop and instead
45
        // cancel it back. Normally this value should be lower than the cltv
46
        // expiry of any invoice we create and the code effectuating this should
47
        // not be hit.
48
        FinalCltvRejectDelta int32
49

50
        // HtlcHoldDuration defines for how long mpp htlcs are held while
51
        // waiting for the other set members to arrive.
52
        HtlcHoldDuration time.Duration
53

54
        // Clock holds the clock implementation that is used to provide
55
        // Now() and TickAfter() and is useful to stub out the clock functions
56
        // during testing.
57
        Clock clock.Clock
58

59
        // AcceptKeySend indicates whether we want to accept spontaneous key
60
        // send payments.
61
        AcceptKeySend bool
62

63
        // AcceptAMP indicates whether we want to accept spontaneous AMP
64
        // payments.
65
        AcceptAMP bool
66

67
        // GcCanceledInvoicesOnStartup if set, we'll attempt to garbage collect
68
        // all canceled invoices upon start.
69
        GcCanceledInvoicesOnStartup bool
70

71
        // GcCanceledInvoicesOnTheFly if set, we'll garbage collect all newly
72
        // canceled invoices on the fly.
73
        GcCanceledInvoicesOnTheFly bool
74

75
        // KeysendHoldTime indicates for how long we want to accept and hold
76
        // spontaneous keysend payments.
77
        KeysendHoldTime time.Duration
78

79
        // HtlcInterceptor is an interface that allows the invoice registry to
80
        // let clients intercept invoices before they are settled.
81
        HtlcInterceptor HtlcInterceptor
82
}
83

84
// htlcReleaseEvent describes an htlc auto-release event. It is used to release
85
// mpp htlcs for which the complete set didn't arrive in time.
86
type htlcReleaseEvent struct {
87
        // invoiceRef identifies the invoice this htlc belongs to.
88
        invoiceRef InvoiceRef
89

90
        // key is the circuit key of the htlc to release.
91
        key CircuitKey
92

93
        // releaseTime is the time at which to release the htlc.
94
        releaseTime time.Time
95
}
96

97
// Less is used to order PriorityQueueItem's by their release time such that
98
// items with the older release time are at the top of the queue.
99
//
100
// NOTE: Part of the queue.PriorityQueueItem interface.
101
func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
3✔
102
        return r.releaseTime.Before(other.(*htlcReleaseEvent).releaseTime)
3✔
103
}
3✔
104

105
// InvoiceRegistry is a central registry of all the outstanding invoices
106
// created by the daemon. The registry is a thin wrapper around a map in order
107
// to ensure that all updates/reads are thread safe.
108
type InvoiceRegistry struct {
109
        started atomic.Bool
110
        stopped atomic.Bool
111

112
        sync.RWMutex
113

114
        nextClientID uint32 // must be used atomically
115

116
        idb InvoiceDB
117

118
        // cfg contains the registry's configuration parameters.
119
        cfg *RegistryConfig
120

121
        // notificationClientMux locks notificationClients and
122
        // singleNotificationClients. Using a separate mutex for these maps is
123
        // necessary to avoid deadlocks in the registry when processing invoice
124
        // events.
125
        notificationClientMux sync.RWMutex
126

127
        notificationClients map[uint32]*InvoiceSubscription
128

129
        // TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
130
        // performance.
131
        singleNotificationClients map[uint32]*SingleInvoiceSubscription
132

133
        // invoiceEvents is a single channel over which invoice updates are
134
        // carried.
135
        invoiceEvents chan *invoiceEvent
136

137
        // hodlSubscriptionsMux locks the hodlSubscriptions and
138
        // hodlReverseSubscriptions. Using a separate mutex for these maps is
139
        // necessary to avoid deadlocks in the registry when processing invoice
140
        // events.
141
        hodlSubscriptionsMux sync.RWMutex
142

143
        // hodlSubscriptions is a map from a circuit key to a list of
144
        // subscribers. It is used for efficient notification of links.
145
        hodlSubscriptions map[CircuitKey]map[chan<- interface{}]struct{}
146

147
        // hodlReverseSubscriptions tracks circuit keys subscribed to per
148
        // subscriber. This is used to unsubscribe from all hashes efficiently.
149
        hodlReverseSubscriptions map[chan<- interface{}]map[CircuitKey]struct{}
150

151
        // htlcAutoReleaseChan contains the new htlcs that need to be
152
        // auto-released.
153
        htlcAutoReleaseChan chan *htlcReleaseEvent
154

155
        expiryWatcher *InvoiceExpiryWatcher
156

157
        wg   sync.WaitGroup
158
        quit chan struct{}
159
}
160

161
// NewRegistry creates a new invoice registry. The invoice registry
162
// wraps the persistent on-disk invoice storage with an additional in-memory
163
// layer. The in-memory layer is in place such that debug invoices can be added
164
// which are volatile yet available system wide within the daemon.
165
func NewRegistry(idb InvoiceDB, expiryWatcher *InvoiceExpiryWatcher,
166
        cfg *RegistryConfig) *InvoiceRegistry {
3✔
167

3✔
168
        notificationClients := make(map[uint32]*InvoiceSubscription)
3✔
169
        singleNotificationClients := make(map[uint32]*SingleInvoiceSubscription)
3✔
170
        return &InvoiceRegistry{
3✔
171
                idb:                       idb,
3✔
172
                notificationClients:       notificationClients,
3✔
173
                singleNotificationClients: singleNotificationClients,
3✔
174
                invoiceEvents:             make(chan *invoiceEvent, 100),
3✔
175
                hodlSubscriptions: make(
3✔
176
                        map[CircuitKey]map[chan<- interface{}]struct{},
3✔
177
                ),
3✔
178
                hodlReverseSubscriptions: make(
3✔
179
                        map[chan<- interface{}]map[CircuitKey]struct{},
3✔
180
                ),
3✔
181
                cfg:                 cfg,
3✔
182
                htlcAutoReleaseChan: make(chan *htlcReleaseEvent),
3✔
183
                expiryWatcher:       expiryWatcher,
3✔
184
                quit:                make(chan struct{}),
3✔
185
        }
3✔
186
}
3✔
187

188
// scanInvoicesOnStart will scan all invoices on start and add active invoices
189
// to the invoice expiry watcher while also attempting to delete all canceled
190
// invoices.
191
func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {
3✔
192
        pendingInvoices, err := i.idb.FetchPendingInvoices(ctx)
3✔
193
        if err != nil {
3✔
194
                return err
×
195
        }
×
196

197
        var pending []invoiceExpiry
3✔
198
        for paymentHash, invoice := range pendingInvoices {
6✔
199
                invoice := invoice
3✔
200
                expiryRef := makeInvoiceExpiry(paymentHash, &invoice)
3✔
201
                if expiryRef != nil {
6✔
202
                        pending = append(pending, expiryRef)
3✔
203
                }
3✔
204
        }
205

206
        log.Debugf("Adding %d pending invoices to the expiry watcher",
3✔
207
                len(pending))
3✔
208
        i.expiryWatcher.AddInvoices(pending...)
3✔
209

3✔
210
        if i.cfg.GcCanceledInvoicesOnStartup {
3✔
UNCOV
211
                log.Infof("Deleting canceled invoices")
×
UNCOV
212
                err = i.idb.DeleteCanceledInvoices(ctx)
×
UNCOV
213
                if err != nil {
×
214
                        log.Warnf("Deleting canceled invoices failed: %v", err)
×
215
                        return err
×
216
                }
×
217
        }
218

219
        return nil
3✔
220
}
221

222
// Start starts the registry and all goroutines it needs to carry out its task.
223
func (i *InvoiceRegistry) Start() error {
3✔
224
        var err error
3✔
225

3✔
226
        log.Info("InvoiceRegistry starting...")
3✔
227

3✔
228
        if i.started.Swap(true) {
3✔
229
                return fmt.Errorf("InvoiceRegistry started more than once")
×
230
        }
×
231
        // Start InvoiceExpiryWatcher and prepopulate it with existing
232
        // active invoices.
233
        err = i.expiryWatcher.Start(
3✔
234
                func(hash lntypes.Hash, force bool) error {
6✔
235
                        return i.cancelInvoiceImpl(
3✔
236
                                context.Background(), hash, force,
3✔
237
                        )
3✔
238
                })
3✔
239
        if err != nil {
3✔
240
                return err
×
241
        }
×
242

243
        i.wg.Add(1)
3✔
244
        go i.invoiceEventLoop()
3✔
245

3✔
246
        // Now add all pending invoices to the expiry watcher and, if
3✔
247
        // configured, delete the canceled ones.
3✔
248
        err = i.scanInvoicesOnStart(context.Background())
3✔
249
        if err != nil {
3✔
250
                _ = i.Stop()
×
251
        }
×
252

253
        log.Debug("InvoiceRegistry started")
3✔
254

3✔
255
        return err
3✔
256
}
257

258
// Stop signals the registry for a graceful shutdown.
259
func (i *InvoiceRegistry) Stop() error {
3✔
260
        log.Info("InvoiceRegistry shutting down...")
3✔
261

3✔
262
        if i.stopped.Swap(true) {
3✔
263
                return fmt.Errorf("InvoiceRegistry stopped more than once")
×
264
        }
×
265

266
        log.Info("InvoiceRegistry shutting down...")
3✔
267
        defer log.Debug("InvoiceRegistry shutdown complete")
3✔
268

3✔
269
        var err error
3✔
270
        if i.expiryWatcher == nil {
3✔
271
                err = fmt.Errorf("InvoiceRegistry expiryWatcher is not " +
×
272
                        "initialized")
×
273
        } else {
3✔
274
                i.expiryWatcher.Stop()
3✔
275
        }
3✔
276

277
        close(i.quit)
3✔
278

3✔
279
        i.wg.Wait()
3✔
280

3✔
281
        log.Debug("InvoiceRegistry shutdown complete")
3✔
282

3✔
283
        return err
3✔
284
}
285

286
// invoiceEvent represents a new event that has modified an invoice on disk.
287
// Only two event types are currently supported: newly created invoices, and
288
// instances where invoices are settled.
289
type invoiceEvent struct {
290
        hash    lntypes.Hash
291
        invoice *Invoice
292
        setID   *[32]byte
293
}
294

295
// tickAt returns a channel that ticks at the specified time. If the time has
296
// already passed, it will tick immediately.
297
func (i *InvoiceRegistry) tickAt(t time.Time) <-chan time.Time {
3✔
298
        now := i.cfg.Clock.Now()
3✔
299
        return i.cfg.Clock.TickAfter(t.Sub(now))
3✔
300
}
3✔
301

302
// invoiceEventLoop is the dedicated goroutine responsible for accepting
303
// new notification subscriptions, cancelling old subscriptions, and
304
// dispatching new invoice events.
305
func (i *InvoiceRegistry) invoiceEventLoop() {
3✔
306
        defer i.wg.Done()
3✔
307

3✔
308
        // Set up a heap for htlc auto-releases.
3✔
309
        autoReleaseHeap := &queue.PriorityQueue{}
3✔
310

3✔
311
        for {
6✔
312
                // If there is something to release, set up a release tick
3✔
313
                // channel.
3✔
314
                var nextReleaseTick <-chan time.Time
3✔
315
                if autoReleaseHeap.Len() > 0 {
6✔
316
                        head := autoReleaseHeap.Top().(*htlcReleaseEvent)
3✔
317
                        nextReleaseTick = i.tickAt(head.releaseTime)
3✔
318
                }
3✔
319

320
                select {
3✔
321
                // A sub-system has just modified the invoice state, so we'll
322
                // dispatch notifications to all registered clients.
323
                case event := <-i.invoiceEvents:
3✔
324
                        // For backwards compatibility, do not notify all
3✔
325
                        // invoice subscribers of cancel and accept events.
3✔
326
                        state := event.invoice.State
3✔
327
                        if state != ContractCanceled &&
3✔
328
                                state != ContractAccepted {
6✔
329

3✔
330
                                i.dispatchToClients(event)
3✔
331
                        }
3✔
332
                        i.dispatchToSingleClients(event)
3✔
333

334
                // A new htlc came in for auto-release.
335
                case event := <-i.htlcAutoReleaseChan:
3✔
336
                        log.Debugf("Scheduling auto-release for htlc: "+
3✔
337
                                "ref=%v, key=%v at %v",
3✔
338
                                event.invoiceRef, event.key, event.releaseTime)
3✔
339

3✔
340
                        // We use an independent timer for every htlc rather
3✔
341
                        // than a set timer that is reset with every htlc coming
3✔
342
                        // in. Otherwise the sender could keep resetting the
3✔
343
                        // timer until the broadcast window is entered and our
3✔
344
                        // channel is force closed.
3✔
345
                        autoReleaseHeap.Push(event)
3✔
346

347
                // The htlc at the top of the heap needs to be auto-released.
UNCOV
348
                case <-nextReleaseTick:
×
UNCOV
349
                        event := autoReleaseHeap.Pop().(*htlcReleaseEvent)
×
UNCOV
350
                        err := i.cancelSingleHtlc(
×
UNCOV
351
                                event.invoiceRef, event.key, ResultMppTimeout,
×
UNCOV
352
                        )
×
UNCOV
353
                        if err != nil {
×
354
                                log.Errorf("HTLC timer: %v", err)
×
355
                        }
×
356

357
                case <-i.quit:
3✔
358
                        return
3✔
359
                }
360
        }
361
}
362

363
// dispatchToSingleClients passes the supplied event to all notification
364
// clients that subscribed to the invoice this event applies to.
365
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
3✔
366
        // Dispatch to single invoice subscribers.
3✔
367
        clients := i.copySingleClients()
3✔
368
        for _, client := range clients {
6✔
369
                payHash := client.invoiceRef.PayHash()
3✔
370

3✔
371
                if payHash == nil || *payHash != event.hash {
6✔
372
                        continue
3✔
373
                }
374

375
                select {
3✔
376
                case <-client.backlogDelivered:
3✔
377
                        // We won't deliver any events until the backlog has
378
                        // gone through first.
379
                case <-i.quit:
×
380
                        return
×
381
                }
382

383
                client.notify(event)
3✔
384
        }
385
}
386

387
// dispatchToClients passes the supplied event to all notification clients that
388
// subscribed to all invoices. Add and settle indices are used to make sure
389
// that clients don't receive duplicate or unwanted events.
390
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
3✔
391
        invoice := event.invoice
3✔
392

3✔
393
        clients := i.copyClients()
3✔
394
        for clientID, client := range clients {
6✔
395
                // Before we dispatch this event, we'll check
3✔
396
                // to ensure that this client hasn't already
3✔
397
                // received this notification in order to
3✔
398
                // ensure we don't duplicate any events.
3✔
399

3✔
400
                // TODO(joostjager): Refactor switches.
3✔
401
                state := event.invoice.State
3✔
402
                switch {
3✔
403
                // If we've already sent this settle event to
404
                // the client, then we can skip this.
405
                case state == ContractSettled &&
406
                        client.settleIndex >= invoice.SettleIndex:
×
407
                        continue
×
408

409
                // Similarly, if we've already sent this add to
410
                // the client then we can skip this one, but only if this isn't
411
                // an AMP invoice. AMP invoices always remain in the settle
412
                // state as a base invoice.
413
                case event.setID == nil && state == ContractOpen &&
414
                        client.addIndex >= invoice.AddIndex:
×
415
                        continue
×
416

417
                // These two states should never happen, but we
418
                // log them just in case so we can detect this
419
                // instance.
420
                case state == ContractOpen &&
421
                        client.addIndex+1 != invoice.AddIndex:
3✔
422
                        log.Warnf("client=%v for invoice "+
3✔
423
                                "notifications missed an update, "+
3✔
424
                                "add_index=%v, new add event index=%v",
3✔
425
                                clientID, client.addIndex,
3✔
426
                                invoice.AddIndex)
3✔
427

428
                case state == ContractSettled &&
429
                        client.settleIndex+1 != invoice.SettleIndex:
3✔
430
                        log.Warnf("client=%v for invoice "+
3✔
431
                                "notifications missed an update, "+
3✔
432
                                "settle_index=%v, new settle event index=%v",
3✔
433
                                clientID, client.settleIndex,
3✔
434
                                invoice.SettleIndex)
3✔
435
                }
436

437
                select {
3✔
438
                case <-client.backlogDelivered:
3✔
439
                        // We won't deliver any events until the backlog has
440
                        // been processed.
441
                case <-i.quit:
×
442
                        return
×
443
                }
444

445
                err := client.notify(&invoiceEvent{
3✔
446
                        invoice: invoice,
3✔
447
                        setID:   event.setID,
3✔
448
                })
3✔
449
                if err != nil {
3✔
450
                        log.Errorf("Failed dispatching to client: %v", err)
×
451
                        return
×
452
                }
×
453

454
                // Each time we send a notification to a client, we'll record
455
                // the latest add/settle index it has. We'll use this to ensure
456
                // we don't send a notification twice, which can happen if a new
457
                // event is added while we're catching up a new client.
458
                invState := event.invoice.State
3✔
459
                switch {
3✔
460
                case invState == ContractSettled:
3✔
461
                        client.settleIndex = invoice.SettleIndex
3✔
462

463
                case invState == ContractOpen && event.setID == nil:
3✔
464
                        client.addIndex = invoice.AddIndex
3✔
465

466
                // If this is an AMP invoice, then we'll need to use the set ID
467
                // to keep track of the settle index of the client. AMP
468
                // invoices remain in the open state, but if a setID is
469
                // passed, then we know it was just settled and will track the
470
                // highest settle index so far.
471
                case invState == ContractOpen && event.setID != nil:
3✔
472
                        setID := *event.setID
3✔
473
                        client.settleIndex = invoice.AMPState[setID].SettleIndex
3✔
474

475
                default:
×
476
                        log.Errorf("unexpected invoice state: %v",
×
477
                                event.invoice.State)
×
478
                }
479
        }
480
}
481

482
// deliverBacklogEvents will attempt to query the invoice database for any
483
// notifications that the client has missed since it reconnected last.
484
func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context,
485
        client *InvoiceSubscription) error {
3✔
486

3✔
487
        log.Debugf("Collecting added invoices since %v for client %v",
3✔
488
                client.addIndex, client.id)
3✔
489

3✔
490
        addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex)
3✔
491
        if err != nil {
3✔
492
                return err
×
493
        }
×
494

495
        log.Debugf("Collecting settled invoices since %v for client %v",
3✔
496
                client.settleIndex, client.id)
3✔
497

3✔
498
        settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex)
3✔
499
        if err != nil {
3✔
500
                return err
×
501
        }
×
502

503
        log.Debugf("Delivering %d added invoices and %d settled invoices "+
3✔
504
                "for client %v", len(addEvents), len(settleEvents), client.id)
3✔
505

3✔
506
        // If we have any to deliver, then we'll append them to the end of the
3✔
507
        // notification queue in order to catch up the client before delivering
3✔
508
        // any new notifications.
3✔
509
        for _, addEvent := range addEvents {
6✔
510
                // We re-bind the loop variable to ensure we don't hold onto
3✔
511
                // the loop reference, causing it to point to the same item.
3✔
512
                addEvent := addEvent
3✔
513

3✔
514
                select {
3✔
515
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
516
                        invoice: &addEvent,
517
                }:
3✔
518
                case <-i.quit:
×
519
                        return ErrShuttingDown
×
520
                }
521
        }
522

523
        for _, settleEvent := range settleEvents {
6✔
524
                // We re-bind the loop variable to ensure we don't hold onto
3✔
525
                // the loop reference, causing it to point to the same item.
3✔
526
                settleEvent := settleEvent
3✔
527

3✔
528
                select {
3✔
529
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
530
                        invoice: &settleEvent,
531
                }:
3✔
532
                case <-i.quit:
×
533
                        return ErrShuttingDown
×
534
                }
535
        }
536

537
        return nil
3✔
538
}
539

540
// deliverSingleBacklogEvents will attempt to query the invoice database to
541
// retrieve the current invoice state and deliver this to the subscriber. Single
542
// invoice subscribers will always receive the current state right after
543
// subscribing. Only in case the invoice does not yet exist, nothing is sent
544
// yet.
545
func (i *InvoiceRegistry) deliverSingleBacklogEvents(ctx context.Context,
546
        client *SingleInvoiceSubscription) error {
3✔
547

3✔
548
        invoice, err := i.idb.LookupInvoice(ctx, client.invoiceRef)
3✔
549

3✔
550
        // It is possible that the invoice does not exist yet, but the client is
3✔
551
        // already watching it in anticipation.
3✔
552
        isNotFound := errors.Is(err, ErrInvoiceNotFound)
3✔
553
        isNotCreated := errors.Is(err, ErrNoInvoicesCreated)
3✔
554
        if isNotFound || isNotCreated {
6✔
555
                return nil
3✔
556
        }
3✔
557
        if err != nil {
3✔
558
                return err
×
559
        }
×
560

561
        payHash := client.invoiceRef.PayHash()
3✔
562
        if payHash == nil {
3✔
563
                return nil
×
564
        }
×
565

566
        err = client.notify(&invoiceEvent{
3✔
567
                hash:    *payHash,
3✔
568
                invoice: &invoice,
3✔
569
        })
3✔
570
        if err != nil {
3✔
571
                return err
×
572
        }
×
573

574
        log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
3✔
575
                client.id, payHash)
3✔
576

3✔
577
        return nil
3✔
578
}
579

580
// AddInvoice adds a regular invoice for the specified amount, identified by
581
// the passed preimage. Additionally, any memo or receipt data provided will
582
// also be stored on-disk. Once this invoice is added, subsystems within the
583
// daemon add/forward HTLCs are able to obtain the proper preimage required for
584
// redemption in the case that we're the final destination. We also return the
585
// addIndex of the newly created invoice which monotonically increases for each
586
// new invoice added.  A side effect of this function is that it also sets
587
// AddIndex on the invoice argument.
588
func (i *InvoiceRegistry) AddInvoice(ctx context.Context, invoice *Invoice,
589
        paymentHash lntypes.Hash) (uint64, error) {
3✔
590

3✔
591
        i.Lock()
3✔
592

3✔
593
        ref := InvoiceRefByHash(paymentHash)
3✔
594
        log.Debugf("Invoice%v: added with terms %v", ref, invoice.Terms)
3✔
595

3✔
596
        addIndex, err := i.idb.AddInvoice(ctx, invoice, paymentHash)
3✔
597
        if err != nil {
6✔
598
                i.Unlock()
3✔
599
                return 0, err
3✔
600
        }
3✔
601

602
        // Now that we've added the invoice, we'll dispatch a message to
603
        // notify the clients of this new invoice.
604
        i.notifyClients(paymentHash, invoice, nil)
3✔
605
        i.Unlock()
3✔
606

3✔
607
        // InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry
3✔
608
        // to avoid deadlock when a new invoice is added while another is being
3✔
609
        // canceled.
3✔
610
        invoiceExpiryRef := makeInvoiceExpiry(paymentHash, invoice)
3✔
611
        if invoiceExpiryRef != nil {
6✔
612
                i.expiryWatcher.AddInvoices(invoiceExpiryRef)
3✔
613
        }
3✔
614

615
        return addIndex, nil
3✔
616
}
617

618
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
619
// then we're able to pull the funds pending within an HTLC.
620
//
621
// TODO(roasbeef): ignore if settled?
622
func (i *InvoiceRegistry) LookupInvoice(ctx context.Context,
623
        rHash lntypes.Hash) (Invoice, error) {
3✔
624

3✔
625
        // We'll check the database to see if there's an existing matching
3✔
626
        // invoice.
3✔
627
        ref := InvoiceRefByHash(rHash)
3✔
628
        return i.idb.LookupInvoice(ctx, ref)
3✔
629
}
3✔
630

631
// LookupInvoiceByRef looks up an invoice by the given reference, if found
632
// then we're able to pull the funds pending within an HTLC.
633
func (i *InvoiceRegistry) LookupInvoiceByRef(ctx context.Context,
634
        ref InvoiceRef) (Invoice, error) {
3✔
635

3✔
636
        return i.idb.LookupInvoice(ctx, ref)
3✔
637
}
3✔
638

639
// startHtlcTimer starts a new timer via the invoice registry main loop that
640
// cancels a single htlc on an invoice when the htlc hold duration has passed.
641
func (i *InvoiceRegistry) startHtlcTimer(invoiceRef InvoiceRef,
642
        key CircuitKey, acceptTime time.Time) error {
3✔
643

3✔
644
        releaseTime := acceptTime.Add(i.cfg.HtlcHoldDuration)
3✔
645
        event := &htlcReleaseEvent{
3✔
646
                invoiceRef:  invoiceRef,
3✔
647
                key:         key,
3✔
648
                releaseTime: releaseTime,
3✔
649
        }
3✔
650

3✔
651
        select {
3✔
652
        case i.htlcAutoReleaseChan <- event:
3✔
653
                return nil
3✔
654

655
        case <-i.quit:
×
656
                return ErrShuttingDown
×
657
        }
658
}
659

660
// cancelSingleHtlc cancels a single accepted htlc on an invoice. It takes
661
// a resolution result which will be used to notify subscribed links and
662
// resolvers of the details of the htlc cancellation.
663
func (i *InvoiceRegistry) cancelSingleHtlc(invoiceRef InvoiceRef,
UNCOV
664
        key CircuitKey, result FailResolutionResult) error {
×
UNCOV
665

×
UNCOV
666
        updateInvoice := func(invoice *Invoice, setID *SetID) (
×
UNCOV
667
                *InvoiceUpdateDesc, error) {
×
UNCOV
668

×
UNCOV
669
                // Only allow individual htlc cancellation on open invoices.
×
UNCOV
670
                if invoice.State != ContractOpen {
×
UNCOV
671
                        log.Debugf("CancelSingleHtlc: cannot cancel htlc %v "+
×
UNCOV
672
                                "on invoice %v, invoice is no longer open", key,
×
UNCOV
673
                                invoiceRef)
×
UNCOV
674

×
UNCOV
675
                        return nil, nil
×
UNCOV
676
                }
×
677

678
                // Also for AMP invoices we fetch the relevant HTLCs, so
679
                // the HTLC should be found, otherwise we return an error.
UNCOV
680
                htlc, ok := invoice.Htlcs[key]
×
UNCOV
681
                if !ok {
×
682
                        return nil, fmt.Errorf("htlc %v not found on "+
×
683
                                "invoice %v", key, invoiceRef)
×
684
                }
×
685

UNCOV
686
                htlcState := htlc.State
×
UNCOV
687

×
UNCOV
688
                // Cancellation is only possible if the htlc wasn't already
×
UNCOV
689
                // resolved.
×
UNCOV
690
                if htlcState != HtlcStateAccepted {
×
UNCOV
691
                        log.Debugf("CancelSingleHtlc: htlc %v on invoice %v "+
×
UNCOV
692
                                "is already resolved", key, invoiceRef)
×
UNCOV
693

×
UNCOV
694
                        return nil, nil
×
UNCOV
695
                }
×
696

UNCOV
697
                log.Debugf("CancelSingleHtlc: cancelling htlc %v on invoice %v",
×
UNCOV
698
                        key, invoiceRef)
×
UNCOV
699

×
UNCOV
700
                // Return an update descriptor that cancels htlc and keeps
×
UNCOV
701
                // invoice open.
×
UNCOV
702
                canceledHtlcs := map[CircuitKey]struct{}{
×
UNCOV
703
                        key: {},
×
UNCOV
704
                }
×
UNCOV
705

×
UNCOV
706
                return &InvoiceUpdateDesc{
×
UNCOV
707
                        UpdateType:  CancelHTLCsUpdate,
×
UNCOV
708
                        CancelHtlcs: canceledHtlcs,
×
UNCOV
709
                        SetID:       setID,
×
UNCOV
710
                }, nil
×
711
        }
712

713
        // Try to mark the specified htlc as canceled in the invoice database.
714
        // Intercept the update descriptor to set the local updated variable. If
715
        // no invoice update is performed, we can return early.
716
        // setID is only set for AMP HTLCs, so it can be nil and it is expected
717
        // to be nil for non-AMP HTLCs.
UNCOV
718
        setID := (*SetID)(invoiceRef.SetID())
×
UNCOV
719

×
UNCOV
720
        var updated bool
×
UNCOV
721
        invoice, err := i.idb.UpdateInvoice(
×
UNCOV
722
                context.Background(), invoiceRef, setID,
×
UNCOV
723
                func(invoice *Invoice) (
×
UNCOV
724
                        *InvoiceUpdateDesc, error) {
×
UNCOV
725

×
UNCOV
726
                        updateDesc, err := updateInvoice(invoice, setID)
×
UNCOV
727
                        if err != nil {
×
728
                                return nil, err
×
729
                        }
×
UNCOV
730
                        updated = updateDesc != nil
×
UNCOV
731

×
UNCOV
732
                        return updateDesc, err
×
733
                },
734
        )
UNCOV
735
        if err != nil {
×
736
                return err
×
737
        }
×
UNCOV
738
        if !updated {
×
UNCOV
739
                return nil
×
UNCOV
740
        }
×
741

742
        // The invoice has been updated. Notify subscribers of the htlc
743
        // resolution.
UNCOV
744
        htlc, ok := invoice.Htlcs[key]
×
UNCOV
745
        if !ok {
×
746
                return fmt.Errorf("htlc %v not found", key)
×
747
        }
×
UNCOV
748
        if htlc.State == HtlcStateCanceled {
×
UNCOV
749
                resolution := NewFailResolution(
×
UNCOV
750
                        key, int32(htlc.AcceptHeight), result,
×
UNCOV
751
                )
×
UNCOV
752

×
UNCOV
753
                log.Debugf("Signaling htlc(%v) cancellation of invoice(%v) "+
×
UNCOV
754
                        "with resolution(%v) to the link subsystem", key,
×
UNCOV
755
                        invoiceRef, result)
×
UNCOV
756

×
UNCOV
757
                i.notifyHodlSubscribers(resolution)
×
UNCOV
758
        }
×
759

UNCOV
760
        return nil
×
761
}
762

763
// processKeySend just-in-time inserts an invoice if this htlc is a keysend
764
// htlc.
765
func (i *InvoiceRegistry) processKeySend(ctx invoiceUpdateCtx) error {
3✔
766
        // Retrieve keysend record if present.
3✔
767
        preimageSlice, ok := ctx.customRecords[record.KeySendType]
3✔
768
        if !ok {
6✔
769
                return nil
3✔
770
        }
3✔
771

772
        // Cancel htlc is preimage is invalid.
773
        preimage, err := lntypes.MakePreimage(preimageSlice)
3✔
774
        if err != nil {
3✔
UNCOV
775
                return err
×
UNCOV
776
        }
×
777
        if preimage.Hash() != ctx.hash {
3✔
778
                return fmt.Errorf("invalid keysend preimage %v for hash %v",
×
779
                        preimage, ctx.hash)
×
780
        }
×
781

782
        // Only allow keysend for non-mpp payments.
783
        if ctx.mpp != nil {
3✔
784
                return errors.New("no mpp keysend supported")
×
785
        }
×
786

787
        // Create an invoice for the htlc amount.
788
        amt := ctx.amtPaid
3✔
789

3✔
790
        // Set tlv required feature vector on the invoice. Otherwise we wouldn't
3✔
791
        // be able to pay to it with keysend.
3✔
792
        rawFeatures := lnwire.NewRawFeatureVector(
3✔
793
                lnwire.TLVOnionPayloadRequired,
3✔
794
        )
3✔
795
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
3✔
796

3✔
797
        // Use the minimum block delta that we require for settling htlcs.
3✔
798
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
3✔
799

3✔
800
        // Pre-check expiry here to prevent inserting an invoice that will not
3✔
801
        // be settled.
3✔
802
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
3✔
803
                return errors.New("final expiry too soon")
×
804
        }
×
805

806
        // The invoice database indexes all invoices by payment address, however
807
        // legacy keysend payment do not have one. In order to avoid a new
808
        // payment type on-disk wrt. to indexing, we'll continue to insert a
809
        // blank payment address which is special cased in the insertion logic
810
        // to not be indexed. In the future, once AMP is merged, this should be
811
        // replaced by generating a random payment address on the behalf of the
812
        // sender.
813
        payAddr := BlankPayAddr
3✔
814

3✔
815
        // Create placeholder invoice.
3✔
816
        invoice := &Invoice{
3✔
817
                CreationDate: i.cfg.Clock.Now(),
3✔
818
                Terms: ContractTerm{
3✔
819
                        FinalCltvDelta:  finalCltvDelta,
3✔
820
                        Value:           amt,
3✔
821
                        PaymentPreimage: &preimage,
3✔
822
                        PaymentAddr:     payAddr,
3✔
823
                        Features:        features,
3✔
824
                },
3✔
825
        }
3✔
826

3✔
827
        if i.cfg.KeysendHoldTime != 0 {
3✔
UNCOV
828
                invoice.HodlInvoice = true
×
UNCOV
829
                invoice.Terms.Expiry = i.cfg.KeysendHoldTime
×
UNCOV
830
        }
×
831

832
        // Insert invoice into database. Ignore duplicates, because this
833
        // may be a replay.
834
        _, err = i.AddInvoice(context.Background(), invoice, ctx.hash)
3✔
835
        if err != nil && !errors.Is(err, ErrDuplicateInvoice) {
3✔
836
                return err
×
837
        }
×
838

839
        return nil
3✔
840
}
841

842
// processAMP just-in-time inserts an invoice if this htlc is a keysend
843
// htlc.
844
func (i *InvoiceRegistry) processAMP(ctx invoiceUpdateCtx) error {
3✔
845
        // AMP payments MUST also include an MPP record.
3✔
846
        if ctx.mpp == nil {
3✔
UNCOV
847
                return errors.New("no MPP record for AMP")
×
UNCOV
848
        }
×
849

850
        // Create an invoice for the total amount expected, provided in the MPP
851
        // record.
852
        amt := ctx.mpp.TotalMsat()
3✔
853

3✔
854
        // Set the TLV required and MPP optional features on the invoice. We'll
3✔
855
        // also make the AMP features required so that it can't be paid by
3✔
856
        // legacy or MPP htlcs.
3✔
857
        rawFeatures := lnwire.NewRawFeatureVector(
3✔
858
                lnwire.TLVOnionPayloadRequired,
3✔
859
                lnwire.PaymentAddrOptional,
3✔
860
                lnwire.AMPRequired,
3✔
861
        )
3✔
862
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
3✔
863

3✔
864
        // Use the minimum block delta that we require for settling htlcs.
3✔
865
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
3✔
866

3✔
867
        // Pre-check expiry here to prevent inserting an invoice that will not
3✔
868
        // be settled.
3✔
869
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
3✔
870
                return errors.New("final expiry too soon")
×
871
        }
×
872

873
        // We'll use the sender-generated payment address provided in the HTLC
874
        // to create our AMP invoice.
875
        payAddr := ctx.mpp.PaymentAddr()
3✔
876

3✔
877
        // Create placeholder invoice.
3✔
878
        invoice := &Invoice{
3✔
879
                CreationDate: i.cfg.Clock.Now(),
3✔
880
                Terms: ContractTerm{
3✔
881
                        FinalCltvDelta:  finalCltvDelta,
3✔
882
                        Value:           amt,
3✔
883
                        PaymentPreimage: nil,
3✔
884
                        PaymentAddr:     payAddr,
3✔
885
                        Features:        features,
3✔
886
                },
3✔
887
        }
3✔
888

3✔
889
        // Insert invoice into database. Ignore duplicates payment hashes and
3✔
890
        // payment addrs, this may be a replay or a different HTLC for the AMP
3✔
891
        // invoice.
3✔
892
        _, err := i.AddInvoice(context.Background(), invoice, ctx.hash)
3✔
893
        isDuplicatedInvoice := errors.Is(err, ErrDuplicateInvoice)
3✔
894
        isDuplicatedPayAddr := errors.Is(err, ErrDuplicatePayAddr)
3✔
895
        switch {
3✔
896
        case isDuplicatedInvoice || isDuplicatedPayAddr:
3✔
897
                return nil
3✔
898
        default:
3✔
899
                return err
3✔
900
        }
901
}
902

903
// NotifyExitHopHtlc attempts to mark an invoice as settled. The return value
904
// describes how the htlc should be resolved.
905
//
906
// When the preimage of the invoice is not yet known (hodl invoice), this
907
// function moves the invoice to the accepted state. When SettleHoldInvoice is
908
// called later, a resolution message will be send back to the caller via the
909
// provided hodlChan. Invoice registry sends on this channel what action needs
910
// to be taken on the htlc (settle or cancel). The caller needs to ensure that
911
// the channel is either buffered or received on from another goroutine to
912
// prevent deadlock.
913
//
914
// In the case that the htlc is part of a larger set of htlcs that pay to the
915
// same invoice (multi-path payment), the htlc is held until the set is
916
// complete. If the set doesn't fully arrive in time, a timer will cancel the
917
// held htlc.
918
func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
919
        amtPaid lnwire.MilliSatoshi, expiry uint32, currentHeight int32,
920
        circuitKey CircuitKey, hodlChan chan<- interface{},
921
        wireCustomRecords lnwire.CustomRecords,
922
        payload Payload) (HtlcResolution, error) {
3✔
923

3✔
924
        // Create the update context containing the relevant details of the
3✔
925
        // incoming htlc.
3✔
926
        ctx := invoiceUpdateCtx{
3✔
927
                hash:                 rHash,
3✔
928
                circuitKey:           circuitKey,
3✔
929
                amtPaid:              amtPaid,
3✔
930
                expiry:               expiry,
3✔
931
                currentHeight:        currentHeight,
3✔
932
                finalCltvRejectDelta: i.cfg.FinalCltvRejectDelta,
3✔
933
                wireCustomRecords:    wireCustomRecords,
3✔
934
                customRecords:        payload.CustomRecords(),
3✔
935
                mpp:                  payload.MultiPath(),
3✔
936
                amp:                  payload.AMPRecord(),
3✔
937
                metadata:             payload.Metadata(),
3✔
938
                pathID:               payload.PathID(),
3✔
939
                totalAmtMsat:         payload.TotalAmtMsat(),
3✔
940
        }
3✔
941

3✔
942
        switch {
3✔
943
        // If we are accepting spontaneous AMP payments and this payload
944
        // contains an AMP record, create an AMP invoice that will be settled
945
        // below.
946
        case i.cfg.AcceptAMP && ctx.amp != nil:
3✔
947
                err := i.processAMP(ctx)
3✔
948
                if err != nil {
3✔
UNCOV
949
                        ctx.log(fmt.Sprintf("amp error: %v", err))
×
UNCOV
950

×
UNCOV
951
                        return NewFailResolution(
×
UNCOV
952
                                circuitKey, currentHeight, ResultAmpError,
×
UNCOV
953
                        ), nil
×
UNCOV
954
                }
×
955

956
        // If we are accepting spontaneous keysend payments, create a regular
957
        // invoice that will be settled below. We also enforce that this is only
958
        // done when no AMP payload is present since it will only be settle-able
959
        // by regular HTLCs.
960
        case i.cfg.AcceptKeySend && ctx.amp == nil:
3✔
961
                err := i.processKeySend(ctx)
3✔
962
                if err != nil {
3✔
UNCOV
963
                        ctx.log(fmt.Sprintf("keysend error: %v", err))
×
UNCOV
964

×
UNCOV
965
                        return NewFailResolution(
×
UNCOV
966
                                circuitKey, currentHeight, ResultKeySendError,
×
UNCOV
967
                        ), nil
×
UNCOV
968
                }
×
969
        }
970

971
        // Execute locked notify exit hop logic.
972
        i.Lock()
3✔
973
        resolution, invoiceToExpire, err := i.notifyExitHopHtlcLocked(
3✔
974
                &ctx, hodlChan,
3✔
975
        )
3✔
976
        i.Unlock()
3✔
977
        if err != nil {
3✔
978
                return nil, err
×
979
        }
×
980

981
        if invoiceToExpire != nil {
6✔
982
                i.expiryWatcher.AddInvoices(invoiceToExpire)
3✔
983
        }
3✔
984

985
        switch r := resolution.(type) {
3✔
986
        // The htlc is held. Start a timer outside the lock if the htlc should
987
        // be auto-released, because otherwise a deadlock may happen with the
988
        // main event loop.
989
        case *htlcAcceptResolution:
3✔
990
                if r.autoRelease {
6✔
991
                        var invRef InvoiceRef
3✔
992
                        if ctx.amp != nil {
6✔
993
                                invRef = InvoiceRefBySetID(*ctx.setID())
3✔
994
                        } else {
6✔
995
                                invRef = ctx.invoiceRef()
3✔
996
                        }
3✔
997

998
                        err := i.startHtlcTimer(
3✔
999
                                invRef, circuitKey, r.acceptTime,
3✔
1000
                        )
3✔
1001
                        if err != nil {
3✔
1002
                                return nil, err
×
1003
                        }
×
1004
                }
1005

1006
                // We return a nil resolution because htlc acceptances are
1007
                // represented as nil resolutions externally.
1008
                // TODO(carla) update calling code to handle accept resolutions.
1009
                return nil, nil
3✔
1010

1011
        // A direct resolution was received for this htlc.
1012
        case HtlcResolution:
3✔
1013
                return r, nil
3✔
1014

1015
        // Fail if an unknown resolution type was received.
1016
        default:
×
1017
                return nil, errors.New("invalid resolution type")
×
1018
        }
1019
}
1020

1021
// notifyExitHopHtlcLocked is the internal implementation of NotifyExitHopHtlc
// that should be executed inside the registry lock. The returned invoiceExpiry
// (if not nil) needs to be added to the expiry watcher outside of the lock.
//
// The function proceeds in the following steps:
//  1. The invoice is looked up in the database. A missing invoice results in
//     a fail resolution with ResultInvoiceNotFound.
//  2. The htlc and the invoice are handed to the configured htlc interceptor,
//     which may override the paid amount or request cancellation of the
//     whole htlc set.
//  3. The invoice is updated in the database through a callback that decides
//     on the resolution of the htlc.
//  4. Depending on the type of the resolution, hodl subscribers are notified
//     of the htlcs that were failed or settled, or the caller's hodlChan is
//     subscribed to the accepted htlc.
//  5. Invoice clients are notified if the invoice state was changed.
//
// The passed ctx is mutated: its amtPaid field is overwritten when the
// interceptor responds with a non-zero amount.
func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
        ctx *invoiceUpdateCtx, hodlChan chan<- interface{}) (
        HtlcResolution, invoiceExpiry, error) {

        invoiceRef := ctx.invoiceRef()

        // This setID is only set for AMP HTLCs, so it can be nil and it is
        // also expected to be nil for non-AMP HTLCs.
        setID := (*SetID)(ctx.setID())

        // We need to look up the current state of the invoice in order to send
        // the previously accepted/settled HTLCs to the interceptor.
        existingInvoice, err := i.idb.LookupInvoice(
                context.Background(), invoiceRef,
        )
        switch {
        case errors.Is(err, ErrInvoiceNotFound) ||
                errors.Is(err, ErrNoInvoicesCreated):

                // If the invoice was not found, return a failure resolution
                // with an invoice not found result.
                return NewFailResolution(
                        ctx.circuitKey, ctx.currentHeight,
                        ResultInvoiceNotFound,
                ), nil, nil

        case err != nil:
                ctx.log(err.Error())
                return nil, nil, err
        }

        // cancelSet is set by the interceptor response handler below and is
        // read by the invoice update callback further down.
        var cancelSet bool

        // Provide the invoice to the settlement interceptor to allow
        // the interceptor's client an opportunity to manipulate the
        // settlement process.
        err = i.cfg.HtlcInterceptor.Intercept(HtlcModifyRequest{
                WireCustomRecords:  ctx.wireCustomRecords,
                ExitHtlcCircuitKey: ctx.circuitKey,
                ExitHtlcAmt:        ctx.amtPaid,
                ExitHtlcExpiry:     ctx.expiry,
                CurrentHeight:      uint32(ctx.currentHeight),
                Invoice:            existingInvoice,
        }, func(resp HtlcModifyResponse) {
                log.Debugf("Received invoice HTLC interceptor response: %v",
                        resp)

                // A zero amount in the response leaves the amount paid by
                // the htlc untouched.
                if resp.AmountPaid != 0 {
                        ctx.amtPaid = resp.AmountPaid
                }

                cancelSet = resp.CancelSet
        })
        if err != nil {
                err := fmt.Errorf("error during invoice HTLC interception: %w",
                        err)
                ctx.log(err.Error())

                return nil, nil, err
        }

        // We'll attempt to settle an invoice matching this rHash on disk (if
        // one exists). The callback will update the invoice state and/or htlcs.
        // Both variables below are written by the callback and read after the
        // database update returned.
        var (
                resolution        HtlcResolution
                updateSubscribers bool
        )
        callback := func(inv *Invoice) (*InvoiceUpdateDesc, error) {
                // First check if this is a replayed htlc and resolve it
                // according to its current state. We cannot decide differently
                // once the HTLC has already been processed before.
                isReplayed, res, err := resolveReplayedHtlc(ctx, inv)
                if err != nil {
                        return nil, err
                }
                if isReplayed {
                        resolution = res
                        return nil, nil
                }

                // In case the HTLC interceptor cancels the HTLC set, we do NOT
                // cancel the invoice however we cancel the complete HTLC set.
                if cancelSet {
                        // If the invoice is not open, something is wrong, we
                        // fail just the HTLC with the specific error.
                        if inv.State != ContractOpen {
                                log.Errorf("Invoice state (%v) is not OPEN, "+
                                        "cancelling HTLC set not allowed by "+
                                        "external source", inv.State)

                                resolution = NewFailResolution(
                                        ctx.circuitKey, ctx.currentHeight,
                                        ResultInvoiceNotOpen,
                                )

                                return nil, nil
                        }

                        // The error `ExternalValidationFailed` error
                        // information will be packed in the
                        // `FailIncorrectDetails` msg when sending the msg to
                        // the peer. Error codes are defined by the BOLT 04
                        // specification. The error text can be arbitrary
                        // therefore we return a custom error msg.
                        resolution = NewFailResolution(
                                ctx.circuitKey, ctx.currentHeight,
                                ExternalValidationFailed,
                        )

                        // We cancel all HTLCs which are in the accepted state.
                        //
                        // NOTE: The current HTLC is not included because it
                        // was never accepted in the first place.
                        htlcs := inv.HTLCSet(ctx.setID(), HtlcStateAccepted)
                        htlcKeys := fn.KeySet[CircuitKey](htlcs)

                        // The external source did cancel the htlc set, so we
                        // cancel all HTLCs in the set. We however keep the
                        // invoice in the open state.
                        //
                        // NOTE: The invoice event loop will still call the
                        // `cancelSingleHTLC` method for MPP payments, however
                        // because the HTLCs are already canceled back it will
                        // be a NOOP.
                        update := &InvoiceUpdateDesc{
                                UpdateType:  CancelHTLCsUpdate,
                                CancelHtlcs: htlcKeys,
                                SetID:       setID,
                        }

                        return update, nil
                }

                updateDesc, res, err := updateInvoice(ctx, inv)
                if err != nil {
                        return nil, err
                }

                // Set resolution in outer scope only after successful update.
                resolution = res

                // Only send an update if the invoice state was changed.
                updateSubscribers = updateDesc != nil &&
                        updateDesc.State != nil

                return updateDesc, nil
        }

        invoice, err := i.idb.UpdateInvoice(
                context.Background(), invoiceRef, setID, callback,
        )

        // A duplicate set ID is reported to the sender the same way as an
        // unknown invoice.
        var duplicateSetIDErr ErrDuplicateSetID
        if errors.As(err, &duplicateSetIDErr) {
                return NewFailResolution(
                        ctx.circuitKey, ctx.currentHeight,
                        ResultInvoiceNotFound,
                ), nil, nil
        }

        switch {
        case errors.Is(err, ErrInvoiceNotFound):
                // If the invoice was not found, return a failure resolution
                // with an invoice not found result.
                return NewFailResolution(
                        ctx.circuitKey, ctx.currentHeight,
                        ResultInvoiceNotFound,
                ), nil, nil

        // An equivocating invoice reference is reported as invoice not found
        // as well.
        case errors.Is(err, ErrInvRefEquivocation):
                return NewFailResolution(
                        ctx.circuitKey, ctx.currentHeight,
                        ResultInvoiceNotFound,
                ), nil, nil

        // The update went through, continue below with the resolution that
        // was set by the callback.
        case err == nil:

        default:
                ctx.log(err.Error())
                return nil, nil, err
        }

        // invoiceToExpire stays nil unless the htlc set was fully accepted
        // below.
        var invoiceToExpire invoiceExpiry

        log.Tracef("Settlement resolution: %T %v", resolution, resolution)

        switch res := resolution.(type) {
        case *HtlcFailResolution:
                // Inspect latest htlc state on the invoice. If it is found,
                // we will update the accept height as it was recorded in the
                // invoice database (which occurs in the case where the htlc
                // reached the database in a previous call). If the htlc was
                // not found on the invoice, it was immediately failed so we
                // send the failure resolution as is, which has the current
                // height set as the accept height.
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
                if ok {
                        res.AcceptHeight = int32(invoiceHtlc.AcceptHeight)
                }

                ctx.log(fmt.Sprintf("failure resolution result "+
                        "outcome: %v, at accept height: %v",
                        res.Outcome, res.AcceptHeight))

                // Some failures apply to the entire HTLC set. Break here if
                // this isn't one of them.
                if !res.Outcome.IsSetFailure() {
                        break
                }

                // Also notify the subscribers of all HTLCs of this HTLC set
                // that are in the canceled state, using the same failure
                // result.
                setID := ctx.setID()
                canceledHtlcSet := invoice.HTLCSet(setID, HtlcStateCanceled)
                for key, htlc := range canceledHtlcSet {
                        htlcFailResolution := NewFailResolution(
                                key, int32(htlc.AcceptHeight), res.Outcome,
                        )

                        i.notifyHodlSubscribers(htlcFailResolution)
                }

        // If the htlc was settled, we will settle any previously accepted
        // htlcs and notify our peer to settle them.
        case *HtlcSettleResolution:
                ctx.log(fmt.Sprintf("settle resolution result "+
                        "outcome: %v, at accept height: %v",
                        res.Outcome, res.AcceptHeight))

                // Also settle any previously accepted htlcs. If a htlc is
                // marked as settled, we should follow now and settle the htlc
                // with our peer.
                setID := ctx.setID()
                settledHtlcSet := invoice.HTLCSet(setID, HtlcStateSettled)
                for key, htlc := range settledHtlcSet {
                        // An htlc that carries its own AMP preimage is
                        // settled with that preimage instead of the one of
                        // the resolution.
                        preimage := res.Preimage
                        if htlc.AMP != nil && htlc.AMP.Preimage != nil {
                                preimage = *htlc.AMP.Preimage
                        }

                        // Notify subscribers that the htlcs should be settled
                        // with our peer. Note that the outcome of the
                        // resolution is set based on the outcome of the single
                        // htlc that we just settled, so may not be accurate
                        // for all htlcs.
                        htlcSettleResolution := NewSettleResolution(
                                preimage, key,
                                int32(htlc.AcceptHeight), res.Outcome,
                        )

                        // Notify subscribers that the htlc should be settled
                        // with our peer.
                        i.notifyHodlSubscribers(htlcSettleResolution)
                }

                // If concurrent payments were attempted to this invoice before
                // the current one was ultimately settled, cancel back any of
                // the HTLCs immediately. As a result of the settle, the HTLCs
                // in other HTLC sets are automatically converted to a canceled
                // state when updating the invoice.
                //
                // TODO(roasbeef): can remove now??
                canceledHtlcSet := invoice.HTLCSetCompliment(
                        setID, HtlcStateCanceled,
                )
                for key, htlc := range canceledHtlcSet {
                        htlcFailResolution := NewFailResolution(
                                key, int32(htlc.AcceptHeight),
                                ResultInvoiceAlreadySettled,
                        )

                        i.notifyHodlSubscribers(htlcFailResolution)
                }

        // If we accepted the htlc, subscribe to the hodl invoice and return
        // an accept resolution with the htlc's accept time on it.
        case *htlcAcceptResolution:
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
                if !ok {
                        return nil, nil, fmt.Errorf("accepted htlc: %v not"+
                                " present on invoice: %x", ctx.circuitKey,
                                ctx.hash[:])
                }

                // Determine accepted height of this htlc. If the htlc reached
                // the invoice database (possibly in a previous call to the
                // invoice registry), we'll take the original accepted height
                // as it was recorded in the database.
                acceptHeight := int32(invoiceHtlc.AcceptHeight)

                ctx.log(fmt.Sprintf("accept resolution result "+
                        "outcome: %v, at accept height: %v",
                        res.outcome, acceptHeight))

                // Auto-release the htlc if the invoice is still open. It can
                // only happen for mpp payments that there are htlcs in state
                // Accepted while the invoice is Open.
                if invoice.State == ContractOpen {
                        res.acceptTime = invoiceHtlc.AcceptTime
                        res.autoRelease = true
                }

                // If we have fully accepted the set of htlcs for this invoice,
                // we can now add it to our invoice expiry watcher. We do not
                // add invoices before they are fully accepted, because it is
                // possible that we MppTimeout the htlcs, and then our relevant
                // expiry height could change.
                if res.outcome == resultAccepted {
                        invoiceToExpire = makeInvoiceExpiry(ctx.hash, invoice)
                }

                // Subscribe to the resolution if the caller specified a
                // notification channel.
                if hodlChan != nil {
                        i.hodlSubscribe(hodlChan, ctx.circuitKey)
                }

        // Any other resolution type, including a nil resolution, is treated
        // as a programming error.
        default:
                panic("unknown action")
        }

        // Now that the links have been notified of any state changes to their
        // HTLCs, we'll go ahead and notify any clients waiting on the invoice
        // state changes.
        if updateSubscribers {
                // We'll add a setID onto the notification, but only if this is
                // an AMP invoice being settled.
                var setID *[32]byte
                if _, ok := resolution.(*HtlcSettleResolution); ok {
                        setID = ctx.setID()
                }

                i.notifyClients(ctx.hash, invoice, setID)
        }

        return resolution, invoiceToExpire, nil
}
1361

1362
// SettleHodlInvoice sets the preimage of a hodl invoice.
1363
func (i *InvoiceRegistry) SettleHodlInvoice(ctx context.Context,
1364
        preimage lntypes.Preimage) error {
3✔
1365

3✔
1366
        i.Lock()
3✔
1367
        defer i.Unlock()
3✔
1368

3✔
1369
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
6✔
1370
                switch invoice.State {
3✔
1371
                case ContractOpen:
×
1372
                        return nil, ErrInvoiceStillOpen
×
1373

1374
                case ContractCanceled:
×
1375
                        return nil, ErrInvoiceAlreadyCanceled
×
1376

UNCOV
1377
                case ContractSettled:
×
UNCOV
1378
                        return nil, ErrInvoiceAlreadySettled
×
1379
                }
1380

1381
                return &InvoiceUpdateDesc{
3✔
1382
                        UpdateType: SettleHodlInvoiceUpdate,
3✔
1383
                        State: &InvoiceStateUpdateDesc{
3✔
1384
                                NewState: ContractSettled,
3✔
1385
                                Preimage: &preimage,
3✔
1386
                        },
3✔
1387
                }, nil
3✔
1388
        }
1389

1390
        hash := preimage.Hash()
3✔
1391
        invoiceRef := InvoiceRefByHash(hash)
3✔
1392

3✔
1393
        // AMP hold invoices are not supported so we set the setID to nil.
3✔
1394
        // For non-AMP invoices this parameter is ignored during the fetching
3✔
1395
        // of the database state.
3✔
1396
        setID := (*SetID)(nil)
3✔
1397

3✔
1398
        invoice, err := i.idb.UpdateInvoice(
3✔
1399
                ctx, invoiceRef, setID, updateInvoice,
3✔
1400
        )
3✔
1401
        if err != nil {
3✔
UNCOV
1402
                log.Errorf("SettleHodlInvoice with preimage %v: %v",
×
UNCOV
1403
                        preimage, err)
×
UNCOV
1404

×
UNCOV
1405
                return err
×
UNCOV
1406
        }
×
1407

1408
        log.Debugf("Invoice%v: settled with preimage %v", invoiceRef,
3✔
1409
                invoice.Terms.PaymentPreimage)
3✔
1410

3✔
1411
        // In the callback, we marked the invoice as settled. UpdateInvoice will
3✔
1412
        // have seen this and should have moved all htlcs that were accepted to
3✔
1413
        // the settled state. In the loop below, we go through all of these and
3✔
1414
        // notify links and resolvers that are waiting for resolution. Any htlcs
3✔
1415
        // that were already settled before, will be notified again. This isn't
3✔
1416
        // necessary but doesn't hurt either.
3✔
1417
        for key, htlc := range invoice.Htlcs {
6✔
1418
                if htlc.State != HtlcStateSettled {
3✔
1419
                        continue
×
1420
                }
1421

1422
                resolution := NewSettleResolution(
3✔
1423
                        preimage, key, int32(htlc.AcceptHeight), ResultSettled,
3✔
1424
                )
3✔
1425

3✔
1426
                i.notifyHodlSubscribers(resolution)
3✔
1427
        }
1428
        i.notifyClients(hash, invoice, nil)
3✔
1429

3✔
1430
        return nil
3✔
1431
}
1432

1433
// CancelInvoice attempts to cancel the invoice corresponding to the passed
1434
// payment hash.
1435
func (i *InvoiceRegistry) CancelInvoice(ctx context.Context,
1436
        payHash lntypes.Hash) error {
3✔
1437

3✔
1438
        return i.cancelInvoiceImpl(ctx, payHash, true)
3✔
1439
}
3✔
1440

1441
// shouldCancel examines the state of an invoice and whether we want to
1442
// cancel already accepted invoices, taking our force cancel boolean into
1443
// account. This is pulled out into its own function so that tests that mock
1444
// cancelInvoiceImpl can reuse this logic.
1445
func shouldCancel(state ContractState, cancelAccepted bool) bool {
3✔
1446
        if state != ContractAccepted {
6✔
1447
                return true
3✔
1448
        }
3✔
1449

1450
        // If the invoice is accepted, we should only cancel if we want to
1451
        // force cancellation of accepted invoices.
1452
        return cancelAccepted
3✔
1453
}
1454

1455
// cancelInvoice attempts to cancel the invoice corresponding to the passed
1456
// payment hash. Accepted invoices will only be canceled if explicitly
1457
// requested to do so. It notifies subscribing links and resolvers that
1458
// the associated htlcs were canceled if they change state.
1459
func (i *InvoiceRegistry) cancelInvoiceImpl(ctx context.Context,
1460
        payHash lntypes.Hash, cancelAccepted bool) error {
3✔
1461

3✔
1462
        i.Lock()
3✔
1463
        defer i.Unlock()
3✔
1464

3✔
1465
        ref := InvoiceRefByHash(payHash)
3✔
1466
        log.Debugf("Invoice%v: canceling invoice", ref)
3✔
1467

3✔
1468
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
6✔
1469
                if !shouldCancel(invoice.State, cancelAccepted) {
3✔
UNCOV
1470
                        return nil, nil
×
UNCOV
1471
                }
×
1472

1473
                // Move invoice to the canceled state. Rely on validation in
1474
                // channeldb to return an error if the invoice is already
1475
                // settled or canceled.
1476
                return &InvoiceUpdateDesc{
3✔
1477
                        UpdateType: CancelInvoiceUpdate,
3✔
1478
                        State: &InvoiceStateUpdateDesc{
3✔
1479
                                NewState: ContractCanceled,
3✔
1480
                        },
3✔
1481
                }, nil
3✔
1482
        }
1483

1484
        // If it's an AMP invoice we need to fetch all AMP HTLCs here so that
1485
        // we can cancel all of HTLCs which are in the accepted state across
1486
        // different setIDs.
1487
        setID := (*SetID)(nil)
3✔
1488
        invoiceRef := InvoiceRefByHash(payHash)
3✔
1489
        invoice, err := i.idb.UpdateInvoice(
3✔
1490
                ctx, invoiceRef, setID, updateInvoice,
3✔
1491
        )
3✔
1492

3✔
1493
        // Implement idempotency by returning success if the invoice was already
3✔
1494
        // canceled.
3✔
1495
        if errors.Is(err, ErrInvoiceAlreadyCanceled) {
3✔
UNCOV
1496
                log.Debugf("Invoice%v: already canceled", ref)
×
UNCOV
1497
                return nil
×
UNCOV
1498
        }
×
1499
        if err != nil {
6✔
1500
                return err
3✔
1501
        }
3✔
1502

1503
        // Return without cancellation if the invoice state is ContractAccepted.
1504
        if invoice.State == ContractAccepted {
3✔
UNCOV
1505
                log.Debugf("Invoice%v: remains accepted as cancel wasn't"+
×
UNCOV
1506
                        "explicitly requested.", ref)
×
UNCOV
1507
                return nil
×
UNCOV
1508
        }
×
1509

1510
        log.Debugf("Invoice%v: canceled", ref)
3✔
1511

3✔
1512
        // In the callback, some htlcs may have been moved to the canceled
3✔
1513
        // state. We now go through all of these and notify links and resolvers
3✔
1514
        // that are waiting for resolution. Any htlcs that were already canceled
3✔
1515
        // before, will be notified again. This isn't necessary but doesn't hurt
3✔
1516
        // either.
3✔
1517
        // For AMP invoices we fetched all AMP HTLCs for all sub AMP invoices
3✔
1518
        // here so we can clean up all of them.
3✔
1519
        for key, htlc := range invoice.Htlcs {
6✔
1520
                if htlc.State != HtlcStateCanceled {
3✔
1521
                        continue
×
1522
                }
1523

1524
                i.notifyHodlSubscribers(
3✔
1525
                        NewFailResolution(
3✔
1526
                                key, int32(htlc.AcceptHeight), ResultCanceled,
3✔
1527
                        ),
3✔
1528
                )
3✔
1529
        }
1530

1531
        i.notifyClients(payHash, invoice, nil)
3✔
1532

3✔
1533
        // Attempt to also delete the invoice if requested through the registry
3✔
1534
        // config.
3✔
1535
        if i.cfg.GcCanceledInvoicesOnTheFly {
3✔
UNCOV
1536
                // Assemble the delete reference and attempt to delete through
×
UNCOV
1537
                // the invocice from the DB.
×
UNCOV
1538
                deleteRef := InvoiceDeleteRef{
×
UNCOV
1539
                        PayHash:     payHash,
×
UNCOV
1540
                        AddIndex:    invoice.AddIndex,
×
UNCOV
1541
                        SettleIndex: invoice.SettleIndex,
×
UNCOV
1542
                }
×
UNCOV
1543
                if invoice.Terms.PaymentAddr != BlankPayAddr {
×
1544
                        deleteRef.PayAddr = &invoice.Terms.PaymentAddr
×
1545
                }
×
1546

UNCOV
1547
                err = i.idb.DeleteInvoice(ctx, []InvoiceDeleteRef{deleteRef})
×
UNCOV
1548
                // If by any chance deletion failed, then log it instead of
×
UNCOV
1549
                // returning the error, as the invoice itself has already been
×
UNCOV
1550
                // canceled.
×
UNCOV
1551
                if err != nil {
×
1552
                        log.Warnf("Invoice %v could not be deleted: %v", ref,
×
1553
                                err)
×
1554
                }
×
1555
        }
1556

1557
        return nil
3✔
1558
}
1559

1560
// notifyClients notifies all currently registered invoice notification clients
1561
// of a newly added/settled invoice.
1562
func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
1563
        invoice *Invoice, setID *[32]byte) {
3✔
1564

3✔
1565
        event := &invoiceEvent{
3✔
1566
                invoice: invoice,
3✔
1567
                hash:    hash,
3✔
1568
                setID:   setID,
3✔
1569
        }
3✔
1570

3✔
1571
        select {
3✔
1572
        case i.invoiceEvents <- event:
3✔
1573
        case <-i.quit:
×
1574
        }
1575
}
1576

1577
// invoiceSubscriptionKit defines that are common to both all invoice
1578
// subscribers and single invoice subscribers.
1579
type invoiceSubscriptionKit struct {
1580
        id uint32 // nolint:structcheck
1581

1582
        // quit is a chan mouted to InvoiceRegistry that signals a shutdown.
1583
        quit chan struct{}
1584

1585
        ntfnQueue *queue.ConcurrentQueue
1586

1587
        canceled   uint32 // To be used atomically.
1588
        cancelChan chan struct{}
1589

1590
        // backlogDelivered is closed when the backlog events have been
1591
        // delivered.
1592
        backlogDelivered chan struct{}
1593
}
1594

1595
// InvoiceSubscription represents an intent to receive updates for newly added
1596
// or settled invoices. For each newly added invoice, a copy of the invoice
1597
// will be sent over the NewInvoices channel. Similarly, for each newly settled
1598
// invoice, a copy of the invoice will be sent over the SettledInvoices
1599
// channel.
1600
type InvoiceSubscription struct {
1601
        invoiceSubscriptionKit
1602

1603
        // NewInvoices is a channel that we'll use to send all newly created
1604
        // invoices with an invoice index greater than the specified
1605
        // StartingInvoiceIndex field.
1606
        NewInvoices chan *Invoice
1607

1608
        // SettledInvoices is a channel that we'll use to send all settled
1609
        // invoices with an invoices index greater than the specified
1610
        // StartingInvoiceIndex field.
1611
        SettledInvoices chan *Invoice
1612

1613
        // addIndex is the highest add index the caller knows of. We'll use
1614
        // this information to send out an event backlog to the notifications
1615
        // subscriber. Any new add events with an index greater than this will
1616
        // be dispatched before any new notifications are sent out.
1617
        addIndex uint64
1618

1619
        // settleIndex is the highest settle index the caller knows of. We'll
1620
        // use this information to send out an event backlog to the
1621
        // notifications subscriber. Any new settle events with an index
1622
        // greater than this will be dispatched before any new notifications
1623
        // are sent out.
1624
        settleIndex uint64
1625
}
1626

1627
// SingleInvoiceSubscription represents an intent to receive updates for a
1628
// specific invoice.
1629
type SingleInvoiceSubscription struct {
1630
        invoiceSubscriptionKit
1631

1632
        invoiceRef InvoiceRef
1633

1634
        // Updates is a channel that we'll use to send all invoice events for
1635
        // the invoice that is subscribed to.
1636
        Updates chan *Invoice
1637
}
1638

1639
// PayHash returns the optional payment hash of the target invoice.
1640
//
1641
// TODO(positiveblue): This method is only supposed to be used in tests. It will
1642
// be deleted as soon as invoiceregistery_test is in the same module.
UNCOV
1643
func (s *SingleInvoiceSubscription) PayHash() *lntypes.Hash {
×
UNCOV
1644
        return s.invoiceRef.PayHash()
×
UNCOV
1645
}
×
1646

1647
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
1648
// resources.
1649
func (i *invoiceSubscriptionKit) Cancel() {
3✔
1650
        if !atomic.CompareAndSwapUint32(&i.canceled, 0, 1) {
3✔
1651
                return
×
1652
        }
×
1653

1654
        i.ntfnQueue.Stop()
3✔
1655
        close(i.cancelChan)
3✔
1656
}
1657

1658
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
3✔
1659
        select {
3✔
1660
        case i.ntfnQueue.ChanIn() <- event:
3✔
1661

1662
        case <-i.cancelChan:
×
1663
                // This can only be triggered by delivery of non-backlog
×
1664
                // events.
×
1665
                return ErrShuttingDown
×
1666
        case <-i.quit:
×
1667
                return ErrShuttingDown
×
1668
        }
1669

1670
        return nil
3✔
1671
}
1672

1673
// SubscribeNotifications returns an InvoiceSubscription which allows the
1674
// caller to receive async notifications when any invoices are settled or
1675
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
1676
// by first sending out all new events with an invoice index _greater_ than
1677
// this value. Afterwards, we'll send out real-time notifications.
1678
func (i *InvoiceRegistry) SubscribeNotifications(ctx context.Context,
1679
        addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
3✔
1680

3✔
1681
        client := &InvoiceSubscription{
3✔
1682
                NewInvoices:     make(chan *Invoice),
3✔
1683
                SettledInvoices: make(chan *Invoice),
3✔
1684
                addIndex:        addIndex,
3✔
1685
                settleIndex:     settleIndex,
3✔
1686
                invoiceSubscriptionKit: invoiceSubscriptionKit{
3✔
1687
                        quit:             i.quit,
3✔
1688
                        ntfnQueue:        queue.NewConcurrentQueue(20),
3✔
1689
                        cancelChan:       make(chan struct{}),
3✔
1690
                        backlogDelivered: make(chan struct{}),
3✔
1691
                },
3✔
1692
        }
3✔
1693
        client.ntfnQueue.Start()
3✔
1694

3✔
1695
        // This notifies other goroutines that the backlog phase is over.
3✔
1696
        defer close(client.backlogDelivered)
3✔
1697

3✔
1698
        // Always increment by 1 first, and our client ID will start with 1,
3✔
1699
        // not 0.
3✔
1700
        client.id = atomic.AddUint32(&i.nextClientID, 1)
3✔
1701

3✔
1702
        // Before we register this new invoice subscription, we'll launch a new
3✔
1703
        // goroutine that will proxy all notifications appended to the end of
3✔
1704
        // the concurrent queue to the two client-side channels the caller will
3✔
1705
        // feed off of.
3✔
1706
        i.wg.Add(1)
3✔
1707
        go func() {
6✔
1708
                defer i.wg.Done()
3✔
1709
                defer i.deleteClient(client.id)
3✔
1710

3✔
1711
                for {
6✔
1712
                        select {
3✔
1713
                        // A new invoice event has been sent by the
1714
                        // invoiceRegistry! We'll figure out if this is an add
1715
                        // event or a settle event, then dispatch the event to
1716
                        // the client.
1717
                        case ntfn := <-client.ntfnQueue.ChanOut():
3✔
1718
                                invoiceEvent := ntfn.(*invoiceEvent)
3✔
1719

3✔
1720
                                var targetChan chan *Invoice
3✔
1721
                                state := invoiceEvent.invoice.State
3✔
1722
                                switch {
3✔
1723
                                // AMP invoices never move to settled, but will
1724
                                // be sent with a set ID if an HTLC set is
1725
                                // being settled.
1726
                                case state == ContractOpen &&
1727
                                        invoiceEvent.setID != nil:
3✔
1728
                                        fallthrough
3✔
1729

1730
                                case state == ContractSettled:
3✔
1731
                                        targetChan = client.SettledInvoices
3✔
1732

1733
                                case state == ContractOpen:
3✔
1734
                                        targetChan = client.NewInvoices
3✔
1735

1736
                                default:
×
1737
                                        log.Errorf("unknown invoice state: %v",
×
1738
                                                state)
×
1739

×
1740
                                        continue
×
1741
                                }
1742

1743
                                select {
3✔
1744
                                case targetChan <- invoiceEvent.invoice:
3✔
1745

1746
                                case <-client.cancelChan:
×
1747
                                        return
×
1748

1749
                                case <-i.quit:
×
1750
                                        return
×
1751
                                }
1752

1753
                        case <-client.cancelChan:
3✔
1754
                                return
3✔
1755

1756
                        case <-i.quit:
×
1757
                                return
×
1758
                        }
1759
                }
1760
        }()
1761

1762
        i.notificationClientMux.Lock()
3✔
1763
        i.notificationClients[client.id] = client
3✔
1764
        i.notificationClientMux.Unlock()
3✔
1765

3✔
1766
        // Query the database to see if based on the provided addIndex and
3✔
1767
        // settledIndex we need to deliver any backlog notifications.
3✔
1768
        err := i.deliverBacklogEvents(ctx, client)
3✔
1769
        if err != nil {
3✔
1770
                return nil, err
×
1771
        }
×
1772

1773
        log.Infof("New invoice subscription client: id=%v", client.id)
3✔
1774

3✔
1775
        return client, nil
3✔
1776
}
1777

1778
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
1779
// caller to receive async notifications for a specific invoice.
1780
func (i *InvoiceRegistry) SubscribeSingleInvoice(ctx context.Context,
1781
        hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
3✔
1782

3✔
1783
        client := &SingleInvoiceSubscription{
3✔
1784
                Updates: make(chan *Invoice),
3✔
1785
                invoiceSubscriptionKit: invoiceSubscriptionKit{
3✔
1786
                        quit:             i.quit,
3✔
1787
                        ntfnQueue:        queue.NewConcurrentQueue(20),
3✔
1788
                        cancelChan:       make(chan struct{}),
3✔
1789
                        backlogDelivered: make(chan struct{}),
3✔
1790
                },
3✔
1791
                invoiceRef: InvoiceRefByHash(hash),
3✔
1792
        }
3✔
1793
        client.ntfnQueue.Start()
3✔
1794

3✔
1795
        // This notifies other goroutines that the backlog phase is done.
3✔
1796
        defer close(client.backlogDelivered)
3✔
1797

3✔
1798
        // Always increment by 1 first, and our client ID will start with 1,
3✔
1799
        // not 0.
3✔
1800
        client.id = atomic.AddUint32(&i.nextClientID, 1)
3✔
1801

3✔
1802
        // Before we register this new invoice subscription, we'll launch a new
3✔
1803
        // goroutine that will proxy all notifications appended to the end of
3✔
1804
        // the concurrent queue to the two client-side channels the caller will
3✔
1805
        // feed off of.
3✔
1806
        i.wg.Add(1)
3✔
1807
        go func() {
6✔
1808
                defer i.wg.Done()
3✔
1809
                defer i.deleteClient(client.id)
3✔
1810

3✔
1811
                for {
6✔
1812
                        select {
3✔
1813
                        // A new invoice event has been sent by the
1814
                        // invoiceRegistry. We will dispatch the event to the
1815
                        // client.
1816
                        case ntfn := <-client.ntfnQueue.ChanOut():
3✔
1817
                                invoiceEvent := ntfn.(*invoiceEvent)
3✔
1818

3✔
1819
                                select {
3✔
1820
                                case client.Updates <- invoiceEvent.invoice:
3✔
1821

1822
                                case <-client.cancelChan:
×
1823
                                        return
×
1824

1825
                                case <-i.quit:
×
1826
                                        return
×
1827
                                }
1828

1829
                        case <-client.cancelChan:
3✔
1830
                                return
3✔
1831

UNCOV
1832
                        case <-i.quit:
×
UNCOV
1833
                                return
×
1834
                        }
1835
                }
1836
        }()
1837

1838
        i.notificationClientMux.Lock()
3✔
1839
        i.singleNotificationClients[client.id] = client
3✔
1840
        i.notificationClientMux.Unlock()
3✔
1841

3✔
1842
        err := i.deliverSingleBacklogEvents(ctx, client)
3✔
1843
        if err != nil {
3✔
1844
                return nil, err
×
1845
        }
×
1846

1847
        log.Infof("New single invoice subscription client: id=%v, ref=%v",
3✔
1848
                client.id, client.invoiceRef)
3✔
1849

3✔
1850
        return client, nil
3✔
1851
}
1852

1853
// notifyHodlSubscribers sends out the htlc resolution to all current
1854
// subscribers.
1855
func (i *InvoiceRegistry) notifyHodlSubscribers(htlcResolution HtlcResolution) {
3✔
1856
        i.hodlSubscriptionsMux.Lock()
3✔
1857
        defer i.hodlSubscriptionsMux.Unlock()
3✔
1858

3✔
1859
        subscribers, ok := i.hodlSubscriptions[htlcResolution.CircuitKey()]
3✔
1860
        if !ok {
6✔
1861
                return
3✔
1862
        }
3✔
1863

1864
        // Notify all interested subscribers and remove subscription from both
1865
        // maps. The subscription can be removed as there only ever will be a
1866
        // single resolution for each hash.
1867
        for subscriber := range subscribers {
6✔
1868
                select {
3✔
1869
                case subscriber <- htlcResolution:
3✔
1870
                case <-i.quit:
×
1871
                        return
×
1872
                }
1873

1874
                delete(
3✔
1875
                        i.hodlReverseSubscriptions[subscriber],
3✔
1876
                        htlcResolution.CircuitKey(),
3✔
1877
                )
3✔
1878
        }
1879

1880
        delete(i.hodlSubscriptions, htlcResolution.CircuitKey())
3✔
1881
}
1882

1883
// hodlSubscribe adds a new invoice subscription.
1884
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
1885
        circuitKey CircuitKey) {
3✔
1886

3✔
1887
        i.hodlSubscriptionsMux.Lock()
3✔
1888
        defer i.hodlSubscriptionsMux.Unlock()
3✔
1889

3✔
1890
        log.Debugf("Hodl subscribe for %v", circuitKey)
3✔
1891

3✔
1892
        subscriptions, ok := i.hodlSubscriptions[circuitKey]
3✔
1893
        if !ok {
6✔
1894
                subscriptions = make(map[chan<- interface{}]struct{})
3✔
1895
                i.hodlSubscriptions[circuitKey] = subscriptions
3✔
1896
        }
3✔
1897
        subscriptions[subscriber] = struct{}{}
3✔
1898

3✔
1899
        reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
3✔
1900
        if !ok {
6✔
1901
                reverseSubscriptions = make(map[CircuitKey]struct{})
3✔
1902
                i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
3✔
1903
        }
3✔
1904
        reverseSubscriptions[circuitKey] = struct{}{}
3✔
1905
}
1906

1907
// HodlUnsubscribeAll cancels the subscription.
1908
func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
3✔
1909
        i.hodlSubscriptionsMux.Lock()
3✔
1910
        defer i.hodlSubscriptionsMux.Unlock()
3✔
1911

3✔
1912
        hashes := i.hodlReverseSubscriptions[subscriber]
3✔
1913
        for hash := range hashes {
6✔
1914
                delete(i.hodlSubscriptions[hash], subscriber)
3✔
1915
        }
3✔
1916

1917
        delete(i.hodlReverseSubscriptions, subscriber)
3✔
1918
}
1919

1920
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
1921
// useful when we need to iterate the map to send notifications.
1922
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription { //nolint:ll
3✔
1923
        i.notificationClientMux.RLock()
3✔
1924
        defer i.notificationClientMux.RUnlock()
3✔
1925

3✔
1926
        clients := make(map[uint32]*SingleInvoiceSubscription)
3✔
1927
        for k, v := range i.singleNotificationClients {
6✔
1928
                clients[k] = v
3✔
1929
        }
3✔
1930
        return clients
3✔
1931
}
1932

1933
// copyClients copies i.notificationClients inside a lock. This is useful when
1934
// we need to iterate the map to send notifications.
1935
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
3✔
1936
        i.notificationClientMux.RLock()
3✔
1937
        defer i.notificationClientMux.RUnlock()
3✔
1938

3✔
1939
        clients := make(map[uint32]*InvoiceSubscription)
3✔
1940
        for k, v := range i.notificationClients {
6✔
1941
                clients[k] = v
3✔
1942
        }
3✔
1943
        return clients
3✔
1944
}
1945

1946
// deleteClient removes a client by its ID inside a lock. Noop if the client is
1947
// not found.
1948
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
3✔
1949
        i.notificationClientMux.Lock()
3✔
1950
        defer i.notificationClientMux.Unlock()
3✔
1951

3✔
1952
        log.Infof("Cancelling invoice subscription for client=%v", clientID)
3✔
1953
        delete(i.notificationClients, clientID)
3✔
1954
        delete(i.singleNotificationClients, clientID)
3✔
1955
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc