• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12300125568

12 Dec 2024 03:57PM UTC coverage: 58.636% (-0.006%) from 58.642%
12300125568

Pull #9355

github

Roasbeef
contractcourt: add rapid derived fuzz test for HtlcAuxBlob

In this commit, we add a rapid derived fuzz test for the HtlcAuxBlob
test. This uses the rapid (randomized property testing) into Go's built
in fuzzer. This wrapper will use the fuzz stream, and pass that into
rapid where the stream is used to make structured test inputs which are
tested against the existing properties.

This can be done more widely in the codebase, we pick a simple example
to port first before tackling others.
Pull Request #9355: contractcourt: add rapid derived fuzz test for HtlcAuxBlob

134438 of 229277 relevant lines covered (58.64%)

19324.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.63
/invoices/invoiceregistry.go
1
package invoices
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/queue"
15
        "github.com/lightningnetwork/lnd/record"
16
)
17

18
var (
19
        // ErrInvoiceExpiryTooSoon is returned when an invoice is attempted to
20
        // be accepted or settled with not enough blocks remaining.
21
        ErrInvoiceExpiryTooSoon = errors.New("invoice expiry too soon")
22

23
        // ErrInvoiceAmountTooLow is returned  when an invoice is attempted to
24
        // be accepted or settled with an amount that is too low.
25
        ErrInvoiceAmountTooLow = errors.New(
26
                "paid amount less than invoice amount",
27
        )
28

29
        // ErrShuttingDown is returned when an operation failed because the
30
        // invoice registry is shutting down.
31
        ErrShuttingDown = errors.New("invoice registry shutting down")
32
)
33

34
const (
35
        // DefaultHtlcHoldDuration defines the default for how long mpp htlcs
36
        // are held while waiting for the other set members to arrive.
37
        DefaultHtlcHoldDuration = 120 * time.Second
38
)
39

40
// RegistryConfig contains the configuration parameters for invoice registry.
41
type RegistryConfig struct {
42
        // FinalCltvRejectDelta defines the number of blocks before the expiry
43
        // of the htlc where we no longer settle it as an exit hop and instead
44
        // cancel it back. Normally this value should be lower than the cltv
45
        // expiry of any invoice we create and the code effectuating this should
46
        // not be hit.
47
        FinalCltvRejectDelta int32
48

49
        // HtlcHoldDuration defines for how long mpp htlcs are held while
50
        // waiting for the other set members to arrive.
51
        HtlcHoldDuration time.Duration
52

53
        // Clock holds the clock implementation that is used to provide
54
        // Now() and TickAfter() and is useful to stub out the clock functions
55
        // during testing.
56
        Clock clock.Clock
57

58
        // AcceptKeySend indicates whether we want to accept spontaneous key
59
        // send payments.
60
        AcceptKeySend bool
61

62
        // AcceptAMP indicates whether we want to accept spontaneous AMP
63
        // payments.
64
        AcceptAMP bool
65

66
        // GcCanceledInvoicesOnStartup if set, we'll attempt to garbage collect
67
        // all canceled invoices upon start.
68
        GcCanceledInvoicesOnStartup bool
69

70
        // GcCanceledInvoicesOnTheFly if set, we'll garbage collect all newly
71
        // canceled invoices on the fly.
72
        GcCanceledInvoicesOnTheFly bool
73

74
        // KeysendHoldTime indicates for how long we want to accept and hold
75
        // spontaneous keysend payments.
76
        KeysendHoldTime time.Duration
77

78
        // HtlcInterceptor is an interface that allows the invoice registry to
79
        // let clients intercept invoices before they are settled.
80
        HtlcInterceptor HtlcInterceptor
81
}
82

83
// htlcReleaseEvent describes an htlc auto-release event. It is used to release
84
// mpp htlcs for which the complete set didn't arrive in time.
85
type htlcReleaseEvent struct {
86
        // invoiceRef identifiers the invoice this htlc belongs to.
87
        invoiceRef InvoiceRef
88

89
        // key is the circuit key of the htlc to release.
90
        key CircuitKey
91

92
        // releaseTime is the time at which to release the htlc.
93
        releaseTime time.Time
94
}
95

96
// Less is used to order PriorityQueueItem's by their release time such that
97
// items with the older release time are at the top of the queue.
98
//
99
// NOTE: Part of the queue.PriorityQueueItem interface.
100
func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
10✔
101
        return r.releaseTime.Before(other.(*htlcReleaseEvent).releaseTime)
10✔
102
}
10✔
103

104
// InvoiceRegistry is a central registry of all the outstanding invoices
105
// created by the daemon. The registry is a thin wrapper around a map in order
106
// to ensure that all updates/reads are thread safe.
107
type InvoiceRegistry struct {
108
        started atomic.Bool
109
        stopped atomic.Bool
110

111
        sync.RWMutex
112

113
        nextClientID uint32 // must be used atomically
114

115
        idb InvoiceDB
116

117
        // cfg contains the registry's configuration parameters.
118
        cfg *RegistryConfig
119

120
        // notificationClientMux locks notificationClients and
121
        // singleNotificationClients. Using a separate mutex for these maps is
122
        // necessary to avoid deadlocks in the registry when processing invoice
123
        // events.
124
        notificationClientMux sync.RWMutex
125

126
        notificationClients map[uint32]*InvoiceSubscription
127

128
        // TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
129
        // performance.
130
        singleNotificationClients map[uint32]*SingleInvoiceSubscription
131

132
        // invoiceEvents is a single channel over which invoice updates are
133
        // carried.
134
        invoiceEvents chan *invoiceEvent
135

136
        // hodlSubscriptionsMux locks the hodlSubscriptions and
137
        // hodlReverseSubscriptions. Using a separate mutex for these maps is
138
        // necessary to avoid deadlocks in the registry when processing invoice
139
        // events.
140
        hodlSubscriptionsMux sync.RWMutex
141

142
        // hodlSubscriptions is a map from a circuit key to a list of
143
        // subscribers. It is used for efficient notification of links.
144
        hodlSubscriptions map[CircuitKey]map[chan<- interface{}]struct{}
145

146
        // reverseSubscriptions tracks circuit keys subscribed to per
147
        // subscriber. This is used to unsubscribe from all hashes efficiently.
148
        hodlReverseSubscriptions map[chan<- interface{}]map[CircuitKey]struct{}
149

150
        // htlcAutoReleaseChan contains the new htlcs that need to be
151
        // auto-released.
152
        htlcAutoReleaseChan chan *htlcReleaseEvent
153

154
        expiryWatcher *InvoiceExpiryWatcher
155

156
        wg   sync.WaitGroup
157
        quit chan struct{}
158
}
159

160
// NewRegistry creates a new invoice registry. The invoice registry
161
// wraps the persistent on-disk invoice storage with an additional in-memory
162
// layer. The in-memory layer is in place such that debug invoices can be added
163
// which are volatile yet available system wide within the daemon.
164
func NewRegistry(idb InvoiceDB, expiryWatcher *InvoiceExpiryWatcher,
165
        cfg *RegistryConfig) *InvoiceRegistry {
640✔
166

640✔
167
        notificationClients := make(map[uint32]*InvoiceSubscription)
640✔
168
        singleNotificationClients := make(map[uint32]*SingleInvoiceSubscription)
640✔
169
        return &InvoiceRegistry{
640✔
170
                idb:                       idb,
640✔
171
                notificationClients:       notificationClients,
640✔
172
                singleNotificationClients: singleNotificationClients,
640✔
173
                invoiceEvents:             make(chan *invoiceEvent, 100),
640✔
174
                hodlSubscriptions: make(
640✔
175
                        map[CircuitKey]map[chan<- interface{}]struct{},
640✔
176
                ),
640✔
177
                hodlReverseSubscriptions: make(
640✔
178
                        map[chan<- interface{}]map[CircuitKey]struct{},
640✔
179
                ),
640✔
180
                cfg:                 cfg,
640✔
181
                htlcAutoReleaseChan: make(chan *htlcReleaseEvent),
640✔
182
                expiryWatcher:       expiryWatcher,
640✔
183
                quit:                make(chan struct{}),
640✔
184
        }
640✔
185
}
640✔
186

187
// scanInvoicesOnStart will scan all invoices on start and add active invoices
188
// to the invoice expiry watcher while also attempting to delete all canceled
189
// invoices.
190
func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {
640✔
191
        pendingInvoices, err := i.idb.FetchPendingInvoices(ctx)
640✔
192
        if err != nil {
640✔
193
                return err
×
194
        }
×
195

196
        var pending []invoiceExpiry
640✔
197
        for paymentHash, invoice := range pendingInvoices {
674✔
198
                invoice := invoice
34✔
199
                expiryRef := makeInvoiceExpiry(paymentHash, &invoice)
34✔
200
                if expiryRef != nil {
68✔
201
                        pending = append(pending, expiryRef)
34✔
202
                }
34✔
203
        }
204

205
        log.Debugf("Adding %d pending invoices to the expiry watcher",
640✔
206
                len(pending))
640✔
207
        i.expiryWatcher.AddInvoices(pending...)
640✔
208

640✔
209
        if i.cfg.GcCanceledInvoicesOnStartup {
643✔
210
                log.Infof("Deleting canceled invoices")
3✔
211
                err = i.idb.DeleteCanceledInvoices(ctx)
3✔
212
                if err != nil {
3✔
213
                        log.Warnf("Deleting canceled invoices failed: %v", err)
×
214
                        return err
×
215
                }
×
216
        }
217

218
        return nil
640✔
219
}
220

221
// Start starts the registry and all goroutines it needs to carry out its task.
222
func (i *InvoiceRegistry) Start() error {
640✔
223
        var err error
640✔
224

640✔
225
        log.Info("InvoiceRegistry starting...")
640✔
226

640✔
227
        if i.started.Swap(true) {
640✔
228
                return fmt.Errorf("InvoiceRegistry started more than once")
×
229
        }
×
230
        // Start InvoiceExpiryWatcher and prepopulate it with existing
231
        // active invoices.
232
        err = i.expiryWatcher.Start(
640✔
233
                func(hash lntypes.Hash, force bool) error {
717✔
234
                        return i.cancelInvoiceImpl(
77✔
235
                                context.Background(), hash, force,
77✔
236
                        )
77✔
237
                })
77✔
238
        if err != nil {
640✔
239
                return err
×
240
        }
×
241

242
        i.wg.Add(1)
640✔
243
        go i.invoiceEventLoop()
640✔
244

640✔
245
        // Now scan all pending and removable invoices to the expiry
640✔
246
        // watcher or delete them.
640✔
247
        err = i.scanInvoicesOnStart(context.Background())
640✔
248
        if err != nil {
640✔
249
                _ = i.Stop()
×
250
        }
×
251

252
        log.Debug("InvoiceRegistry started")
640✔
253

640✔
254
        return err
640✔
255
}
256

257
// Stop signals the registry for a graceful shutdown.
258
func (i *InvoiceRegistry) Stop() error {
379✔
259
        log.Info("InvoiceRegistry shutting down...")
379✔
260

379✔
261
        if i.stopped.Swap(true) {
379✔
262
                return fmt.Errorf("InvoiceRegistry stopped more than once")
×
263
        }
×
264

265
        log.Info("InvoiceRegistry shutting down...")
379✔
266
        defer log.Debug("InvoiceRegistry shutdown complete")
379✔
267

379✔
268
        var err error
379✔
269
        if i.expiryWatcher == nil {
379✔
270
                err = fmt.Errorf("InvoiceRegistry expiryWatcher is not " +
×
271
                        "initialized")
×
272
        } else {
379✔
273
                i.expiryWatcher.Stop()
379✔
274
        }
379✔
275

276
        close(i.quit)
379✔
277

379✔
278
        i.wg.Wait()
379✔
279

379✔
280
        log.Debug("InvoiceRegistry shutdown complete")
379✔
281

379✔
282
        return err
379✔
283
}
284

285
// invoiceEvent represents a new event that has modified on invoice on disk.
286
// Only two event types are currently supported: newly created invoices, and
287
// instance where invoices are settled.
288
type invoiceEvent struct {
289
        hash    lntypes.Hash
290
        invoice *Invoice
291
        setID   *[32]byte
292
}
293

294
// tickAt returns a channel that ticks at the specified time. If the time has
295
// already passed, it will tick immediately.
296
func (i *InvoiceRegistry) tickAt(t time.Time) <-chan time.Time {
654✔
297
        now := i.cfg.Clock.Now()
654✔
298
        return i.cfg.Clock.TickAfter(t.Sub(now))
654✔
299
}
654✔
300

301
// invoiceEventLoop is the dedicated goroutine responsible for accepting
302
// new notification subscriptions, cancelling old subscriptions, and
303
// dispatching new invoice events.
304
func (i *InvoiceRegistry) invoiceEventLoop() {
640✔
305
        defer i.wg.Done()
640✔
306

640✔
307
        // Set up a heap for htlc auto-releases.
640✔
308
        autoReleaseHeap := &queue.PriorityQueue{}
640✔
309

640✔
310
        for {
3,012✔
311
                // If there is something to release, set up a release tick
2,372✔
312
                // channel.
2,372✔
313
                var nextReleaseTick <-chan time.Time
2,372✔
314
                if autoReleaseHeap.Len() > 0 {
3,026✔
315
                        head := autoReleaseHeap.Top().(*htlcReleaseEvent)
654✔
316
                        nextReleaseTick = i.tickAt(head.releaseTime)
654✔
317
                }
654✔
318

319
                select {
2,372✔
320
                // A sub-systems has just modified the invoice state, so we'll
321
                // dispatch notifications to all registered clients.
322
                case event := <-i.invoiceEvents:
1,394✔
323
                        // For backwards compatibility, do not notify all
1,394✔
324
                        // invoice subscribers of cancel and accept events.
1,394✔
325
                        state := event.invoice.State
1,394✔
326
                        if state != ContractCanceled &&
1,394✔
327
                                state != ContractAccepted {
2,632✔
328

1,238✔
329
                                i.dispatchToClients(event)
1,238✔
330
                        }
1,238✔
331
                        i.dispatchToSingleClients(event)
1,394✔
332

333
                // A new htlc came in for auto-release.
334
                case event := <-i.htlcAutoReleaseChan:
334✔
335
                        log.Debugf("Scheduling auto-release for htlc: "+
334✔
336
                                "ref=%v, key=%v at %v",
334✔
337
                                event.invoiceRef, event.key, event.releaseTime)
334✔
338

334✔
339
                        // We use an independent timer for every htlc rather
334✔
340
                        // than a set timer that is reset with every htlc coming
334✔
341
                        // in. Otherwise the sender could keep resetting the
334✔
342
                        // timer until the broadcast window is entered and our
334✔
343
                        // channel is force closed.
334✔
344
                        autoReleaseHeap.Push(event)
334✔
345

346
                // The htlc at the top of the heap needs to be auto-released.
347
                case <-nextReleaseTick:
12✔
348
                        event := autoReleaseHeap.Pop().(*htlcReleaseEvent)
12✔
349
                        err := i.cancelSingleHtlc(
12✔
350
                                event.invoiceRef, event.key, ResultMppTimeout,
12✔
351
                        )
12✔
352
                        if err != nil {
12✔
353
                                log.Errorf("HTLC timer: %v", err)
×
354
                        }
×
355

356
                case <-i.quit:
379✔
357
                        return
379✔
358
                }
359
        }
360
}
361

362
// dispatchToSingleClients passes the supplied event to all notification
363
// clients that subscribed to all the invoice this event applies to.
364
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
1,394✔
365
        // Dispatch to single invoice subscribers.
1,394✔
366
        clients := i.copySingleClients()
1,394✔
367
        for _, client := range clients {
1,434✔
368
                payHash := client.invoiceRef.PayHash()
40✔
369

40✔
370
                if payHash == nil || *payHash != event.hash {
44✔
371
                        continue
4✔
372
                }
373

374
                select {
40✔
375
                case <-client.backlogDelivered:
40✔
376
                        // We won't deliver any events until the backlog has
377
                        // went through first.
378
                case <-i.quit:
×
379
                        return
×
380
                }
381

382
                client.notify(event)
40✔
383
        }
384
}
385

386
// dispatchToClients passes the supplied event to all notification clients that
387
// subscribed to all invoices. Add and settle indices are used to make sure
388
// that clients don't receive duplicate or unwanted events.
389
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
1,238✔
390
        invoice := event.invoice
1,238✔
391

1,238✔
392
        clients := i.copyClients()
1,238✔
393
        for clientID, client := range clients {
1,308✔
394
                // Before we dispatch this event, we'll check
70✔
395
                // to ensure that this client hasn't already
70✔
396
                // received this notification in order to
70✔
397
                // ensure we don't duplicate any events.
70✔
398

70✔
399
                // TODO(joostjager): Refactor switches.
70✔
400
                state := event.invoice.State
70✔
401
                switch {
70✔
402
                // If we've already sent this settle event to
403
                // the client, then we can skip this.
404
                case state == ContractSettled &&
405
                        client.settleIndex >= invoice.SettleIndex:
×
406
                        continue
×
407

408
                // Similarly, if we've already sent this add to
409
                // the client then we can skip this one, but only if this isn't
410
                // an AMP invoice. AMP invoices always remain in the settle
411
                // state as a base invoice.
412
                case event.setID == nil && state == ContractOpen &&
413
                        client.addIndex >= invoice.AddIndex:
×
414
                        continue
×
415

416
                // These two states should never happen, but we
417
                // log them just in case so we can detect this
418
                // instance.
419
                case state == ContractOpen &&
420
                        client.addIndex+1 != invoice.AddIndex:
11✔
421
                        log.Warnf("client=%v for invoice "+
11✔
422
                                "notifications missed an update, "+
11✔
423
                                "add_index=%v, new add event index=%v",
11✔
424
                                clientID, client.addIndex,
11✔
425
                                invoice.AddIndex)
11✔
426

427
                case state == ContractSettled &&
428
                        client.settleIndex+1 != invoice.SettleIndex:
4✔
429
                        log.Warnf("client=%v for invoice "+
4✔
430
                                "notifications missed an update, "+
4✔
431
                                "settle_index=%v, new settle event index=%v",
4✔
432
                                clientID, client.settleIndex,
4✔
433
                                invoice.SettleIndex)
4✔
434
                }
435

436
                select {
70✔
437
                case <-client.backlogDelivered:
70✔
438
                        // We won't deliver any events until the backlog has
439
                        // been processed.
440
                case <-i.quit:
×
441
                        return
×
442
                }
443

444
                err := client.notify(&invoiceEvent{
70✔
445
                        invoice: invoice,
70✔
446
                        setID:   event.setID,
70✔
447
                })
70✔
448
                if err != nil {
70✔
449
                        log.Errorf("Failed dispatching to client: %v", err)
×
450
                        return
×
451
                }
×
452

453
                // Each time we send a notification to a client, we'll record
454
                // the latest add/settle index it has. We'll use this to ensure
455
                // we don't send a notification twice, which can happen if a new
456
                // event is added while we're catching up a new client.
457
                invState := event.invoice.State
70✔
458
                switch {
70✔
459
                case invState == ContractSettled:
22✔
460
                        client.settleIndex = invoice.SettleIndex
22✔
461

462
                case invState == ContractOpen && event.setID == nil:
46✔
463
                        client.addIndex = invoice.AddIndex
46✔
464

465
                // If this is an AMP invoice, then we'll need to use the set ID
466
                // to keep track of the settle index of the client. AMP
467
                // invoices never go to the open state, but if a setID is
468
                // passed, then we know it was just settled and will track the
469
                // highest settle index so far.
470
                case invState == ContractOpen && event.setID != nil:
10✔
471
                        setID := *event.setID
10✔
472
                        client.settleIndex = invoice.AMPState[setID].SettleIndex
10✔
473

474
                default:
×
475
                        log.Errorf("unexpected invoice state: %v",
×
476
                                event.invoice.State)
×
477
                }
478
        }
479
}
480

481
// deliverBacklogEvents will attempts to query the invoice database for any
482
// notifications that the client has missed since it reconnected last.
483
func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context,
484
        client *InvoiceSubscription) error {
49✔
485

49✔
486
        addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex)
49✔
487
        if err != nil {
49✔
488
                return err
×
489
        }
×
490

491
        settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex)
49✔
492
        if err != nil {
49✔
493
                return err
×
494
        }
×
495

496
        // If we have any to deliver, then we'll append them to the end of the
497
        // notification queue in order to catch up the client before delivering
498
        // any new notifications.
499
        for _, addEvent := range addEvents {
53✔
500
                // We re-bind the loop variable to ensure we don't hold onto
4✔
501
                // the loop reference causing is to point to the same item.
4✔
502
                addEvent := addEvent
4✔
503

4✔
504
                select {
4✔
505
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
506
                        invoice: &addEvent,
507
                }:
4✔
508
                case <-i.quit:
×
509
                        return ErrShuttingDown
×
510
                }
511
        }
512

513
        for _, settleEvent := range settleEvents {
53✔
514
                // We re-bind the loop variable to ensure we don't hold onto
4✔
515
                // the loop reference causing is to point to the same item.
4✔
516
                settleEvent := settleEvent
4✔
517

4✔
518
                select {
4✔
519
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
520
                        invoice: &settleEvent,
521
                }:
4✔
522
                case <-i.quit:
×
523
                        return ErrShuttingDown
×
524
                }
525
        }
526

527
        return nil
49✔
528
}
529

530
// deliverSingleBacklogEvents will attempt to query the invoice database to
531
// retrieve the current invoice state and deliver this to the subscriber. Single
532
// invoice subscribers will always receive the current state right after
533
// subscribing. Only in case the invoice does not yet exist, nothing is sent
534
// yet.
535
func (i *InvoiceRegistry) deliverSingleBacklogEvents(ctx context.Context,
536
        client *SingleInvoiceSubscription) error {
22✔
537

22✔
538
        invoice, err := i.idb.LookupInvoice(ctx, client.invoiceRef)
22✔
539

22✔
540
        // It is possible that the invoice does not exist yet, but the client is
22✔
541
        // already watching it in anticipation.
22✔
542
        isNotFound := errors.Is(err, ErrInvoiceNotFound)
22✔
543
        isNotCreated := errors.Is(err, ErrNoInvoicesCreated)
22✔
544
        if isNotFound || isNotCreated {
44✔
545
                return nil
22✔
546
        }
22✔
547
        if err != nil {
4✔
548
                return err
×
549
        }
×
550

551
        payHash := client.invoiceRef.PayHash()
4✔
552
        if payHash == nil {
4✔
553
                return nil
×
554
        }
×
555

556
        err = client.notify(&invoiceEvent{
4✔
557
                hash:    *payHash,
4✔
558
                invoice: &invoice,
4✔
559
        })
4✔
560
        if err != nil {
4✔
561
                return err
×
562
        }
×
563

564
        log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
4✔
565
                client.id, payHash)
4✔
566

4✔
567
        return nil
4✔
568
}
569

570
// AddInvoice adds a regular invoice for the specified amount, identified by
571
// the passed preimage. Additionally, any memo or receipt data provided will
572
// also be stored on-disk. Once this invoice is added, subsystems within the
573
// daemon add/forward HTLCs are able to obtain the proper preimage required for
574
// redemption in the case that we're the final destination. We also return the
575
// addIndex of the newly created invoice which monotonically increases for each
576
// new invoice added.  A side effect of this function is that it also sets
577
// AddIndex on the invoice argument.
578
func (i *InvoiceRegistry) AddInvoice(ctx context.Context, invoice *Invoice,
579
        paymentHash lntypes.Hash) (uint64, error) {
724✔
580

724✔
581
        i.Lock()
724✔
582

724✔
583
        ref := InvoiceRefByHash(paymentHash)
724✔
584
        log.Debugf("Invoice%v: added with terms %v", ref, invoice.Terms)
724✔
585

724✔
586
        addIndex, err := i.idb.AddInvoice(ctx, invoice, paymentHash)
724✔
587
        if err != nil {
743✔
588
                i.Unlock()
19✔
589
                return 0, err
19✔
590
        }
19✔
591

592
        // Now that we've added the invoice, we'll send dispatch a message to
593
        // notify the clients of this new invoice.
594
        i.notifyClients(paymentHash, invoice, nil)
709✔
595
        i.Unlock()
709✔
596

709✔
597
        // InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry
709✔
598
        // to avoid deadlock when a new invoice is added while an other is being
709✔
599
        // canceled.
709✔
600
        invoiceExpiryRef := makeInvoiceExpiry(paymentHash, invoice)
709✔
601
        if invoiceExpiryRef != nil {
1,418✔
602
                i.expiryWatcher.AddInvoices(invoiceExpiryRef)
709✔
603
        }
709✔
604

605
        return addIndex, nil
709✔
606
}
607

608
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
609
// then we're able to pull the funds pending within an HTLC.
610
//
611
// TODO(roasbeef): ignore if settled?
612
func (i *InvoiceRegistry) LookupInvoice(ctx context.Context,
613
        rHash lntypes.Hash) (Invoice, error) {
399✔
614

399✔
615
        // We'll check the database to see if there's an existing matching
399✔
616
        // invoice.
399✔
617
        ref := InvoiceRefByHash(rHash)
399✔
618
        return i.idb.LookupInvoice(ctx, ref)
399✔
619
}
399✔
620

621
// LookupInvoiceByRef looks up an invoice by the given reference, if found
622
// then we're able to pull the funds pending within an HTLC.
623
func (i *InvoiceRegistry) LookupInvoiceByRef(ctx context.Context,
624
        ref InvoiceRef) (Invoice, error) {
4✔
625

4✔
626
        return i.idb.LookupInvoice(ctx, ref)
4✔
627
}
4✔
628

629
// startHtlcTimer starts a new timer via the invoice registry main loop that
630
// cancels a single htlc on an invoice when the htlc hold duration has passed.
631
func (i *InvoiceRegistry) startHtlcTimer(invoiceRef InvoiceRef,
632
        key CircuitKey, acceptTime time.Time) error {
334✔
633

334✔
634
        releaseTime := acceptTime.Add(i.cfg.HtlcHoldDuration)
334✔
635
        event := &htlcReleaseEvent{
334✔
636
                invoiceRef:  invoiceRef,
334✔
637
                key:         key,
334✔
638
                releaseTime: releaseTime,
334✔
639
        }
334✔
640

334✔
641
        select {
334✔
642
        case i.htlcAutoReleaseChan <- event:
334✔
643
                return nil
334✔
644

645
        case <-i.quit:
×
646
                return ErrShuttingDown
×
647
        }
648
}
649

650
// cancelSingleHtlc cancels a single accepted htlc on an invoice. It takes
651
// a resolution result which will be used to notify subscribed links and
652
// resolvers of the details of the htlc cancellation.
653
func (i *InvoiceRegistry) cancelSingleHtlc(invoiceRef InvoiceRef,
654
        key CircuitKey, result FailResolutionResult) error {
12✔
655

12✔
656
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
24✔
657
                // Only allow individual htlc cancellation on open invoices.
12✔
658
                if invoice.State != ContractOpen {
18✔
659
                        log.Debugf("cancelSingleHtlc: invoice %v no longer "+
6✔
660
                                "open", invoiceRef)
6✔
661

6✔
662
                        return nil, nil
6✔
663
                }
6✔
664

665
                // Lookup the current status of the htlc in the database.
666
                var (
6✔
667
                        htlcState HtlcState
6✔
668
                        setID     *SetID
6✔
669
                )
6✔
670
                htlc, ok := invoice.Htlcs[key]
6✔
671
                if !ok {
6✔
672
                        // If this is an AMP invoice, then all the HTLCs won't
×
673
                        // be read out, so we'll consult the other mapping to
×
674
                        // try to find the HTLC state in question here.
×
675
                        var found bool
×
676
                        for ampSetID, htlcSet := range invoice.AMPState {
×
677
                                ampSetID := ampSetID
×
678
                                for htlcKey := range htlcSet.InvoiceKeys {
×
679
                                        if htlcKey == key {
×
680
                                                htlcState = htlcSet.State
×
681
                                                setID = &ampSetID
×
682

×
683
                                                found = true
×
684
                                                break
×
685
                                        }
686
                                }
687
                        }
688

689
                        if !found {
×
690
                                return nil, fmt.Errorf("htlc %v not found", key)
×
691
                        }
×
692
                } else {
6✔
693
                        htlcState = htlc.State
6✔
694
                }
6✔
695

696
                // Cancellation is only possible if the htlc wasn't already
697
                // resolved.
698
                if htlcState != HtlcStateAccepted {
6✔
699
                        log.Debugf("cancelSingleHtlc: htlc %v on invoice %v "+
×
700
                                "is already resolved", key, invoiceRef)
×
701

×
702
                        return nil, nil
×
703
                }
×
704

705
                log.Debugf("cancelSingleHtlc: cancelling htlc %v on invoice %v",
6✔
706
                        key, invoiceRef)
6✔
707

6✔
708
                // Return an update descriptor that cancels htlc and keeps
6✔
709
                // invoice open.
6✔
710
                canceledHtlcs := map[CircuitKey]struct{}{
6✔
711
                        key: {},
6✔
712
                }
6✔
713

6✔
714
                return &InvoiceUpdateDesc{
6✔
715
                        UpdateType:  CancelHTLCsUpdate,
6✔
716
                        CancelHtlcs: canceledHtlcs,
6✔
717
                        SetID:       setID,
6✔
718
                }, nil
6✔
719
        }
720

721
        // Try to mark the specified htlc as canceled in the invoice database.
722
        // Intercept the update descriptor to set the local updated variable. If
723
        // no invoice update is performed, we can return early.
724
        setID := (*SetID)(invoiceRef.SetID())
12✔
725
        var updated bool
12✔
726
        invoice, err := i.idb.UpdateInvoice(
12✔
727
                context.Background(), invoiceRef, setID,
12✔
728
                func(invoice *Invoice) (
12✔
729
                        *InvoiceUpdateDesc, error) {
24✔
730

12✔
731
                        updateDesc, err := updateInvoice(invoice)
12✔
732
                        if err != nil {
12✔
733
                                return nil, err
×
734
                        }
×
735
                        updated = updateDesc != nil
12✔
736

12✔
737
                        return updateDesc, err
12✔
738
                },
739
        )
740
        if err != nil {
12✔
741
                return err
×
742
        }
×
743
        if !updated {
18✔
744
                return nil
6✔
745
        }
6✔
746

747
        // The invoice has been updated. Notify subscribers of the htlc
748
        // resolution.
749
        htlc, ok := invoice.Htlcs[key]
6✔
750
        if !ok {
6✔
751
                return fmt.Errorf("htlc %v not found", key)
×
752
        }
×
753
        if htlc.State == HtlcStateCanceled {
12✔
754
                resolution := NewFailResolution(
6✔
755
                        key, int32(htlc.AcceptHeight), result,
6✔
756
                )
6✔
757

6✔
758
                i.notifyHodlSubscribers(resolution)
6✔
759
        }
6✔
760
        return nil
6✔
761
}
762

763
// processKeySend just-in-time inserts an invoice if this htlc is a keysend
764
// htlc.
765
func (i *InvoiceRegistry) processKeySend(ctx invoiceUpdateCtx) error {
22✔
766
        // Retrieve keysend record if present.
22✔
767
        preimageSlice, ok := ctx.customRecords[record.KeySendType]
22✔
768
        if !ok {
26✔
769
                return nil
4✔
770
        }
4✔
771

772
        // Cancel htlc is preimage is invalid.
773
        preimage, err := lntypes.MakePreimage(preimageSlice)
22✔
774
        if err != nil {
25✔
775
                return err
3✔
776
        }
3✔
777
        if preimage.Hash() != ctx.hash {
19✔
778
                return fmt.Errorf("invalid keysend preimage %v for hash %v",
×
779
                        preimage, ctx.hash)
×
780
        }
×
781

782
        // Only allow keysend for non-mpp payments.
783
        if ctx.mpp != nil {
19✔
784
                return errors.New("no mpp keysend supported")
×
785
        }
×
786

787
        // Create an invoice for the htlc amount.
788
        amt := ctx.amtPaid
19✔
789

19✔
790
        // Set tlv required feature vector on the invoice. Otherwise we wouldn't
19✔
791
        // be able to pay to it with keysend.
19✔
792
        rawFeatures := lnwire.NewRawFeatureVector(
19✔
793
                lnwire.TLVOnionPayloadRequired,
19✔
794
        )
19✔
795
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
19✔
796

19✔
797
        // Use the minimum block delta that we require for settling htlcs.
19✔
798
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
19✔
799

19✔
800
        // Pre-check expiry here to prevent inserting an invoice that will not
19✔
801
        // be settled.
19✔
802
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
19✔
803
                return errors.New("final expiry too soon")
×
804
        }
×
805

806
        // The invoice database indexes all invoices by payment address, however
807
        // legacy keysend payment do not have one. In order to avoid a new
808
        // payment type on-disk wrt. to indexing, we'll continue to insert a
809
        // blank payment address which is special cased in the insertion logic
810
        // to not be indexed. In the future, once AMP is merged, this should be
811
        // replaced by generating a random payment address on the behalf of the
812
        // sender.
813
        payAddr := BlankPayAddr
19✔
814

19✔
815
        // Create placeholder invoice.
19✔
816
        invoice := &Invoice{
19✔
817
                CreationDate: i.cfg.Clock.Now(),
19✔
818
                Terms: ContractTerm{
19✔
819
                        FinalCltvDelta:  finalCltvDelta,
19✔
820
                        Value:           amt,
19✔
821
                        PaymentPreimage: &preimage,
19✔
822
                        PaymentAddr:     payAddr,
19✔
823
                        Features:        features,
19✔
824
                },
19✔
825
        }
19✔
826

19✔
827
        if i.cfg.KeysendHoldTime != 0 {
25✔
828
                invoice.HodlInvoice = true
6✔
829
                invoice.Terms.Expiry = i.cfg.KeysendHoldTime
6✔
830
        }
6✔
831

832
        // Insert invoice into database. Ignore duplicates, because this
833
        // may be a replay.
834
        _, err = i.AddInvoice(context.Background(), invoice, ctx.hash)
19✔
835
        if err != nil && !errors.Is(err, ErrDuplicateInvoice) {
19✔
836
                return err
×
837
        }
×
838

839
        return nil
19✔
840
}
841

842
// processAMP just-in-time inserts an invoice if this htlc is a keysend
843
// htlc.
844
func (i *InvoiceRegistry) processAMP(ctx invoiceUpdateCtx) error {
31✔
845
        // AMP payments MUST also include an MPP record.
31✔
846
        if ctx.mpp == nil {
34✔
847
                return errors.New("no MPP record for AMP")
3✔
848
        }
3✔
849

850
        // Create an invoice for the total amount expected, provided in the MPP
851
        // record.
852
        amt := ctx.mpp.TotalMsat()
28✔
853

28✔
854
        // Set the TLV required and MPP optional features on the invoice. We'll
28✔
855
        // also make the AMP features required so that it can't be paid by
28✔
856
        // legacy or MPP htlcs.
28✔
857
        rawFeatures := lnwire.NewRawFeatureVector(
28✔
858
                lnwire.TLVOnionPayloadRequired,
28✔
859
                lnwire.PaymentAddrOptional,
28✔
860
                lnwire.AMPRequired,
28✔
861
        )
28✔
862
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
28✔
863

28✔
864
        // Use the minimum block delta that we require for settling htlcs.
28✔
865
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
28✔
866

28✔
867
        // Pre-check expiry here to prevent inserting an invoice that will not
28✔
868
        // be settled.
28✔
869
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
28✔
870
                return errors.New("final expiry too soon")
×
871
        }
×
872

873
        // We'll use the sender-generated payment address provided in the HTLC
874
        // to create our AMP invoice.
875
        payAddr := ctx.mpp.PaymentAddr()
28✔
876

28✔
877
        // Create placeholder invoice.
28✔
878
        invoice := &Invoice{
28✔
879
                CreationDate: i.cfg.Clock.Now(),
28✔
880
                Terms: ContractTerm{
28✔
881
                        FinalCltvDelta:  finalCltvDelta,
28✔
882
                        Value:           amt,
28✔
883
                        PaymentPreimage: nil,
28✔
884
                        PaymentAddr:     payAddr,
28✔
885
                        Features:        features,
28✔
886
                },
28✔
887
        }
28✔
888

28✔
889
        // Insert invoice into database. Ignore duplicates payment hashes and
28✔
890
        // payment addrs, this may be a replay or a different HTLC for the AMP
28✔
891
        // invoice.
28✔
892
        _, err := i.AddInvoice(context.Background(), invoice, ctx.hash)
28✔
893
        isDuplicatedInvoice := errors.Is(err, ErrDuplicateInvoice)
28✔
894
        isDuplicatedPayAddr := errors.Is(err, ErrDuplicatePayAddr)
28✔
895
        switch {
28✔
896
        case isDuplicatedInvoice || isDuplicatedPayAddr:
16✔
897
                return nil
16✔
898
        default:
16✔
899
                return err
16✔
900
        }
901
}
902

903
// NotifyExitHopHtlc attempts to mark an invoice as settled. The return value
904
// describes how the htlc should be resolved.
905
//
906
// When the preimage of the invoice is not yet known (hodl invoice), this
907
// function moves the invoice to the accepted state. When SettleHoldInvoice is
908
// called later, a resolution message will be send back to the caller via the
909
// provided hodlChan. Invoice registry sends on this channel what action needs
910
// to be taken on the htlc (settle or cancel). The caller needs to ensure that
911
// the channel is either buffered or received on from another goroutine to
912
// prevent deadlock.
913
//
914
// In the case that the htlc is part of a larger set of htlcs that pay to the
915
// same invoice (multi-path payment), the htlc is held until the set is
916
// complete. If the set doesn't fully arrive in time, a timer will cancel the
917
// held htlc.
918
func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
919
        amtPaid lnwire.MilliSatoshi, expiry uint32, currentHeight int32,
920
        circuitKey CircuitKey, hodlChan chan<- interface{},
921
        wireCustomRecords lnwire.CustomRecords,
922
        payload Payload) (HtlcResolution, error) {
950✔
923

950✔
924
        // Create the update context containing the relevant details of the
950✔
925
        // incoming htlc.
950✔
926
        ctx := invoiceUpdateCtx{
950✔
927
                hash:                 rHash,
950✔
928
                circuitKey:           circuitKey,
950✔
929
                amtPaid:              amtPaid,
950✔
930
                expiry:               expiry,
950✔
931
                currentHeight:        currentHeight,
950✔
932
                finalCltvRejectDelta: i.cfg.FinalCltvRejectDelta,
950✔
933
                wireCustomRecords:    wireCustomRecords,
950✔
934
                customRecords:        payload.CustomRecords(),
950✔
935
                mpp:                  payload.MultiPath(),
950✔
936
                amp:                  payload.AMPRecord(),
950✔
937
                metadata:             payload.Metadata(),
950✔
938
                pathID:               payload.PathID(),
950✔
939
                totalAmtMsat:         payload.TotalAmtMsat(),
950✔
940
        }
950✔
941

950✔
942
        switch {
950✔
943
        // If we are accepting spontaneous AMP payments and this payload
944
        // contains an AMP record, create an AMP invoice that will be settled
945
        // below.
946
        case i.cfg.AcceptAMP && ctx.amp != nil:
31✔
947
                err := i.processAMP(ctx)
31✔
948
                if err != nil {
34✔
949
                        ctx.log(fmt.Sprintf("amp error: %v", err))
3✔
950

3✔
951
                        return NewFailResolution(
3✔
952
                                circuitKey, currentHeight, ResultAmpError,
3✔
953
                        ), nil
3✔
954
                }
3✔
955

956
        // If we are accepting spontaneous keysend payments, create a regular
957
        // invoice that will be settled below. We also enforce that this is only
958
        // done when no AMP payload is present since it will only be settle-able
959
        // by regular HTLCs.
960
        case i.cfg.AcceptKeySend && ctx.amp == nil:
22✔
961
                err := i.processKeySend(ctx)
22✔
962
                if err != nil {
25✔
963
                        ctx.log(fmt.Sprintf("keysend error: %v", err))
3✔
964

3✔
965
                        return NewFailResolution(
3✔
966
                                circuitKey, currentHeight, ResultKeySendError,
3✔
967
                        ), nil
3✔
968
                }
3✔
969
        }
970

971
        // Execute locked notify exit hop logic.
972
        i.Lock()
944✔
973
        resolution, invoiceToExpire, err := i.notifyExitHopHtlcLocked(
944✔
974
                &ctx, hodlChan,
944✔
975
        )
944✔
976
        i.Unlock()
944✔
977
        if err != nil {
948✔
978
                return nil, err
4✔
979
        }
4✔
980

981
        if invoiceToExpire != nil {
1,030✔
982
                i.expiryWatcher.AddInvoices(invoiceToExpire)
86✔
983
        }
86✔
984

985
        switch r := resolution.(type) {
944✔
986
        // The htlc is held. Start a timer outside the lock if the htlc should
987
        // be auto-released, because otherwise a deadlock may happen with the
988
        // main event loop.
989
        case *htlcAcceptResolution:
423✔
990
                if r.autoRelease {
757✔
991
                        var invRef InvoiceRef
334✔
992
                        if ctx.amp != nil {
350✔
993
                                invRef = InvoiceRefBySetID(*ctx.setID())
16✔
994
                        } else {
338✔
995
                                invRef = ctx.invoiceRef()
322✔
996
                        }
322✔
997

998
                        err := i.startHtlcTimer(
334✔
999
                                invRef, circuitKey, r.acceptTime,
334✔
1000
                        )
334✔
1001
                        if err != nil {
334✔
1002
                                return nil, err
×
1003
                        }
×
1004
                }
1005

1006
                // We return a nil resolution because htlc acceptances are
1007
                // represented as nil resolutions externally.
1008
                // TODO(carla) update calling code to handle accept resolutions.
1009
                return nil, nil
423✔
1010

1011
        // A direct resolution was received for this htlc.
1012
        case HtlcResolution:
525✔
1013
                return r, nil
525✔
1014

1015
        // Fail if an unknown resolution type was received.
1016
        default:
×
1017
                return nil, errors.New("invalid resolution type")
×
1018
        }
1019
}
1020

1021
// notifyExitHopHtlcLocked is the internal implementation of NotifyExitHopHtlc
1022
// that should be executed inside the registry lock. The returned invoiceExpiry
1023
// (if not nil) needs to be added to the expiry watcher outside of the lock.
1024
func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
1025
        ctx *invoiceUpdateCtx, hodlChan chan<- interface{}) (
1026
        HtlcResolution, invoiceExpiry, error) {
944✔
1027

944✔
1028
        invoiceRef := ctx.invoiceRef()
944✔
1029
        setID := (*SetID)(ctx.setID())
944✔
1030

944✔
1031
        // We need to look up the current state of the invoice in order to send
944✔
1032
        // the previously accepted/settled HTLCs to the interceptor.
944✔
1033
        existingInvoice, err := i.idb.LookupInvoice(
944✔
1034
                context.Background(), invoiceRef,
944✔
1035
        )
944✔
1036
        switch {
944✔
1037
        case errors.Is(err, ErrInvoiceNotFound) ||
1038
                errors.Is(err, ErrNoInvoicesCreated):
26✔
1039

26✔
1040
                // If the invoice was not found, return a failure resolution
26✔
1041
                // with an invoice not found result.
26✔
1042
                return NewFailResolution(
26✔
1043
                        ctx.circuitKey, ctx.currentHeight,
26✔
1044
                        ResultInvoiceNotFound,
26✔
1045
                ), nil, nil
26✔
1046

1047
        case err != nil:
×
1048
                ctx.log(err.Error())
×
1049
                return nil, nil, err
×
1050
        }
1051

1052
        var cancelSet bool
922✔
1053

922✔
1054
        // Provide the invoice to the settlement interceptor to allow
922✔
1055
        // the interceptor's client an opportunity to manipulate the
922✔
1056
        // settlement process.
922✔
1057
        err = i.cfg.HtlcInterceptor.Intercept(HtlcModifyRequest{
922✔
1058
                WireCustomRecords:  ctx.wireCustomRecords,
922✔
1059
                ExitHtlcCircuitKey: ctx.circuitKey,
922✔
1060
                ExitHtlcAmt:        ctx.amtPaid,
922✔
1061
                ExitHtlcExpiry:     ctx.expiry,
922✔
1062
                CurrentHeight:      uint32(ctx.currentHeight),
922✔
1063
                Invoice:            existingInvoice,
922✔
1064
        }, func(resp HtlcModifyResponse) {
926✔
1065
                log.Debugf("Received invoice HTLC interceptor response: %v",
4✔
1066
                        resp)
4✔
1067

4✔
1068
                if resp.AmountPaid != 0 {
8✔
1069
                        ctx.amtPaid = resp.AmountPaid
4✔
1070
                }
4✔
1071

1072
                cancelSet = resp.CancelSet
4✔
1073
        })
1074
        if err != nil {
926✔
1075
                err := fmt.Errorf("error during invoice HTLC interception: %w",
4✔
1076
                        err)
4✔
1077
                ctx.log(err.Error())
4✔
1078

4✔
1079
                return nil, nil, err
4✔
1080
        }
4✔
1081

1082
        // We'll attempt to settle an invoice matching this rHash on disk (if
1083
        // one exists). The callback will update the invoice state and/or htlcs.
1084
        var (
922✔
1085
                resolution        HtlcResolution
922✔
1086
                updateSubscribers bool
922✔
1087
        )
922✔
1088
        callback := func(inv *Invoice) (*InvoiceUpdateDesc, error) {
1,844✔
1089
                updateDesc, res, err := updateInvoice(ctx, inv)
922✔
1090
                if err != nil {
922✔
1091
                        return nil, err
×
1092
                }
×
1093

1094
                // Only send an update if the invoice state was changed.
1095
                updateSubscribers = updateDesc != nil &&
922✔
1096
                        updateDesc.State != nil
922✔
1097

922✔
1098
                // Assign resolution to outer scope variable.
922✔
1099
                if cancelSet {
922✔
1100
                        // If a cancel signal was set for the htlc set, we set
×
1101
                        // the resolution as a failure with an underpayment
×
1102
                        // indication. Something was wrong with this htlc, so
×
1103
                        // we probably can't settle the invoice at all.
×
1104
                        resolution = NewFailResolution(
×
1105
                                ctx.circuitKey, ctx.currentHeight,
×
1106
                                ResultAmountTooLow,
×
1107
                        )
×
1108
                } else {
922✔
1109
                        resolution = res
922✔
1110
                }
922✔
1111

1112
                return updateDesc, nil
922✔
1113
        }
1114

1115
        invoice, err := i.idb.UpdateInvoice(
922✔
1116
                context.Background(), invoiceRef, setID, callback,
922✔
1117
        )
922✔
1118

922✔
1119
        var duplicateSetIDErr ErrDuplicateSetID
922✔
1120
        if errors.As(err, &duplicateSetIDErr) {
922✔
1121
                return NewFailResolution(
×
1122
                        ctx.circuitKey, ctx.currentHeight,
×
1123
                        ResultInvoiceNotFound,
×
1124
                ), nil, nil
×
1125
        }
×
1126

1127
        switch {
922✔
1128
        case errors.Is(err, ErrInvoiceNotFound):
×
1129
                // If the invoice was not found, return a failure resolution
×
1130
                // with an invoice not found result.
×
1131
                return NewFailResolution(
×
1132
                        ctx.circuitKey, ctx.currentHeight,
×
1133
                        ResultInvoiceNotFound,
×
1134
                ), nil, nil
×
1135

1136
        case errors.Is(err, ErrInvRefEquivocation):
×
1137
                return NewFailResolution(
×
1138
                        ctx.circuitKey, ctx.currentHeight,
×
1139
                        ResultInvoiceNotFound,
×
1140
                ), nil, nil
×
1141

1142
        case err == nil:
922✔
1143

1144
        default:
×
1145
                ctx.log(err.Error())
×
1146
                return nil, nil, err
×
1147
        }
1148

1149
        var invoiceToExpire invoiceExpiry
922✔
1150

922✔
1151
        log.Tracef("Settlement resolution: %T %v", resolution, resolution)
922✔
1152

922✔
1153
        switch res := resolution.(type) {
922✔
1154
        case *HtlcFailResolution:
30✔
1155
                // Inspect latest htlc state on the invoice. If it is found,
30✔
1156
                // we will update the accept height as it was recorded in the
30✔
1157
                // invoice database (which occurs in the case where the htlc
30✔
1158
                // reached the database in a previous call). If the htlc was
30✔
1159
                // not found on the invoice, it was immediately failed so we
30✔
1160
                // send the failure resolution as is, which has the current
30✔
1161
                // height set as the accept height.
30✔
1162
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
30✔
1163
                if ok {
37✔
1164
                        res.AcceptHeight = int32(invoiceHtlc.AcceptHeight)
7✔
1165
                }
7✔
1166

1167
                ctx.log(fmt.Sprintf("failure resolution result "+
30✔
1168
                        "outcome: %v, at accept height: %v",
30✔
1169
                        res.Outcome, res.AcceptHeight))
30✔
1170

30✔
1171
                // Some failures apply to the entire HTLC set. Break here if
30✔
1172
                // this isn't one of them.
30✔
1173
                if !res.Outcome.IsSetFailure() {
54✔
1174
                        break
24✔
1175
                }
1176

1177
                // Also cancel any HTLCs in the HTLC set that are also in the
1178
                // canceled state with the same failure result.
1179
                setID := ctx.setID()
6✔
1180
                canceledHtlcSet := invoice.HTLCSet(setID, HtlcStateCanceled)
6✔
1181
                for key, htlc := range canceledHtlcSet {
12✔
1182
                        htlcFailResolution := NewFailResolution(
6✔
1183
                                key, int32(htlc.AcceptHeight), res.Outcome,
6✔
1184
                        )
6✔
1185

6✔
1186
                        i.notifyHodlSubscribers(htlcFailResolution)
6✔
1187
                }
6✔
1188

1189
        // If the htlc was settled, we will settle any previously accepted
1190
        // htlcs and notify our peer to settle them.
1191
        case *HtlcSettleResolution:
477✔
1192
                ctx.log(fmt.Sprintf("settle resolution result "+
477✔
1193
                        "outcome: %v, at accept height: %v",
477✔
1194
                        res.Outcome, res.AcceptHeight))
477✔
1195

477✔
1196
                // Also settle any previously accepted htlcs. If a htlc is
477✔
1197
                // marked as settled, we should follow now and settle the htlc
477✔
1198
                // with our peer.
477✔
1199
                setID := ctx.setID()
477✔
1200
                settledHtlcSet := invoice.HTLCSet(setID, HtlcStateSettled)
477✔
1201
                for key, htlc := range settledHtlcSet {
1,266✔
1202
                        preimage := res.Preimage
789✔
1203
                        if htlc.AMP != nil && htlc.AMP.Preimage != nil {
805✔
1204
                                preimage = *htlc.AMP.Preimage
16✔
1205
                        }
16✔
1206

1207
                        // Notify subscribers that the htlcs should be settled
1208
                        // with our peer. Note that the outcome of the
1209
                        // resolution is set based on the outcome of the single
1210
                        // htlc that we just settled, so may not be accurate
1211
                        // for all htlcs.
1212
                        htlcSettleResolution := NewSettleResolution(
789✔
1213
                                preimage, key,
789✔
1214
                                int32(htlc.AcceptHeight), res.Outcome,
789✔
1215
                        )
789✔
1216

789✔
1217
                        // Notify subscribers that the htlc should be settled
789✔
1218
                        // with our peer.
789✔
1219
                        i.notifyHodlSubscribers(htlcSettleResolution)
789✔
1220
                }
1221

1222
                // If concurrent payments were attempted to this invoice before
1223
                // the current one was ultimately settled, cancel back any of
1224
                // the HTLCs immediately. As a result of the settle, the HTLCs
1225
                // in other HTLC sets are automatically converted to a canceled
1226
                // state when updating the invoice.
1227
                //
1228
                // TODO(roasbeef): can remove now??
1229
                canceledHtlcSet := invoice.HTLCSetCompliment(
477✔
1230
                        setID, HtlcStateCanceled,
477✔
1231
                )
477✔
1232
                for key, htlc := range canceledHtlcSet {
477✔
1233
                        htlcFailResolution := NewFailResolution(
×
1234
                                key, int32(htlc.AcceptHeight),
×
1235
                                ResultInvoiceAlreadySettled,
×
1236
                        )
×
1237

×
1238
                        i.notifyHodlSubscribers(htlcFailResolution)
×
1239
                }
×
1240

1241
        // If we accepted the htlc, subscribe to the hodl invoice and return
1242
        // an accept resolution with the htlc's accept time on it.
1243
        case *htlcAcceptResolution:
423✔
1244
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
423✔
1245
                if !ok {
423✔
1246
                        return nil, nil, fmt.Errorf("accepted htlc: %v not"+
×
1247
                                " present on invoice: %x", ctx.circuitKey,
×
1248
                                ctx.hash[:])
×
1249
                }
×
1250

1251
                // Determine accepted height of this htlc. If the htlc reached
1252
                // the invoice database (possibly in a previous call to the
1253
                // invoice registry), we'll take the original accepted height
1254
                // as it was recorded in the database.
1255
                acceptHeight := int32(invoiceHtlc.AcceptHeight)
423✔
1256

423✔
1257
                ctx.log(fmt.Sprintf("accept resolution result "+
423✔
1258
                        "outcome: %v, at accept height: %v",
423✔
1259
                        res.outcome, acceptHeight))
423✔
1260

423✔
1261
                // Auto-release the htlc if the invoice is still open. It can
423✔
1262
                // only happen for mpp payments that there are htlcs in state
423✔
1263
                // Accepted while the invoice is Open.
423✔
1264
                if invoice.State == ContractOpen {
757✔
1265
                        res.acceptTime = invoiceHtlc.AcceptTime
334✔
1266
                        res.autoRelease = true
334✔
1267
                }
334✔
1268

1269
                // If we have fully accepted the set of htlcs for this invoice,
1270
                // we can now add it to our invoice expiry watcher. We do not
1271
                // add invoices before they are fully accepted, because it is
1272
                // possible that we MppTimeout the htlcs, and then our relevant
1273
                // expiry height could change.
1274
                if res.outcome == resultAccepted {
509✔
1275
                        invoiceToExpire = makeInvoiceExpiry(ctx.hash, invoice)
86✔
1276
                }
86✔
1277

1278
                i.hodlSubscribe(hodlChan, ctx.circuitKey)
423✔
1279

1280
        default:
×
1281
                panic("unknown action")
×
1282
        }
1283

1284
        // Now that the links have been notified of any state changes to their
1285
        // HTLCs, we'll go ahead and notify any clients waiting on the invoice
1286
        // state changes.
1287
        if updateSubscribers {
1,478✔
1288
                // We'll add a setID onto the notification, but only if this is
556✔
1289
                // an AMP invoice being settled.
556✔
1290
                var setID *[32]byte
556✔
1291
                if _, ok := resolution.(*HtlcSettleResolution); ok {
1,024✔
1292
                        setID = ctx.setID()
468✔
1293
                }
468✔
1294

1295
                i.notifyClients(ctx.hash, invoice, setID)
556✔
1296
        }
1297

1298
        return resolution, invoiceToExpire, nil
922✔
1299
}
1300

1301
// SettleHodlInvoice sets the preimage of a hodl invoice.
1302
func (i *InvoiceRegistry) SettleHodlInvoice(ctx context.Context,
1303
        preimage lntypes.Preimage) error {
73✔
1304

73✔
1305
        i.Lock()
73✔
1306
        defer i.Unlock()
73✔
1307

73✔
1308
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
146✔
1309
                switch invoice.State {
73✔
1310
                case ContractOpen:
×
1311
                        return nil, ErrInvoiceStillOpen
×
1312

1313
                case ContractCanceled:
×
1314
                        return nil, ErrInvoiceAlreadyCanceled
×
1315

1316
                case ContractSettled:
3✔
1317
                        return nil, ErrInvoiceAlreadySettled
3✔
1318
                }
1319

1320
                return &InvoiceUpdateDesc{
70✔
1321
                        UpdateType: SettleHodlInvoiceUpdate,
70✔
1322
                        State: &InvoiceStateUpdateDesc{
70✔
1323
                                NewState: ContractSettled,
70✔
1324
                                Preimage: &preimage,
70✔
1325
                        },
70✔
1326
                }, nil
70✔
1327
        }
1328

1329
        hash := preimage.Hash()
73✔
1330
        invoiceRef := InvoiceRefByHash(hash)
73✔
1331
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
73✔
1332
        if err != nil {
76✔
1333
                log.Errorf("SettleHodlInvoice with preimage %v: %v",
3✔
1334
                        preimage, err)
3✔
1335

3✔
1336
                return err
3✔
1337
        }
3✔
1338

1339
        log.Debugf("Invoice%v: settled with preimage %v", invoiceRef,
70✔
1340
                invoice.Terms.PaymentPreimage)
70✔
1341

70✔
1342
        // In the callback, we marked the invoice as settled. UpdateInvoice will
70✔
1343
        // have seen this and should have moved all htlcs that were accepted to
70✔
1344
        // the settled state. In the loop below, we go through all of these and
70✔
1345
        // notify links and resolvers that are waiting for resolution. Any htlcs
70✔
1346
        // that were already settled before, will be notified again. This isn't
70✔
1347
        // necessary but doesn't hurt either.
70✔
1348
        for key, htlc := range invoice.Htlcs {
143✔
1349
                if htlc.State != HtlcStateSettled {
73✔
1350
                        continue
×
1351
                }
1352

1353
                resolution := NewSettleResolution(
73✔
1354
                        preimage, key, int32(htlc.AcceptHeight), ResultSettled,
73✔
1355
                )
73✔
1356

73✔
1357
                i.notifyHodlSubscribers(resolution)
73✔
1358
        }
1359
        i.notifyClients(hash, invoice, nil)
70✔
1360

70✔
1361
        return nil
70✔
1362
}
1363

1364
// CancelInvoice attempts to cancel the invoice corresponding to the passed
1365
// payment hash.
1366
func (i *InvoiceRegistry) CancelInvoice(ctx context.Context,
1367
        payHash lntypes.Hash) error {
33✔
1368

33✔
1369
        return i.cancelInvoiceImpl(ctx, payHash, true)
33✔
1370
}
33✔
1371

1372
// shouldCancel examines the state of an invoice and whether we want to
1373
// cancel already accepted invoices, taking our force cancel boolean into
1374
// account. This is pulled out into its own function so that tests that mock
1375
// cancelInvoiceImpl can reuse this logic.
1376
func shouldCancel(state ContractState, cancelAccepted bool) bool {
97✔
1377
        if state != ContractAccepted {
166✔
1378
                return true
69✔
1379
        }
69✔
1380

1381
        // If the invoice is accepted, we should only cancel if we want to
1382
        // force cancellation of accepted invoices.
1383
        return cancelAccepted
32✔
1384
}
1385

1386
// cancelInvoice attempts to cancel the invoice corresponding to the passed
1387
// payment hash. Accepted invoices will only be canceled if explicitly
1388
// requested to do so. It notifies subscribing links and resolvers that
1389
// the associated htlcs were canceled if they change state.
1390
func (i *InvoiceRegistry) cancelInvoiceImpl(ctx context.Context,
1391
        payHash lntypes.Hash, cancelAccepted bool) error {
106✔
1392

106✔
1393
        i.Lock()
106✔
1394
        defer i.Unlock()
106✔
1395

106✔
1396
        ref := InvoiceRefByHash(payHash)
106✔
1397
        log.Debugf("Invoice%v: canceling invoice", ref)
106✔
1398

106✔
1399
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
203✔
1400
                if !shouldCancel(invoice.State, cancelAccepted) {
109✔
1401
                        return nil, nil
12✔
1402
                }
12✔
1403

1404
                // Move invoice to the canceled state. Rely on validation in
1405
                // channeldb to return an error if the invoice is already
1406
                // settled or canceled.
1407
                return &InvoiceUpdateDesc{
85✔
1408
                        UpdateType: CancelInvoiceUpdate,
85✔
1409
                        State: &InvoiceStateUpdateDesc{
85✔
1410
                                NewState: ContractCanceled,
85✔
1411
                        },
85✔
1412
                }, nil
85✔
1413
        }
1414

1415
        invoiceRef := InvoiceRefByHash(payHash)
106✔
1416
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
106✔
1417

106✔
1418
        // Implement idempotency by returning success if the invoice was already
106✔
1419
        // canceled.
106✔
1420
        if errors.Is(err, ErrInvoiceAlreadyCanceled) {
109✔
1421
                log.Debugf("Invoice%v: already canceled", ref)
3✔
1422
                return nil
3✔
1423
        }
3✔
1424
        if err != nil {
126✔
1425
                return err
23✔
1426
        }
23✔
1427

1428
        // Return without cancellation if the invoice state is ContractAccepted.
1429
        if invoice.State == ContractAccepted {
96✔
1430
                log.Debugf("Invoice%v: remains accepted as cancel wasn't"+
12✔
1431
                        "explicitly requested.", ref)
12✔
1432
                return nil
12✔
1433
        }
12✔
1434

1435
        log.Debugf("Invoice%v: canceled", ref)
72✔
1436

72✔
1437
        // In the callback, some htlcs may have been moved to the canceled
72✔
1438
        // state. We now go through all of these and notify links and resolvers
72✔
1439
        // that are waiting for resolution. Any htlcs that were already canceled
72✔
1440
        // before, will be notified again. This isn't necessary but doesn't hurt
72✔
1441
        // either.
72✔
1442
        for key, htlc := range invoice.Htlcs {
101✔
1443
                if htlc.State != HtlcStateCanceled {
29✔
1444
                        continue
×
1445
                }
1446

1447
                i.notifyHodlSubscribers(
29✔
1448
                        NewFailResolution(
29✔
1449
                                key, int32(htlc.AcceptHeight), ResultCanceled,
29✔
1450
                        ),
29✔
1451
                )
29✔
1452
        }
1453
        i.notifyClients(payHash, invoice, nil)
72✔
1454

72✔
1455
        // Attempt to also delete the invoice if requested through the registry
72✔
1456
        // config.
72✔
1457
        if i.cfg.GcCanceledInvoicesOnTheFly {
75✔
1458
                // Assemble the delete reference and attempt to delete through
3✔
1459
                // the invocice from the DB.
3✔
1460
                deleteRef := InvoiceDeleteRef{
3✔
1461
                        PayHash:     payHash,
3✔
1462
                        AddIndex:    invoice.AddIndex,
3✔
1463
                        SettleIndex: invoice.SettleIndex,
3✔
1464
                }
3✔
1465
                if invoice.Terms.PaymentAddr != BlankPayAddr {
3✔
1466
                        deleteRef.PayAddr = &invoice.Terms.PaymentAddr
×
1467
                }
×
1468

1469
                err = i.idb.DeleteInvoice(ctx, []InvoiceDeleteRef{deleteRef})
3✔
1470
                // If by any chance deletion failed, then log it instead of
3✔
1471
                // returning the error, as the invoice itself has already been
3✔
1472
                // canceled.
3✔
1473
                if err != nil {
3✔
1474
                        log.Warnf("Invoice %v could not be deleted: %v", ref,
×
1475
                                err)
×
1476
                }
×
1477
        }
1478

1479
        return nil
72✔
1480
}
1481

1482
// notifyClients notifies all currently registered invoice notification clients
1483
// of a newly added/settled invoice.
1484
func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
1485
        invoice *Invoice, setID *[32]byte) {
1,395✔
1486

1,395✔
1487
        event := &invoiceEvent{
1,395✔
1488
                invoice: invoice,
1,395✔
1489
                hash:    hash,
1,395✔
1490
                setID:   setID,
1,395✔
1491
        }
1,395✔
1492

1,395✔
1493
        select {
1,395✔
1494
        case i.invoiceEvents <- event:
1,395✔
1495
        case <-i.quit:
×
1496
        }
1497
}
1498

1499
// invoiceSubscriptionKit defines that are common to both all invoice
1500
// subscribers and single invoice subscribers.
1501
type invoiceSubscriptionKit struct {
1502
        id uint32 // nolint:structcheck
1503

1504
        // quit is a chan mouted to InvoiceRegistry that signals a shutdown.
1505
        quit chan struct{}
1506

1507
        ntfnQueue *queue.ConcurrentQueue
1508

1509
        canceled   uint32 // To be used atomically.
1510
        cancelChan chan struct{}
1511

1512
        // backlogDelivered is closed when the backlog events have been
1513
        // delivered.
1514
        backlogDelivered chan struct{}
1515
}
1516

1517
// InvoiceSubscription represents an intent to receive updates for newly added
1518
// or settled invoices. For each newly added invoice, a copy of the invoice
1519
// will be sent over the NewInvoices channel. Similarly, for each newly settled
1520
// invoice, a copy of the invoice will be sent over the SettledInvoices
1521
// channel.
1522
type InvoiceSubscription struct {
1523
        invoiceSubscriptionKit
1524

1525
        // NewInvoices is a channel that we'll use to send all newly created
1526
        // invoices with an invoice index greater than the specified
1527
        // StartingInvoiceIndex field.
1528
        NewInvoices chan *Invoice
1529

1530
        // SettledInvoices is a channel that we'll use to send all settled
1531
        // invoices with an invoices index greater than the specified
1532
        // StartingInvoiceIndex field.
1533
        SettledInvoices chan *Invoice
1534

1535
        // addIndex is the highest add index the caller knows of. We'll use
1536
        // this information to send out an event backlog to the notifications
1537
        // subscriber. Any new add events with an index greater than this will
1538
        // be dispatched before any new notifications are sent out.
1539
        addIndex uint64
1540

1541
        // settleIndex is the highest settle index the caller knows of. We'll
1542
        // use this information to send out an event backlog to the
1543
        // notifications subscriber. Any new settle events with an index
1544
        // greater than this will be dispatched before any new notifications
1545
        // are sent out.
1546
        settleIndex uint64
1547
}
1548

1549
// SingleInvoiceSubscription represents an intent to receive updates for a
1550
// specific invoice.
1551
type SingleInvoiceSubscription struct {
1552
        invoiceSubscriptionKit
1553

1554
        invoiceRef InvoiceRef
1555

1556
        // Updates is a channel that we'll use to send all invoice events for
1557
        // the invoice that is subscribed to.
1558
        Updates chan *Invoice
1559
}
1560

1561
// PayHash returns the optional payment hash of the target invoice.
1562
//
1563
// TODO(positiveblue): This method is only supposed to be used in tests. It will
1564
// be deleted as soon as invoiceregistery_test is in the same module.
1565
func (s *SingleInvoiceSubscription) PayHash() *lntypes.Hash {
18✔
1566
        return s.invoiceRef.PayHash()
18✔
1567
}
18✔
1568

1569
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
1570
// resources.
1571
func (i *invoiceSubscriptionKit) Cancel() {
67✔
1572
        if !atomic.CompareAndSwapUint32(&i.canceled, 0, 1) {
67✔
1573
                return
×
1574
        }
×
1575

1576
        i.ntfnQueue.Stop()
67✔
1577
        close(i.cancelChan)
67✔
1578
}
1579

1580
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
106✔
1581
        select {
106✔
1582
        case i.ntfnQueue.ChanIn() <- event:
106✔
1583

1584
        case <-i.cancelChan:
×
1585
                // This can only be triggered by delivery of non-backlog
×
1586
                // events.
×
1587
                return ErrShuttingDown
×
1588
        case <-i.quit:
×
1589
                return ErrShuttingDown
×
1590
        }
1591

1592
        return nil
106✔
1593
}
1594

1595
// SubscribeNotifications returns an InvoiceSubscription which allows the
1596
// caller to receive async notifications when any invoices are settled or
1597
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
1598
// by first sending out all new events with an invoice index _greater_ than
1599
// this value. Afterwards, we'll send out real-time notifications.
1600
func (i *InvoiceRegistry) SubscribeNotifications(ctx context.Context,
1601
        addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
49✔
1602

49✔
1603
        client := &InvoiceSubscription{
49✔
1604
                NewInvoices:     make(chan *Invoice),
49✔
1605
                SettledInvoices: make(chan *Invoice),
49✔
1606
                addIndex:        addIndex,
49✔
1607
                settleIndex:     settleIndex,
49✔
1608
                invoiceSubscriptionKit: invoiceSubscriptionKit{
49✔
1609
                        quit:             i.quit,
49✔
1610
                        ntfnQueue:        queue.NewConcurrentQueue(20),
49✔
1611
                        cancelChan:       make(chan struct{}),
49✔
1612
                        backlogDelivered: make(chan struct{}),
49✔
1613
                },
49✔
1614
        }
49✔
1615
        client.ntfnQueue.Start()
49✔
1616

49✔
1617
        // This notifies other goroutines that the backlog phase is over.
49✔
1618
        defer close(client.backlogDelivered)
49✔
1619

49✔
1620
        // Always increment by 1 first, and our client ID will start with 1,
49✔
1621
        // not 0.
49✔
1622
        client.id = atomic.AddUint32(&i.nextClientID, 1)
49✔
1623

49✔
1624
        // Before we register this new invoice subscription, we'll launch a new
49✔
1625
        // goroutine that will proxy all notifications appended to the end of
49✔
1626
        // the concurrent queue to the two client-side channels the caller will
49✔
1627
        // feed off of.
49✔
1628
        i.wg.Add(1)
49✔
1629
        go func() {
98✔
1630
                defer i.wg.Done()
49✔
1631
                defer i.deleteClient(client.id)
49✔
1632

49✔
1633
                for {
164✔
1634
                        select {
115✔
1635
                        // A new invoice event has been sent by the
1636
                        // invoiceRegistry! We'll figure out if this is an add
1637
                        // event or a settle event, then dispatch the event to
1638
                        // the client.
1639
                        case ntfn := <-client.ntfnQueue.ChanOut():
70✔
1640
                                invoiceEvent := ntfn.(*invoiceEvent)
70✔
1641

70✔
1642
                                var targetChan chan *Invoice
70✔
1643
                                state := invoiceEvent.invoice.State
70✔
1644
                                switch {
70✔
1645
                                // AMP invoices never move to settled, but will
1646
                                // be sent with a set ID if an HTLC set is
1647
                                // being settled.
1648
                                case state == ContractOpen &&
1649
                                        invoiceEvent.setID != nil:
10✔
1650
                                        fallthrough
10✔
1651

1652
                                case state == ContractSettled:
28✔
1653
                                        targetChan = client.SettledInvoices
28✔
1654

1655
                                case state == ContractOpen:
46✔
1656
                                        targetChan = client.NewInvoices
46✔
1657

1658
                                default:
×
1659
                                        log.Errorf("unknown invoice state: %v",
×
1660
                                                state)
×
1661

×
1662
                                        continue
×
1663
                                }
1664

1665
                                select {
70✔
1666
                                case targetChan <- invoiceEvent.invoice:
70✔
1667

1668
                                case <-client.cancelChan:
×
1669
                                        return
×
1670

1671
                                case <-i.quit:
×
1672
                                        return
×
1673
                                }
1674

1675
                        case <-client.cancelChan:
49✔
1676
                                return
49✔
1677

1678
                        case <-i.quit:
×
1679
                                return
×
1680
                        }
1681
                }
1682
        }()
1683

1684
        i.notificationClientMux.Lock()
49✔
1685
        i.notificationClients[client.id] = client
49✔
1686
        i.notificationClientMux.Unlock()
49✔
1687

49✔
1688
        // Query the database to see if based on the provided addIndex and
49✔
1689
        // settledIndex we need to deliver any backlog notifications.
49✔
1690
        err := i.deliverBacklogEvents(ctx, client)
49✔
1691
        if err != nil {
49✔
1692
                return nil, err
×
1693
        }
×
1694

1695
        log.Infof("New invoice subscription client: id=%v", client.id)
49✔
1696

49✔
1697
        return client, nil
49✔
1698
}
1699

1700
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
1701
// caller to receive async notifications for a specific invoice.
1702
func (i *InvoiceRegistry) SubscribeSingleInvoice(ctx context.Context,
1703
        hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
22✔
1704

22✔
1705
        client := &SingleInvoiceSubscription{
22✔
1706
                Updates: make(chan *Invoice),
22✔
1707
                invoiceSubscriptionKit: invoiceSubscriptionKit{
22✔
1708
                        quit:             i.quit,
22✔
1709
                        ntfnQueue:        queue.NewConcurrentQueue(20),
22✔
1710
                        cancelChan:       make(chan struct{}),
22✔
1711
                        backlogDelivered: make(chan struct{}),
22✔
1712
                },
22✔
1713
                invoiceRef: InvoiceRefByHash(hash),
22✔
1714
        }
22✔
1715
        client.ntfnQueue.Start()
22✔
1716

22✔
1717
        // This notifies other goroutines that the backlog phase is done.
22✔
1718
        defer close(client.backlogDelivered)
22✔
1719

22✔
1720
        // Always increment by 1 first, and our client ID will start with 1,
22✔
1721
        // not 0.
22✔
1722
        client.id = atomic.AddUint32(&i.nextClientID, 1)
22✔
1723

22✔
1724
        // Before we register this new invoice subscription, we'll launch a new
22✔
1725
        // goroutine that will proxy all notifications appended to the end of
22✔
1726
        // the concurrent queue to the two client-side channels the caller will
22✔
1727
        // feed off of.
22✔
1728
        i.wg.Add(1)
22✔
1729
        go func() {
44✔
1730
                defer i.wg.Done()
22✔
1731
                defer i.deleteClient(client.id)
22✔
1732

22✔
1733
                for {
80✔
1734
                        select {
58✔
1735
                        // A new invoice event has been sent by the
1736
                        // invoiceRegistry. We will dispatch the event to the
1737
                        // client.
1738
                        case ntfn := <-client.ntfnQueue.ChanOut():
40✔
1739
                                invoiceEvent := ntfn.(*invoiceEvent)
40✔
1740

40✔
1741
                                select {
40✔
1742
                                case client.Updates <- invoiceEvent.invoice:
40✔
1743

1744
                                case <-client.cancelChan:
×
1745
                                        return
×
1746

1747
                                case <-i.quit:
×
1748
                                        return
×
1749
                                }
1750

1751
                        case <-client.cancelChan:
22✔
1752
                                return
22✔
1753

1754
                        case <-i.quit:
×
1755
                                return
×
1756
                        }
1757
                }
1758
        }()
1759

1760
        i.notificationClientMux.Lock()
22✔
1761
        i.singleNotificationClients[client.id] = client
22✔
1762
        i.notificationClientMux.Unlock()
22✔
1763

22✔
1764
        err := i.deliverSingleBacklogEvents(ctx, client)
22✔
1765
        if err != nil {
22✔
1766
                return nil, err
×
1767
        }
×
1768

1769
        log.Infof("New single invoice subscription client: id=%v, ref=%v",
22✔
1770
                client.id, client.invoiceRef)
22✔
1771

22✔
1772
        return client, nil
22✔
1773
}
1774

1775
// notifyHodlSubscribers sends out the htlc resolution to all current
1776
// subscribers.
1777
func (i *InvoiceRegistry) notifyHodlSubscribers(htlcResolution HtlcResolution) {
895✔
1778
        i.hodlSubscriptionsMux.Lock()
895✔
1779
        defer i.hodlSubscriptionsMux.Unlock()
895✔
1780

895✔
1781
        subscribers, ok := i.hodlSubscriptions[htlcResolution.CircuitKey()]
895✔
1782
        if !ok {
1,378✔
1783
                return
483✔
1784
        }
483✔
1785

1786
        // Notify all interested subscribers and remove subscription from both
1787
        // maps. The subscription can be removed as there only ever will be a
1788
        // single resolution for each hash.
1789
        for subscriber := range subscribers {
832✔
1790
                select {
416✔
1791
                case subscriber <- htlcResolution:
416✔
1792
                case <-i.quit:
×
1793
                        return
×
1794
                }
1795

1796
                delete(
416✔
1797
                        i.hodlReverseSubscriptions[subscriber],
416✔
1798
                        htlcResolution.CircuitKey(),
416✔
1799
                )
416✔
1800
        }
1801

1802
        delete(i.hodlSubscriptions, htlcResolution.CircuitKey())
416✔
1803
}
1804

1805
// hodlSubscribe adds a new invoice subscription.
1806
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
1807
        circuitKey CircuitKey) {
423✔
1808

423✔
1809
        i.hodlSubscriptionsMux.Lock()
423✔
1810
        defer i.hodlSubscriptionsMux.Unlock()
423✔
1811

423✔
1812
        log.Debugf("Hodl subscribe for %v", circuitKey)
423✔
1813

423✔
1814
        subscriptions, ok := i.hodlSubscriptions[circuitKey]
423✔
1815
        if !ok {
839✔
1816
                subscriptions = make(map[chan<- interface{}]struct{})
416✔
1817
                i.hodlSubscriptions[circuitKey] = subscriptions
416✔
1818
        }
416✔
1819
        subscriptions[subscriber] = struct{}{}
423✔
1820

423✔
1821
        reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
423✔
1822
        if !ok {
784✔
1823
                reverseSubscriptions = make(map[CircuitKey]struct{})
361✔
1824
                i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
361✔
1825
        }
361✔
1826
        reverseSubscriptions[circuitKey] = struct{}{}
423✔
1827
}
1828

1829
// HodlUnsubscribeAll cancels the subscription.
1830
func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
206✔
1831
        i.hodlSubscriptionsMux.Lock()
206✔
1832
        defer i.hodlSubscriptionsMux.Unlock()
206✔
1833

206✔
1834
        hashes := i.hodlReverseSubscriptions[subscriber]
206✔
1835
        for hash := range hashes {
211✔
1836
                delete(i.hodlSubscriptions[hash], subscriber)
5✔
1837
        }
5✔
1838

1839
        delete(i.hodlReverseSubscriptions, subscriber)
206✔
1840
}
1841

1842
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
1843
// useful when we need to iterate the map to send notifications.
1844
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription { //nolint:ll
1,394✔
1845
        i.notificationClientMux.RLock()
1,394✔
1846
        defer i.notificationClientMux.RUnlock()
1,394✔
1847

1,394✔
1848
        clients := make(map[uint32]*SingleInvoiceSubscription)
1,394✔
1849
        for k, v := range i.singleNotificationClients {
1,434✔
1850
                clients[k] = v
40✔
1851
        }
40✔
1852
        return clients
1,394✔
1853
}
1854

1855
// copyClients copies i.notificationClients inside a lock. This is useful when
1856
// we need to iterate the map to send notifications.
1857
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
1,238✔
1858
        i.notificationClientMux.RLock()
1,238✔
1859
        defer i.notificationClientMux.RUnlock()
1,238✔
1860

1,238✔
1861
        clients := make(map[uint32]*InvoiceSubscription)
1,238✔
1862
        for k, v := range i.notificationClients {
1,308✔
1863
                clients[k] = v
70✔
1864
        }
70✔
1865
        return clients
1,238✔
1866
}
1867

1868
// deleteClient removes a client by its ID inside a lock. Noop if the client is
1869
// not found.
1870
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
67✔
1871
        i.notificationClientMux.Lock()
67✔
1872
        defer i.notificationClientMux.Unlock()
67✔
1873

67✔
1874
        log.Infof("Cancelling invoice subscription for client=%v", clientID)
67✔
1875
        delete(i.notificationClients, clientID)
67✔
1876
        delete(i.singleNotificationClients, clientID)
67✔
1877
}
67✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc