• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%
12312390362

Pull #9343

github

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context that its new main WithCtx method provides is cancelled in sync
with a parent context being cancelled or with it's quit channel being
cancelled. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived context, the quit channel _must_ be contained internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method which will then take care to first cancel any
derived context that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.48
/invoices/invoiceregistry.go
1
package invoices
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/queue"
15
        "github.com/lightningnetwork/lnd/record"
16
)
17

18
var (
19
        // ErrInvoiceExpiryTooSoon is returned when an invoice is attempted to
20
        // be accepted or settled with not enough blocks remaining.
21
        ErrInvoiceExpiryTooSoon = errors.New("invoice expiry too soon")
22

23
        // ErrInvoiceAmountTooLow is returned  when an invoice is attempted to
24
        // be accepted or settled with an amount that is too low.
25
        ErrInvoiceAmountTooLow = errors.New(
26
                "paid amount less than invoice amount",
27
        )
28

29
        // ErrShuttingDown is returned when an operation failed because the
30
        // invoice registry is shutting down.
31
        ErrShuttingDown = errors.New("invoice registry shutting down")
32
)
33

34
const (
35
        // DefaultHtlcHoldDuration defines the default for how long mpp htlcs
36
        // are held while waiting for the other set members to arrive.
37
        DefaultHtlcHoldDuration = 120 * time.Second
38
)
39

40
// RegistryConfig contains the configuration parameters for invoice registry.
41
type RegistryConfig struct {
42
        // FinalCltvRejectDelta defines the number of blocks before the expiry
43
        // of the htlc where we no longer settle it as an exit hop and instead
44
        // cancel it back. Normally this value should be lower than the cltv
45
        // expiry of any invoice we create and the code effectuating this should
46
        // not be hit.
47
        FinalCltvRejectDelta int32
48

49
        // HtlcHoldDuration defines for how long mpp htlcs are held while
50
        // waiting for the other set members to arrive.
51
        HtlcHoldDuration time.Duration
52

53
        // Clock holds the clock implementation that is used to provide
54
        // Now() and TickAfter() and is useful to stub out the clock functions
55
        // during testing.
56
        Clock clock.Clock
57

58
        // AcceptKeySend indicates whether we want to accept spontaneous key
59
        // send payments.
60
        AcceptKeySend bool
61

62
        // AcceptAMP indicates whether we want to accept spontaneous AMP
63
        // payments.
64
        AcceptAMP bool
65

66
        // GcCanceledInvoicesOnStartup if set, we'll attempt to garbage collect
67
        // all canceled invoices upon start.
68
        GcCanceledInvoicesOnStartup bool
69

70
        // GcCanceledInvoicesOnTheFly if set, we'll garbage collect all newly
71
        // canceled invoices on the fly.
72
        GcCanceledInvoicesOnTheFly bool
73

74
        // KeysendHoldTime indicates for how long we want to accept and hold
75
        // spontaneous keysend payments.
76
        KeysendHoldTime time.Duration
77

78
        // HtlcInterceptor is an interface that allows the invoice registry to
79
        // let clients intercept invoices before they are settled.
80
        HtlcInterceptor HtlcInterceptor
81
}
82

83
// htlcReleaseEvent describes an htlc auto-release event. It is used to release
84
// mpp htlcs for which the complete set didn't arrive in time.
85
type htlcReleaseEvent struct {
86
        // invoiceRef identifiers the invoice this htlc belongs to.
87
        invoiceRef InvoiceRef
88

89
        // key is the circuit key of the htlc to release.
90
        key CircuitKey
91

92
        // releaseTime is the time at which to release the htlc.
93
        releaseTime time.Time
94
}
95

96
// Less is used to order PriorityQueueItem's by their release time such that
97
// items with the older release time are at the top of the queue.
98
//
99
// NOTE: Part of the queue.PriorityQueueItem interface.
100
func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
6✔
101
        return r.releaseTime.Before(other.(*htlcReleaseEvent).releaseTime)
6✔
102
}
6✔
103

104
// InvoiceRegistry is a central registry of all the outstanding invoices
105
// created by the daemon. The registry is a thin wrapper around a map in order
106
// to ensure that all updates/reads are thread safe.
107
type InvoiceRegistry struct {
108
        started atomic.Bool
109
        stopped atomic.Bool
110

111
        sync.RWMutex
112

113
        nextClientID uint32 // must be used atomically
114

115
        idb InvoiceDB
116

117
        // cfg contains the registry's configuration parameters.
118
        cfg *RegistryConfig
119

120
        // notificationClientMux locks notificationClients and
121
        // singleNotificationClients. Using a separate mutex for these maps is
122
        // necessary to avoid deadlocks in the registry when processing invoice
123
        // events.
124
        notificationClientMux sync.RWMutex
125

126
        notificationClients map[uint32]*InvoiceSubscription
127

128
        // TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
129
        // performance.
130
        singleNotificationClients map[uint32]*SingleInvoiceSubscription
131

132
        // invoiceEvents is a single channel over which invoice updates are
133
        // carried.
134
        invoiceEvents chan *invoiceEvent
135

136
        // hodlSubscriptionsMux locks the hodlSubscriptions and
137
        // hodlReverseSubscriptions. Using a separate mutex for these maps is
138
        // necessary to avoid deadlocks in the registry when processing invoice
139
        // events.
140
        hodlSubscriptionsMux sync.RWMutex
141

142
        // hodlSubscriptions is a map from a circuit key to a list of
143
        // subscribers. It is used for efficient notification of links.
144
        hodlSubscriptions map[CircuitKey]map[chan<- interface{}]struct{}
145

146
        // reverseSubscriptions tracks circuit keys subscribed to per
147
        // subscriber. This is used to unsubscribe from all hashes efficiently.
148
        hodlReverseSubscriptions map[chan<- interface{}]map[CircuitKey]struct{}
149

150
        // htlcAutoReleaseChan contains the new htlcs that need to be
151
        // auto-released.
152
        htlcAutoReleaseChan chan *htlcReleaseEvent
153

154
        expiryWatcher *InvoiceExpiryWatcher
155

156
        wg   sync.WaitGroup
157
        quit chan struct{}
158
}
159

160
// NewRegistry creates a new invoice registry. The invoice registry
161
// wraps the persistent on-disk invoice storage with an additional in-memory
162
// layer. The in-memory layer is in place such that debug invoices can be added
163
// which are volatile yet available system wide within the daemon.
164
func NewRegistry(idb InvoiceDB, expiryWatcher *InvoiceExpiryWatcher,
165
        cfg *RegistryConfig) *InvoiceRegistry {
636✔
166

636✔
167
        notificationClients := make(map[uint32]*InvoiceSubscription)
636✔
168
        singleNotificationClients := make(map[uint32]*SingleInvoiceSubscription)
636✔
169
        return &InvoiceRegistry{
636✔
170
                idb:                       idb,
636✔
171
                notificationClients:       notificationClients,
636✔
172
                singleNotificationClients: singleNotificationClients,
636✔
173
                invoiceEvents:             make(chan *invoiceEvent, 100),
636✔
174
                hodlSubscriptions: make(
636✔
175
                        map[CircuitKey]map[chan<- interface{}]struct{},
636✔
176
                ),
636✔
177
                hodlReverseSubscriptions: make(
636✔
178
                        map[chan<- interface{}]map[CircuitKey]struct{},
636✔
179
                ),
636✔
180
                cfg:                 cfg,
636✔
181
                htlcAutoReleaseChan: make(chan *htlcReleaseEvent),
636✔
182
                expiryWatcher:       expiryWatcher,
636✔
183
                quit:                make(chan struct{}),
636✔
184
        }
636✔
185
}
636✔
186

187
// scanInvoicesOnStart will scan all invoices on start and add active invoices
188
// to the invoice expiry watcher while also attempting to delete all canceled
189
// invoices.
190
func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {
636✔
191
        pendingInvoices, err := i.idb.FetchPendingInvoices(ctx)
636✔
192
        if err != nil {
636✔
193
                return err
×
194
        }
×
195

196
        var pending []invoiceExpiry
636✔
197
        for paymentHash, invoice := range pendingInvoices {
666✔
198
                invoice := invoice
30✔
199
                expiryRef := makeInvoiceExpiry(paymentHash, &invoice)
30✔
200
                if expiryRef != nil {
60✔
201
                        pending = append(pending, expiryRef)
30✔
202
                }
30✔
203
        }
204

205
        log.Debugf("Adding %d pending invoices to the expiry watcher",
636✔
206
                len(pending))
636✔
207
        i.expiryWatcher.AddInvoices(pending...)
636✔
208

636✔
209
        if i.cfg.GcCanceledInvoicesOnStartup {
639✔
210
                log.Infof("Deleting canceled invoices")
3✔
211
                err = i.idb.DeleteCanceledInvoices(ctx)
3✔
212
                if err != nil {
3✔
213
                        log.Warnf("Deleting canceled invoices failed: %v", err)
×
214
                        return err
×
215
                }
×
216
        }
217

218
        return nil
636✔
219
}
220

221
// Start starts the registry and all goroutines it needs to carry out its task.
222
func (i *InvoiceRegistry) Start() error {
636✔
223
        var err error
636✔
224

636✔
225
        log.Info("InvoiceRegistry starting...")
636✔
226

636✔
227
        if i.started.Swap(true) {
636✔
228
                return fmt.Errorf("InvoiceRegistry started more than once")
×
229
        }
×
230
        // Start InvoiceExpiryWatcher and prepopulate it with existing
231
        // active invoices.
232
        err = i.expiryWatcher.Start(
636✔
233
                func(hash lntypes.Hash, force bool) error {
710✔
234
                        return i.cancelInvoiceImpl(
74✔
235
                                context.Background(), hash, force,
74✔
236
                        )
74✔
237
                })
74✔
238
        if err != nil {
636✔
239
                return err
×
240
        }
×
241

242
        i.wg.Add(1)
636✔
243
        go i.invoiceEventLoop()
636✔
244

636✔
245
        // Now scan all pending and removable invoices to the expiry
636✔
246
        // watcher or delete them.
636✔
247
        err = i.scanInvoicesOnStart(context.Background())
636✔
248
        if err != nil {
636✔
249
                _ = i.Stop()
×
250
        }
×
251

252
        log.Debug("InvoiceRegistry started")
636✔
253

636✔
254
        return err
636✔
255
}
256

257
// Stop signals the registry for a graceful shutdown.
258
func (i *InvoiceRegistry) Stop() error {
375✔
259
        log.Info("InvoiceRegistry shutting down...")
375✔
260

375✔
261
        if i.stopped.Swap(true) {
375✔
262
                return fmt.Errorf("InvoiceRegistry stopped more than once")
×
263
        }
×
264

265
        log.Info("InvoiceRegistry shutting down...")
375✔
266
        defer log.Debug("InvoiceRegistry shutdown complete")
375✔
267

375✔
268
        var err error
375✔
269
        if i.expiryWatcher == nil {
375✔
270
                err = fmt.Errorf("InvoiceRegistry expiryWatcher is not " +
×
271
                        "initialized")
×
272
        } else {
375✔
273
                i.expiryWatcher.Stop()
375✔
274
        }
375✔
275

276
        close(i.quit)
375✔
277

375✔
278
        i.wg.Wait()
375✔
279

375✔
280
        log.Debug("InvoiceRegistry shutdown complete")
375✔
281

375✔
282
        return err
375✔
283
}
284

285
// invoiceEvent represents a new event that has modified on invoice on disk.
286
// Only two event types are currently supported: newly created invoices, and
287
// instance where invoices are settled.
288
type invoiceEvent struct {
289
        hash    lntypes.Hash
290
        invoice *Invoice
291
        setID   *[32]byte
292
}
293

294
// tickAt returns a channel that ticks at the specified time. If the time has
295
// already passed, it will tick immediately.
296
func (i *InvoiceRegistry) tickAt(t time.Time) <-chan time.Time {
651✔
297
        now := i.cfg.Clock.Now()
651✔
298
        return i.cfg.Clock.TickAfter(t.Sub(now))
651✔
299
}
651✔
300

301
// invoiceEventLoop is the dedicated goroutine responsible for accepting
302
// new notification subscriptions, cancelling old subscriptions, and
303
// dispatching new invoice events.
304
func (i *InvoiceRegistry) invoiceEventLoop() {
636✔
305
        defer i.wg.Done()
636✔
306

636✔
307
        // Set up a heap for htlc auto-releases.
636✔
308
        autoReleaseHeap := &queue.PriorityQueue{}
636✔
309

636✔
310
        for {
3,005✔
311
                // If there is something to release, set up a release tick
2,369✔
312
                // channel.
2,369✔
313
                var nextReleaseTick <-chan time.Time
2,369✔
314
                if autoReleaseHeap.Len() > 0 {
3,020✔
315
                        head := autoReleaseHeap.Top().(*htlcReleaseEvent)
651✔
316
                        nextReleaseTick = i.tickAt(head.releaseTime)
651✔
317
                }
651✔
318

319
                select {
2,369✔
320
                // A sub-systems has just modified the invoice state, so we'll
321
                // dispatch notifications to all registered clients.
322
                case event := <-i.invoiceEvents:
1,391✔
323
                        // For backwards compatibility, do not notify all
1,391✔
324
                        // invoice subscribers of cancel and accept events.
1,391✔
325
                        state := event.invoice.State
1,391✔
326
                        if state != ContractCanceled &&
1,391✔
327
                                state != ContractAccepted {
2,626✔
328

1,235✔
329
                                i.dispatchToClients(event)
1,235✔
330
                        }
1,235✔
331
                        i.dispatchToSingleClients(event)
1,391✔
332

333
                // A new htlc came in for auto-release.
334
                case event := <-i.htlcAutoReleaseChan:
330✔
335
                        log.Debugf("Scheduling auto-release for htlc: "+
330✔
336
                                "ref=%v, key=%v at %v",
330✔
337
                                event.invoiceRef, event.key, event.releaseTime)
330✔
338

330✔
339
                        // We use an independent timer for every htlc rather
330✔
340
                        // than a set timer that is reset with every htlc coming
330✔
341
                        // in. Otherwise the sender could keep resetting the
330✔
342
                        // timer until the broadcast window is entered and our
330✔
343
                        // channel is force closed.
330✔
344
                        autoReleaseHeap.Push(event)
330✔
345

346
                // The htlc at the top of the heap needs to be auto-released.
347
                case <-nextReleaseTick:
12✔
348
                        event := autoReleaseHeap.Pop().(*htlcReleaseEvent)
12✔
349
                        err := i.cancelSingleHtlc(
12✔
350
                                event.invoiceRef, event.key, ResultMppTimeout,
12✔
351
                        )
12✔
352
                        if err != nil {
12✔
353
                                log.Errorf("HTLC timer: %v", err)
×
354
                        }
×
355

356
                case <-i.quit:
375✔
357
                        return
375✔
358
                }
359
        }
360
}
361

362
// dispatchToSingleClients passes the supplied event to all notification
363
// clients that subscribed to all the invoice this event applies to.
364
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
1,391✔
365
        // Dispatch to single invoice subscribers.
1,391✔
366
        clients := i.copySingleClients()
1,391✔
367
        for _, client := range clients {
1,427✔
368
                payHash := client.invoiceRef.PayHash()
36✔
369

36✔
370
                if payHash == nil || *payHash != event.hash {
36✔
371
                        continue
×
372
                }
373

374
                select {
36✔
375
                case <-client.backlogDelivered:
36✔
376
                        // We won't deliver any events until the backlog has
377
                        // went through first.
378
                case <-i.quit:
×
379
                        return
×
380
                }
381

382
                client.notify(event)
36✔
383
        }
384
}
385

386
// dispatchToClients passes the supplied event to all notification clients that
387
// subscribed to all invoices. Add and settle indices are used to make sure
388
// that clients don't receive duplicate or unwanted events.
389
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
1,235✔
390
        invoice := event.invoice
1,235✔
391

1,235✔
392
        clients := i.copyClients()
1,235✔
393
        for clientID, client := range clients {
1,301✔
394
                // Before we dispatch this event, we'll check
66✔
395
                // to ensure that this client hasn't already
66✔
396
                // received this notification in order to
66✔
397
                // ensure we don't duplicate any events.
66✔
398

66✔
399
                // TODO(joostjager): Refactor switches.
66✔
400
                state := event.invoice.State
66✔
401
                switch {
66✔
402
                // If we've already sent this settle event to
403
                // the client, then we can skip this.
404
                case state == ContractSettled &&
405
                        client.settleIndex >= invoice.SettleIndex:
×
406
                        continue
×
407

408
                // Similarly, if we've already sent this add to
409
                // the client then we can skip this one, but only if this isn't
410
                // an AMP invoice. AMP invoices always remain in the settle
411
                // state as a base invoice.
412
                case event.setID == nil && state == ContractOpen &&
413
                        client.addIndex >= invoice.AddIndex:
×
414
                        continue
×
415

416
                // These two states should never happen, but we
417
                // log them just in case so we can detect this
418
                // instance.
419
                case state == ContractOpen &&
420
                        client.addIndex+1 != invoice.AddIndex:
7✔
421
                        log.Warnf("client=%v for invoice "+
7✔
422
                                "notifications missed an update, "+
7✔
423
                                "add_index=%v, new add event index=%v",
7✔
424
                                clientID, client.addIndex,
7✔
425
                                invoice.AddIndex)
7✔
426

427
                case state == ContractSettled &&
428
                        client.settleIndex+1 != invoice.SettleIndex:
×
429
                        log.Warnf("client=%v for invoice "+
×
430
                                "notifications missed an update, "+
×
431
                                "settle_index=%v, new settle event index=%v",
×
432
                                clientID, client.settleIndex,
×
433
                                invoice.SettleIndex)
×
434
                }
435

436
                select {
66✔
437
                case <-client.backlogDelivered:
66✔
438
                        // We won't deliver any events until the backlog has
439
                        // been processed.
440
                case <-i.quit:
×
441
                        return
×
442
                }
443

444
                err := client.notify(&invoiceEvent{
66✔
445
                        invoice: invoice,
66✔
446
                        setID:   event.setID,
66✔
447
                })
66✔
448
                if err != nil {
66✔
449
                        log.Errorf("Failed dispatching to client: %v", err)
×
450
                        return
×
451
                }
×
452

453
                // Each time we send a notification to a client, we'll record
454
                // the latest add/settle index it has. We'll use this to ensure
455
                // we don't send a notification twice, which can happen if a new
456
                // event is added while we're catching up a new client.
457
                invState := event.invoice.State
66✔
458
                switch {
66✔
459
                case invState == ContractSettled:
18✔
460
                        client.settleIndex = invoice.SettleIndex
18✔
461

462
                case invState == ContractOpen && event.setID == nil:
42✔
463
                        client.addIndex = invoice.AddIndex
42✔
464

465
                // If this is an AMP invoice, then we'll need to use the set ID
466
                // to keep track of the settle index of the client. AMP
467
                // invoices never go to the open state, but if a setID is
468
                // passed, then we know it was just settled and will track the
469
                // highest settle index so far.
470
                case invState == ContractOpen && event.setID != nil:
6✔
471
                        setID := *event.setID
6✔
472
                        client.settleIndex = invoice.AMPState[setID].SettleIndex
6✔
473

474
                default:
×
475
                        log.Errorf("unexpected invoice state: %v",
×
476
                                event.invoice.State)
×
477
                }
478
        }
479
}
480

481
// deliverBacklogEvents will attempts to query the invoice database for any
482
// notifications that the client has missed since it reconnected last.
483
func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context,
484
        client *InvoiceSubscription) error {
45✔
485

45✔
486
        addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex)
45✔
487
        if err != nil {
45✔
488
                return err
×
489
        }
×
490

491
        settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex)
45✔
492
        if err != nil {
45✔
493
                return err
×
494
        }
×
495

496
        // If we have any to deliver, then we'll append them to the end of the
497
        // notification queue in order to catch up the client before delivering
498
        // any new notifications.
499
        for _, addEvent := range addEvents {
45✔
500
                // We re-bind the loop variable to ensure we don't hold onto
×
501
                // the loop reference causing is to point to the same item.
×
502
                addEvent := addEvent
×
503

×
504
                select {
×
505
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
506
                        invoice: &addEvent,
507
                }:
×
508
                case <-i.quit:
×
509
                        return ErrShuttingDown
×
510
                }
511
        }
512

513
        for _, settleEvent := range settleEvents {
45✔
514
                // We re-bind the loop variable to ensure we don't hold onto
×
515
                // the loop reference causing is to point to the same item.
×
516
                settleEvent := settleEvent
×
517

×
518
                select {
×
519
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
520
                        invoice: &settleEvent,
521
                }:
×
522
                case <-i.quit:
×
523
                        return ErrShuttingDown
×
524
                }
525
        }
526

527
        return nil
45✔
528
}
529

530
// deliverSingleBacklogEvents will attempt to query the invoice database to
531
// retrieve the current invoice state and deliver this to the subscriber. Single
532
// invoice subscribers will always receive the current state right after
533
// subscribing. Only in case the invoice does not yet exist, nothing is sent
534
// yet.
535
func (i *InvoiceRegistry) deliverSingleBacklogEvents(ctx context.Context,
536
        client *SingleInvoiceSubscription) error {
18✔
537

18✔
538
        invoice, err := i.idb.LookupInvoice(ctx, client.invoiceRef)
18✔
539

18✔
540
        // It is possible that the invoice does not exist yet, but the client is
18✔
541
        // already watching it in anticipation.
18✔
542
        isNotFound := errors.Is(err, ErrInvoiceNotFound)
18✔
543
        isNotCreated := errors.Is(err, ErrNoInvoicesCreated)
18✔
544
        if isNotFound || isNotCreated {
36✔
545
                return nil
18✔
546
        }
18✔
547
        if err != nil {
×
548
                return err
×
549
        }
×
550

551
        payHash := client.invoiceRef.PayHash()
×
552
        if payHash == nil {
×
553
                return nil
×
554
        }
×
555

556
        err = client.notify(&invoiceEvent{
×
557
                hash:    *payHash,
×
558
                invoice: &invoice,
×
559
        })
×
560
        if err != nil {
×
561
                return err
×
562
        }
×
563

564
        log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
×
565
                client.id, payHash)
×
566

×
567
        return nil
×
568
}
569

570
// AddInvoice adds a regular invoice for the specified amount, identified by
571
// the passed preimage. Additionally, any memo or receipt data provided will
572
// also be stored on-disk. Once this invoice is added, subsystems within the
573
// daemon add/forward HTLCs are able to obtain the proper preimage required for
574
// redemption in the case that we're the final destination. We also return the
575
// addIndex of the newly created invoice which monotonically increases for each
576
// new invoice added.  A side effect of this function is that it also sets
577
// AddIndex on the invoice argument.
578
func (i *InvoiceRegistry) AddInvoice(ctx context.Context, invoice *Invoice,
579
        paymentHash lntypes.Hash) (uint64, error) {
720✔
580

720✔
581
        i.Lock()
720✔
582

720✔
583
        ref := InvoiceRefByHash(paymentHash)
720✔
584
        log.Debugf("Invoice%v: added with terms %v", ref, invoice.Terms)
720✔
585

720✔
586
        addIndex, err := i.idb.AddInvoice(ctx, invoice, paymentHash)
720✔
587
        if err != nil {
735✔
588
                i.Unlock()
15✔
589
                return 0, err
15✔
590
        }
15✔
591

592
        // Now that we've added the invoice, we'll send dispatch a message to
593
        // notify the clients of this new invoice.
594
        i.notifyClients(paymentHash, invoice, nil)
705✔
595
        i.Unlock()
705✔
596

705✔
597
        // InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry
705✔
598
        // to avoid deadlock when a new invoice is added while an other is being
705✔
599
        // canceled.
705✔
600
        invoiceExpiryRef := makeInvoiceExpiry(paymentHash, invoice)
705✔
601
        if invoiceExpiryRef != nil {
1,410✔
602
                i.expiryWatcher.AddInvoices(invoiceExpiryRef)
705✔
603
        }
705✔
604

605
        return addIndex, nil
705✔
606
}
607

608
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
609
// then we're able to pull the funds pending within an HTLC.
610
//
611
// TODO(roasbeef): ignore if settled?
612
func (i *InvoiceRegistry) LookupInvoice(ctx context.Context,
613
        rHash lntypes.Hash) (Invoice, error) {
395✔
614

395✔
615
        // We'll check the database to see if there's an existing matching
395✔
616
        // invoice.
395✔
617
        ref := InvoiceRefByHash(rHash)
395✔
618
        return i.idb.LookupInvoice(ctx, ref)
395✔
619
}
395✔
620

621
// LookupInvoiceByRef looks up an invoice by the given reference, if found
622
// then we're able to pull the funds pending within an HTLC.
623
func (i *InvoiceRegistry) LookupInvoiceByRef(ctx context.Context,
624
        ref InvoiceRef) (Invoice, error) {
×
625

×
626
        return i.idb.LookupInvoice(ctx, ref)
×
627
}
×
628

629
// startHtlcTimer starts a new timer via the invoice registry main loop that
630
// cancels a single htlc on an invoice when the htlc hold duration has passed.
631
func (i *InvoiceRegistry) startHtlcTimer(invoiceRef InvoiceRef,
632
        key CircuitKey, acceptTime time.Time) error {
330✔
633

330✔
634
        releaseTime := acceptTime.Add(i.cfg.HtlcHoldDuration)
330✔
635
        event := &htlcReleaseEvent{
330✔
636
                invoiceRef:  invoiceRef,
330✔
637
                key:         key,
330✔
638
                releaseTime: releaseTime,
330✔
639
        }
330✔
640

330✔
641
        select {
330✔
642
        case i.htlcAutoReleaseChan <- event:
330✔
643
                return nil
330✔
644

645
        case <-i.quit:
×
646
                return ErrShuttingDown
×
647
        }
648
}
649

650
// cancelSingleHtlc cancels a single accepted htlc on an invoice. It takes
651
// a resolution result which will be used to notify subscribed links and
652
// resolvers of the details of the htlc cancellation.
653
func (i *InvoiceRegistry) cancelSingleHtlc(invoiceRef InvoiceRef,
654
        key CircuitKey, result FailResolutionResult) error {
12✔
655

12✔
656
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
24✔
657
                // Only allow individual htlc cancellation on open invoices.
12✔
658
                if invoice.State != ContractOpen {
18✔
659
                        log.Debugf("cancelSingleHtlc: invoice %v no longer "+
6✔
660
                                "open", invoiceRef)
6✔
661

6✔
662
                        return nil, nil
6✔
663
                }
6✔
664

665
                // Lookup the current status of the htlc in the database.
666
                var (
6✔
667
                        htlcState HtlcState
6✔
668
                        setID     *SetID
6✔
669
                )
6✔
670
                htlc, ok := invoice.Htlcs[key]
6✔
671
                if !ok {
6✔
672
                        // If this is an AMP invoice, then all the HTLCs won't
×
673
                        // be read out, so we'll consult the other mapping to
×
674
                        // try to find the HTLC state in question here.
×
675
                        var found bool
×
676
                        for ampSetID, htlcSet := range invoice.AMPState {
×
677
                                ampSetID := ampSetID
×
678
                                for htlcKey := range htlcSet.InvoiceKeys {
×
679
                                        if htlcKey == key {
×
680
                                                htlcState = htlcSet.State
×
681
                                                setID = &ampSetID
×
682

×
683
                                                found = true
×
684
                                                break
×
685
                                        }
686
                                }
687
                        }
688

689
                        if !found {
×
690
                                return nil, fmt.Errorf("htlc %v not found", key)
×
691
                        }
×
692
                } else {
6✔
693
                        htlcState = htlc.State
6✔
694
                }
6✔
695

696
                // Cancellation is only possible if the htlc wasn't already
697
                // resolved.
698
                if htlcState != HtlcStateAccepted {
6✔
699
                        log.Debugf("cancelSingleHtlc: htlc %v on invoice %v "+
×
700
                                "is already resolved", key, invoiceRef)
×
701

×
702
                        return nil, nil
×
703
                }
×
704

705
                log.Debugf("cancelSingleHtlc: cancelling htlc %v on invoice %v",
6✔
706
                        key, invoiceRef)
6✔
707

6✔
708
                // Return an update descriptor that cancels htlc and keeps
6✔
709
                // invoice open.
6✔
710
                canceledHtlcs := map[CircuitKey]struct{}{
6✔
711
                        key: {},
6✔
712
                }
6✔
713

6✔
714
                return &InvoiceUpdateDesc{
6✔
715
                        UpdateType:  CancelHTLCsUpdate,
6✔
716
                        CancelHtlcs: canceledHtlcs,
6✔
717
                        SetID:       setID,
6✔
718
                }, nil
6✔
719
        }
720

721
        // Try to mark the specified htlc as canceled in the invoice database.
722
        // Intercept the update descriptor to set the local updated variable. If
723
        // no invoice update is performed, we can return early.
724
        setID := (*SetID)(invoiceRef.SetID())
12✔
725
        var updated bool
12✔
726
        invoice, err := i.idb.UpdateInvoice(
12✔
727
                context.Background(), invoiceRef, setID,
12✔
728
                func(invoice *Invoice) (
12✔
729
                        *InvoiceUpdateDesc, error) {
24✔
730

12✔
731
                        updateDesc, err := updateInvoice(invoice)
12✔
732
                        if err != nil {
12✔
733
                                return nil, err
×
734
                        }
×
735
                        updated = updateDesc != nil
12✔
736

12✔
737
                        return updateDesc, err
12✔
738
                },
739
        )
740
        if err != nil {
12✔
741
                return err
×
742
        }
×
743
        if !updated {
18✔
744
                return nil
6✔
745
        }
6✔
746

747
        // The invoice has been updated. Notify subscribers of the htlc
748
        // resolution.
749
        htlc, ok := invoice.Htlcs[key]
6✔
750
        if !ok {
6✔
751
                return fmt.Errorf("htlc %v not found", key)
×
752
        }
×
753
        if htlc.State == HtlcStateCanceled {
12✔
754
                resolution := NewFailResolution(
6✔
755
                        key, int32(htlc.AcceptHeight), result,
6✔
756
                )
6✔
757

6✔
758
                i.notifyHodlSubscribers(resolution)
6✔
759
        }
6✔
760
        return nil
6✔
761
}
762

763
// processKeySend just-in-time inserts an invoice if this htlc is a keysend
764
// htlc.
765
func (i *InvoiceRegistry) processKeySend(ctx invoiceUpdateCtx) error {
18✔
766
        // Retrieve keysend record if present.
18✔
767
        preimageSlice, ok := ctx.customRecords[record.KeySendType]
18✔
768
        if !ok {
18✔
769
                return nil
×
770
        }
×
771

772
        // Cancel htlc is preimage is invalid.
773
        preimage, err := lntypes.MakePreimage(preimageSlice)
18✔
774
        if err != nil {
21✔
775
                return err
3✔
776
        }
3✔
777
        if preimage.Hash() != ctx.hash {
15✔
778
                return fmt.Errorf("invalid keysend preimage %v for hash %v",
×
779
                        preimage, ctx.hash)
×
780
        }
×
781

782
        // Only allow keysend for non-mpp payments.
783
        if ctx.mpp != nil {
15✔
784
                return errors.New("no mpp keysend supported")
×
785
        }
×
786

787
        // Create an invoice for the htlc amount.
788
        amt := ctx.amtPaid
15✔
789

15✔
790
        // Set tlv required feature vector on the invoice. Otherwise we wouldn't
15✔
791
        // be able to pay to it with keysend.
15✔
792
        rawFeatures := lnwire.NewRawFeatureVector(
15✔
793
                lnwire.TLVOnionPayloadRequired,
15✔
794
        )
15✔
795
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
15✔
796

15✔
797
        // Use the minimum block delta that we require for settling htlcs.
15✔
798
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
15✔
799

15✔
800
        // Pre-check expiry here to prevent inserting an invoice that will not
15✔
801
        // be settled.
15✔
802
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
15✔
803
                return errors.New("final expiry too soon")
×
804
        }
×
805

806
        // The invoice database indexes all invoices by payment address, however
807
        // legacy keysend payment do not have one. In order to avoid a new
808
        // payment type on-disk wrt. to indexing, we'll continue to insert a
809
        // blank payment address which is special cased in the insertion logic
810
        // to not be indexed. In the future, once AMP is merged, this should be
811
        // replaced by generating a random payment address on the behalf of the
812
        // sender.
813
        payAddr := BlankPayAddr
15✔
814

15✔
815
        // Create placeholder invoice.
15✔
816
        invoice := &Invoice{
15✔
817
                CreationDate: i.cfg.Clock.Now(),
15✔
818
                Terms: ContractTerm{
15✔
819
                        FinalCltvDelta:  finalCltvDelta,
15✔
820
                        Value:           amt,
15✔
821
                        PaymentPreimage: &preimage,
15✔
822
                        PaymentAddr:     payAddr,
15✔
823
                        Features:        features,
15✔
824
                },
15✔
825
        }
15✔
826

15✔
827
        if i.cfg.KeysendHoldTime != 0 {
21✔
828
                invoice.HodlInvoice = true
6✔
829
                invoice.Terms.Expiry = i.cfg.KeysendHoldTime
6✔
830
        }
6✔
831

832
        // Insert invoice into database. Ignore duplicates, because this
833
        // may be a replay.
834
        _, err = i.AddInvoice(context.Background(), invoice, ctx.hash)
15✔
835
        if err != nil && !errors.Is(err, ErrDuplicateInvoice) {
15✔
836
                return err
×
837
        }
×
838

839
        return nil
15✔
840
}
841

842
// processAMP just-in-time inserts an invoice if this htlc is a keysend
843
// htlc.
844
func (i *InvoiceRegistry) processAMP(ctx invoiceUpdateCtx) error {
27✔
845
        // AMP payments MUST also include an MPP record.
27✔
846
        if ctx.mpp == nil {
30✔
847
                return errors.New("no MPP record for AMP")
3✔
848
        }
3✔
849

850
        // Create an invoice for the total amount expected, provided in the MPP
851
        // record.
852
        amt := ctx.mpp.TotalMsat()
24✔
853

24✔
854
        // Set the TLV required and MPP optional features on the invoice. We'll
24✔
855
        // also make the AMP features required so that it can't be paid by
24✔
856
        // legacy or MPP htlcs.
24✔
857
        rawFeatures := lnwire.NewRawFeatureVector(
24✔
858
                lnwire.TLVOnionPayloadRequired,
24✔
859
                lnwire.PaymentAddrOptional,
24✔
860
                lnwire.AMPRequired,
24✔
861
        )
24✔
862
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
24✔
863

24✔
864
        // Use the minimum block delta that we require for settling htlcs.
24✔
865
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
24✔
866

24✔
867
        // Pre-check expiry here to prevent inserting an invoice that will not
24✔
868
        // be settled.
24✔
869
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
24✔
870
                return errors.New("final expiry too soon")
×
871
        }
×
872

873
        // We'll use the sender-generated payment address provided in the HTLC
874
        // to create our AMP invoice.
875
        payAddr := ctx.mpp.PaymentAddr()
24✔
876

24✔
877
        // Create placeholder invoice.
24✔
878
        invoice := &Invoice{
24✔
879
                CreationDate: i.cfg.Clock.Now(),
24✔
880
                Terms: ContractTerm{
24✔
881
                        FinalCltvDelta:  finalCltvDelta,
24✔
882
                        Value:           amt,
24✔
883
                        PaymentPreimage: nil,
24✔
884
                        PaymentAddr:     payAddr,
24✔
885
                        Features:        features,
24✔
886
                },
24✔
887
        }
24✔
888

24✔
889
        // Insert invoice into database. Ignore duplicates payment hashes and
24✔
890
        // payment addrs, this may be a replay or a different HTLC for the AMP
24✔
891
        // invoice.
24✔
892
        _, err := i.AddInvoice(context.Background(), invoice, ctx.hash)
24✔
893
        isDuplicatedInvoice := errors.Is(err, ErrDuplicateInvoice)
24✔
894
        isDuplicatedPayAddr := errors.Is(err, ErrDuplicatePayAddr)
24✔
895
        switch {
24✔
896
        case isDuplicatedInvoice || isDuplicatedPayAddr:
12✔
897
                return nil
12✔
898
        default:
12✔
899
                return err
12✔
900
        }
901
}
902

903
// NotifyExitHopHtlc attempts to mark an invoice as settled. The return value
904
// describes how the htlc should be resolved.
905
//
906
// When the preimage of the invoice is not yet known (hodl invoice), this
907
// function moves the invoice to the accepted state. When SettleHoldInvoice is
908
// called later, a resolution message will be send back to the caller via the
909
// provided hodlChan. Invoice registry sends on this channel what action needs
910
// to be taken on the htlc (settle or cancel). The caller needs to ensure that
911
// the channel is either buffered or received on from another goroutine to
912
// prevent deadlock.
913
//
914
// In the case that the htlc is part of a larger set of htlcs that pay to the
915
// same invoice (multi-path payment), the htlc is held until the set is
916
// complete. If the set doesn't fully arrive in time, a timer will cancel the
917
// held htlc.
918
func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
919
        amtPaid lnwire.MilliSatoshi, expiry uint32, currentHeight int32,
920
        circuitKey CircuitKey, hodlChan chan<- interface{},
921
        wireCustomRecords lnwire.CustomRecords,
922
        payload Payload) (HtlcResolution, error) {
946✔
923

946✔
924
        // Create the update context containing the relevant details of the
946✔
925
        // incoming htlc.
946✔
926
        ctx := invoiceUpdateCtx{
946✔
927
                hash:                 rHash,
946✔
928
                circuitKey:           circuitKey,
946✔
929
                amtPaid:              amtPaid,
946✔
930
                expiry:               expiry,
946✔
931
                currentHeight:        currentHeight,
946✔
932
                finalCltvRejectDelta: i.cfg.FinalCltvRejectDelta,
946✔
933
                wireCustomRecords:    wireCustomRecords,
946✔
934
                customRecords:        payload.CustomRecords(),
946✔
935
                mpp:                  payload.MultiPath(),
946✔
936
                amp:                  payload.AMPRecord(),
946✔
937
                metadata:             payload.Metadata(),
946✔
938
                pathID:               payload.PathID(),
946✔
939
                totalAmtMsat:         payload.TotalAmtMsat(),
946✔
940
        }
946✔
941

946✔
942
        switch {
946✔
943
        // If we are accepting spontaneous AMP payments and this payload
944
        // contains an AMP record, create an AMP invoice that will be settled
945
        // below.
946
        case i.cfg.AcceptAMP && ctx.amp != nil:
27✔
947
                err := i.processAMP(ctx)
27✔
948
                if err != nil {
30✔
949
                        ctx.log(fmt.Sprintf("amp error: %v", err))
3✔
950

3✔
951
                        return NewFailResolution(
3✔
952
                                circuitKey, currentHeight, ResultAmpError,
3✔
953
                        ), nil
3✔
954
                }
3✔
955

956
        // If we are accepting spontaneous keysend payments, create a regular
957
        // invoice that will be settled below. We also enforce that this is only
958
        // done when no AMP payload is present since it will only be settle-able
959
        // by regular HTLCs.
960
        case i.cfg.AcceptKeySend && ctx.amp == nil:
18✔
961
                err := i.processKeySend(ctx)
18✔
962
                if err != nil {
21✔
963
                        ctx.log(fmt.Sprintf("keysend error: %v", err))
3✔
964

3✔
965
                        return NewFailResolution(
3✔
966
                                circuitKey, currentHeight, ResultKeySendError,
3✔
967
                        ), nil
3✔
968
                }
3✔
969
        }
970

971
        // Execute locked notify exit hop logic.
972
        i.Lock()
940✔
973
        resolution, invoiceToExpire, err := i.notifyExitHopHtlcLocked(
940✔
974
                &ctx, hodlChan,
940✔
975
        )
940✔
976
        i.Unlock()
940✔
977
        if err != nil {
940✔
978
                return nil, err
×
979
        }
×
980

981
        if invoiceToExpire != nil {
1,022✔
982
                i.expiryWatcher.AddInvoices(invoiceToExpire)
82✔
983
        }
82✔
984

985
        switch r := resolution.(type) {
940✔
986
        // The htlc is held. Start a timer outside the lock if the htlc should
987
        // be auto-released, because otherwise a deadlock may happen with the
988
        // main event loop.
989
        case *htlcAcceptResolution:
419✔
990
                if r.autoRelease {
749✔
991
                        var invRef InvoiceRef
330✔
992
                        if ctx.amp != nil {
342✔
993
                                invRef = InvoiceRefBySetID(*ctx.setID())
12✔
994
                        } else {
330✔
995
                                invRef = ctx.invoiceRef()
318✔
996
                        }
318✔
997

998
                        err := i.startHtlcTimer(
330✔
999
                                invRef, circuitKey, r.acceptTime,
330✔
1000
                        )
330✔
1001
                        if err != nil {
330✔
1002
                                return nil, err
×
1003
                        }
×
1004
                }
1005

1006
                // We return a nil resolution because htlc acceptances are
1007
                // represented as nil resolutions externally.
1008
                // TODO(carla) update calling code to handle accept resolutions.
1009
                return nil, nil
419✔
1010

1011
        // A direct resolution was received for this htlc.
1012
        case HtlcResolution:
521✔
1013
                return r, nil
521✔
1014

1015
        // Fail if an unknown resolution type was received.
1016
        default:
×
1017
                return nil, errors.New("invalid resolution type")
×
1018
        }
1019
}
1020

1021
// notifyExitHopHtlcLocked is the internal implementation of NotifyExitHopHtlc
1022
// that should be executed inside the registry lock. The returned invoiceExpiry
1023
// (if not nil) needs to be added to the expiry watcher outside of the lock.
1024
func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
1025
        ctx *invoiceUpdateCtx, hodlChan chan<- interface{}) (
1026
        HtlcResolution, invoiceExpiry, error) {
940✔
1027

940✔
1028
        invoiceRef := ctx.invoiceRef()
940✔
1029
        setID := (*SetID)(ctx.setID())
940✔
1030

940✔
1031
        // We need to look up the current state of the invoice in order to send
940✔
1032
        // the previously accepted/settled HTLCs to the interceptor.
940✔
1033
        existingInvoice, err := i.idb.LookupInvoice(
940✔
1034
                context.Background(), invoiceRef,
940✔
1035
        )
940✔
1036
        switch {
940✔
1037
        case errors.Is(err, ErrInvoiceNotFound) ||
1038
                errors.Is(err, ErrNoInvoicesCreated):
22✔
1039

22✔
1040
                // If the invoice was not found, return a failure resolution
22✔
1041
                // with an invoice not found result.
22✔
1042
                return NewFailResolution(
22✔
1043
                        ctx.circuitKey, ctx.currentHeight,
22✔
1044
                        ResultInvoiceNotFound,
22✔
1045
                ), nil, nil
22✔
1046

1047
        case err != nil:
×
1048
                ctx.log(err.Error())
×
1049
                return nil, nil, err
×
1050
        }
1051

1052
        var cancelSet bool
918✔
1053

918✔
1054
        // Provide the invoice to the settlement interceptor to allow
918✔
1055
        // the interceptor's client an opportunity to manipulate the
918✔
1056
        // settlement process.
918✔
1057
        err = i.cfg.HtlcInterceptor.Intercept(HtlcModifyRequest{
918✔
1058
                WireCustomRecords:  ctx.wireCustomRecords,
918✔
1059
                ExitHtlcCircuitKey: ctx.circuitKey,
918✔
1060
                ExitHtlcAmt:        ctx.amtPaid,
918✔
1061
                ExitHtlcExpiry:     ctx.expiry,
918✔
1062
                CurrentHeight:      uint32(ctx.currentHeight),
918✔
1063
                Invoice:            existingInvoice,
918✔
1064
        }, func(resp HtlcModifyResponse) {
918✔
1065
                log.Debugf("Received invoice HTLC interceptor response: %v",
×
1066
                        resp)
×
1067

×
1068
                if resp.AmountPaid != 0 {
×
1069
                        ctx.amtPaid = resp.AmountPaid
×
1070
                }
×
1071

1072
                cancelSet = resp.CancelSet
×
1073
        })
1074
        if err != nil {
918✔
1075
                err := fmt.Errorf("error during invoice HTLC interception: %w",
×
1076
                        err)
×
1077
                ctx.log(err.Error())
×
1078

×
1079
                return nil, nil, err
×
1080
        }
×
1081

1082
        // We'll attempt to settle an invoice matching this rHash on disk (if
1083
        // one exists). The callback will update the invoice state and/or htlcs.
1084
        var (
918✔
1085
                resolution        HtlcResolution
918✔
1086
                updateSubscribers bool
918✔
1087
        )
918✔
1088
        callback := func(inv *Invoice) (*InvoiceUpdateDesc, error) {
1,836✔
1089
                updateDesc, res, err := updateInvoice(ctx, inv)
918✔
1090
                if err != nil {
918✔
1091
                        return nil, err
×
1092
                }
×
1093

1094
                // Only send an update if the invoice state was changed.
1095
                updateSubscribers = updateDesc != nil &&
918✔
1096
                        updateDesc.State != nil
918✔
1097

918✔
1098
                // Assign resolution to outer scope variable.
918✔
1099
                if cancelSet {
918✔
1100
                        // If a cancel signal was set for the htlc set, we set
×
1101
                        // the resolution as a failure with an underpayment
×
1102
                        // indication. Something was wrong with this htlc, so
×
1103
                        // we probably can't settle the invoice at all.
×
1104
                        resolution = NewFailResolution(
×
1105
                                ctx.circuitKey, ctx.currentHeight,
×
1106
                                ResultAmountTooLow,
×
1107
                        )
×
1108
                } else {
918✔
1109
                        resolution = res
918✔
1110
                }
918✔
1111

1112
                return updateDesc, nil
918✔
1113
        }
1114

1115
        invoice, err := i.idb.UpdateInvoice(
918✔
1116
                context.Background(), invoiceRef, setID, callback,
918✔
1117
        )
918✔
1118

918✔
1119
        var duplicateSetIDErr ErrDuplicateSetID
918✔
1120
        if errors.As(err, &duplicateSetIDErr) {
918✔
1121
                return NewFailResolution(
×
1122
                        ctx.circuitKey, ctx.currentHeight,
×
1123
                        ResultInvoiceNotFound,
×
1124
                ), nil, nil
×
1125
        }
×
1126

1127
        switch {
918✔
1128
        case errors.Is(err, ErrInvoiceNotFound):
×
1129
                // If the invoice was not found, return a failure resolution
×
1130
                // with an invoice not found result.
×
1131
                return NewFailResolution(
×
1132
                        ctx.circuitKey, ctx.currentHeight,
×
1133
                        ResultInvoiceNotFound,
×
1134
                ), nil, nil
×
1135

1136
        case errors.Is(err, ErrInvRefEquivocation):
×
1137
                return NewFailResolution(
×
1138
                        ctx.circuitKey, ctx.currentHeight,
×
1139
                        ResultInvoiceNotFound,
×
1140
                ), nil, nil
×
1141

1142
        case err == nil:
918✔
1143

1144
        default:
×
1145
                ctx.log(err.Error())
×
1146
                return nil, nil, err
×
1147
        }
1148

1149
        var invoiceToExpire invoiceExpiry
918✔
1150

918✔
1151
        log.Tracef("Settlement resolution: %T %v", resolution, resolution)
918✔
1152

918✔
1153
        switch res := resolution.(type) {
918✔
1154
        case *HtlcFailResolution:
26✔
1155
                // Inspect latest htlc state on the invoice. If it is found,
26✔
1156
                // we will update the accept height as it was recorded in the
26✔
1157
                // invoice database (which occurs in the case where the htlc
26✔
1158
                // reached the database in a previous call). If the htlc was
26✔
1159
                // not found on the invoice, it was immediately failed so we
26✔
1160
                // send the failure resolution as is, which has the current
26✔
1161
                // height set as the accept height.
26✔
1162
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
26✔
1163
                if ok {
29✔
1164
                        res.AcceptHeight = int32(invoiceHtlc.AcceptHeight)
3✔
1165
                }
3✔
1166

1167
                ctx.log(fmt.Sprintf("failure resolution result "+
26✔
1168
                        "outcome: %v, at accept height: %v",
26✔
1169
                        res.Outcome, res.AcceptHeight))
26✔
1170

26✔
1171
                // Some failures apply to the entire HTLC set. Break here if
26✔
1172
                // this isn't one of them.
26✔
1173
                if !res.Outcome.IsSetFailure() {
46✔
1174
                        break
20✔
1175
                }
1176

1177
                // Also cancel any HTLCs in the HTLC set that are also in the
1178
                // canceled state with the same failure result.
1179
                setID := ctx.setID()
6✔
1180
                canceledHtlcSet := invoice.HTLCSet(setID, HtlcStateCanceled)
6✔
1181
                for key, htlc := range canceledHtlcSet {
12✔
1182
                        htlcFailResolution := NewFailResolution(
6✔
1183
                                key, int32(htlc.AcceptHeight), res.Outcome,
6✔
1184
                        )
6✔
1185

6✔
1186
                        i.notifyHodlSubscribers(htlcFailResolution)
6✔
1187
                }
6✔
1188

1189
        // If the htlc was settled, we will settle any previously accepted
1190
        // htlcs and notify our peer to settle them.
1191
        case *HtlcSettleResolution:
473✔
1192
                ctx.log(fmt.Sprintf("settle resolution result "+
473✔
1193
                        "outcome: %v, at accept height: %v",
473✔
1194
                        res.Outcome, res.AcceptHeight))
473✔
1195

473✔
1196
                // Also settle any previously accepted htlcs. If a htlc is
473✔
1197
                // marked as settled, we should follow now and settle the htlc
473✔
1198
                // with our peer.
473✔
1199
                setID := ctx.setID()
473✔
1200
                settledHtlcSet := invoice.HTLCSet(setID, HtlcStateSettled)
473✔
1201
                for key, htlc := range settledHtlcSet {
1,258✔
1202
                        preimage := res.Preimage
785✔
1203
                        if htlc.AMP != nil && htlc.AMP.Preimage != nil {
797✔
1204
                                preimage = *htlc.AMP.Preimage
12✔
1205
                        }
12✔
1206

1207
                        // Notify subscribers that the htlcs should be settled
1208
                        // with our peer. Note that the outcome of the
1209
                        // resolution is set based on the outcome of the single
1210
                        // htlc that we just settled, so may not be accurate
1211
                        // for all htlcs.
1212
                        htlcSettleResolution := NewSettleResolution(
785✔
1213
                                preimage, key,
785✔
1214
                                int32(htlc.AcceptHeight), res.Outcome,
785✔
1215
                        )
785✔
1216

785✔
1217
                        // Notify subscribers that the htlc should be settled
785✔
1218
                        // with our peer.
785✔
1219
                        i.notifyHodlSubscribers(htlcSettleResolution)
785✔
1220
                }
1221

1222
                // If concurrent payments were attempted to this invoice before
1223
                // the current one was ultimately settled, cancel back any of
1224
                // the HTLCs immediately. As a result of the settle, the HTLCs
1225
                // in other HTLC sets are automatically converted to a canceled
1226
                // state when updating the invoice.
1227
                //
1228
                // TODO(roasbeef): can remove now??
1229
                canceledHtlcSet := invoice.HTLCSetCompliment(
473✔
1230
                        setID, HtlcStateCanceled,
473✔
1231
                )
473✔
1232
                for key, htlc := range canceledHtlcSet {
473✔
1233
                        htlcFailResolution := NewFailResolution(
×
1234
                                key, int32(htlc.AcceptHeight),
×
1235
                                ResultInvoiceAlreadySettled,
×
1236
                        )
×
1237

×
1238
                        i.notifyHodlSubscribers(htlcFailResolution)
×
1239
                }
×
1240

1241
        // If we accepted the htlc, subscribe to the hodl invoice and return
1242
        // an accept resolution with the htlc's accept time on it.
1243
        case *htlcAcceptResolution:
419✔
1244
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
419✔
1245
                if !ok {
419✔
1246
                        return nil, nil, fmt.Errorf("accepted htlc: %v not"+
×
1247
                                " present on invoice: %x", ctx.circuitKey,
×
1248
                                ctx.hash[:])
×
1249
                }
×
1250

1251
                // Determine accepted height of this htlc. If the htlc reached
1252
                // the invoice database (possibly in a previous call to the
1253
                // invoice registry), we'll take the original accepted height
1254
                // as it was recorded in the database.
1255
                acceptHeight := int32(invoiceHtlc.AcceptHeight)
419✔
1256

419✔
1257
                ctx.log(fmt.Sprintf("accept resolution result "+
419✔
1258
                        "outcome: %v, at accept height: %v",
419✔
1259
                        res.outcome, acceptHeight))
419✔
1260

419✔
1261
                // Auto-release the htlc if the invoice is still open. It can
419✔
1262
                // only happen for mpp payments that there are htlcs in state
419✔
1263
                // Accepted while the invoice is Open.
419✔
1264
                if invoice.State == ContractOpen {
749✔
1265
                        res.acceptTime = invoiceHtlc.AcceptTime
330✔
1266
                        res.autoRelease = true
330✔
1267
                }
330✔
1268

1269
                // If we have fully accepted the set of htlcs for this invoice,
1270
                // we can now add it to our invoice expiry watcher. We do not
1271
                // add invoices before they are fully accepted, because it is
1272
                // possible that we MppTimeout the htlcs, and then our relevant
1273
                // expiry height could change.
1274
                if res.outcome == resultAccepted {
501✔
1275
                        invoiceToExpire = makeInvoiceExpiry(ctx.hash, invoice)
82✔
1276
                }
82✔
1277

1278
                i.hodlSubscribe(hodlChan, ctx.circuitKey)
419✔
1279

1280
        default:
×
1281
                panic("unknown action")
×
1282
        }
1283

1284
        // Now that the links have been notified of any state changes to their
1285
        // HTLCs, we'll go ahead and notify any clients waiting on the invoice
1286
        // state changes.
1287
        if updateSubscribers {
1,470✔
1288
                // We'll add a setID onto the notification, but only if this is
552✔
1289
                // an AMP invoice being settled.
552✔
1290
                var setID *[32]byte
552✔
1291
                if _, ok := resolution.(*HtlcSettleResolution); ok {
1,016✔
1292
                        setID = ctx.setID()
464✔
1293
                }
464✔
1294

1295
                i.notifyClients(ctx.hash, invoice, setID)
552✔
1296
        }
1297

1298
        return resolution, invoiceToExpire, nil
918✔
1299
}
1300

1301
// SettleHodlInvoice sets the preimage of a hodl invoice.
1302
func (i *InvoiceRegistry) SettleHodlInvoice(ctx context.Context,
1303
        preimage lntypes.Preimage) error {
69✔
1304

69✔
1305
        i.Lock()
69✔
1306
        defer i.Unlock()
69✔
1307

69✔
1308
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
138✔
1309
                switch invoice.State {
69✔
1310
                case ContractOpen:
×
1311
                        return nil, ErrInvoiceStillOpen
×
1312

1313
                case ContractCanceled:
×
1314
                        return nil, ErrInvoiceAlreadyCanceled
×
1315

1316
                case ContractSettled:
3✔
1317
                        return nil, ErrInvoiceAlreadySettled
3✔
1318
                }
1319

1320
                return &InvoiceUpdateDesc{
66✔
1321
                        UpdateType: SettleHodlInvoiceUpdate,
66✔
1322
                        State: &InvoiceStateUpdateDesc{
66✔
1323
                                NewState: ContractSettled,
66✔
1324
                                Preimage: &preimage,
66✔
1325
                        },
66✔
1326
                }, nil
66✔
1327
        }
1328

1329
        hash := preimage.Hash()
69✔
1330
        invoiceRef := InvoiceRefByHash(hash)
69✔
1331
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
69✔
1332
        if err != nil {
72✔
1333
                log.Errorf("SettleHodlInvoice with preimage %v: %v",
3✔
1334
                        preimage, err)
3✔
1335

3✔
1336
                return err
3✔
1337
        }
3✔
1338

1339
        log.Debugf("Invoice%v: settled with preimage %v", invoiceRef,
66✔
1340
                invoice.Terms.PaymentPreimage)
66✔
1341

66✔
1342
        // In the callback, we marked the invoice as settled. UpdateInvoice will
66✔
1343
        // have seen this and should have moved all htlcs that were accepted to
66✔
1344
        // the settled state. In the loop below, we go through all of these and
66✔
1345
        // notify links and resolvers that are waiting for resolution. Any htlcs
66✔
1346
        // that were already settled before, will be notified again. This isn't
66✔
1347
        // necessary but doesn't hurt either.
66✔
1348
        for key, htlc := range invoice.Htlcs {
135✔
1349
                if htlc.State != HtlcStateSettled {
69✔
1350
                        continue
×
1351
                }
1352

1353
                resolution := NewSettleResolution(
69✔
1354
                        preimage, key, int32(htlc.AcceptHeight), ResultSettled,
69✔
1355
                )
69✔
1356

69✔
1357
                i.notifyHodlSubscribers(resolution)
69✔
1358
        }
1359
        i.notifyClients(hash, invoice, nil)
66✔
1360

66✔
1361
        return nil
66✔
1362
}
1363

1364
// CancelInvoice attempts to cancel the invoice corresponding to the passed
1365
// payment hash.
1366
func (i *InvoiceRegistry) CancelInvoice(ctx context.Context,
1367
        payHash lntypes.Hash) error {
29✔
1368

29✔
1369
        return i.cancelInvoiceImpl(ctx, payHash, true)
29✔
1370
}
29✔
1371

1372
// shouldCancel examines the state of an invoice and whether we want to
1373
// cancel already accepted invoices, taking our force cancel boolean into
1374
// account. This is pulled out into its own function so that tests that mock
1375
// cancelInvoiceImpl can reuse this logic.
1376
func shouldCancel(state ContractState, cancelAccepted bool) bool {
94✔
1377
        if state != ContractAccepted {
160✔
1378
                return true
66✔
1379
        }
66✔
1380

1381
        // If the invoice is accepted, we should only cancel if we want to
1382
        // force cancellation of accepted invoices.
1383
        return cancelAccepted
28✔
1384
}
1385

1386
// cancelInvoice attempts to cancel the invoice corresponding to the passed
1387
// payment hash. Accepted invoices will only be canceled if explicitly
1388
// requested to do so. It notifies subscribing links and resolvers that
1389
// the associated htlcs were canceled if they change state.
1390
func (i *InvoiceRegistry) cancelInvoiceImpl(ctx context.Context,
1391
        payHash lntypes.Hash, cancelAccepted bool) error {
103✔
1392

103✔
1393
        i.Lock()
103✔
1394
        defer i.Unlock()
103✔
1395

103✔
1396
        ref := InvoiceRefByHash(payHash)
103✔
1397
        log.Debugf("Invoice%v: canceling invoice", ref)
103✔
1398

103✔
1399
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
197✔
1400
                if !shouldCancel(invoice.State, cancelAccepted) {
106✔
1401
                        return nil, nil
12✔
1402
                }
12✔
1403

1404
                // Move invoice to the canceled state. Rely on validation in
1405
                // channeldb to return an error if the invoice is already
1406
                // settled or canceled.
1407
                return &InvoiceUpdateDesc{
82✔
1408
                        UpdateType: CancelInvoiceUpdate,
82✔
1409
                        State: &InvoiceStateUpdateDesc{
82✔
1410
                                NewState: ContractCanceled,
82✔
1411
                        },
82✔
1412
                }, nil
82✔
1413
        }
1414

1415
        invoiceRef := InvoiceRefByHash(payHash)
103✔
1416
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
103✔
1417

103✔
1418
        // Implement idempotency by returning success if the invoice was already
103✔
1419
        // canceled.
103✔
1420
        if errors.Is(err, ErrInvoiceAlreadyCanceled) {
106✔
1421
                log.Debugf("Invoice%v: already canceled", ref)
3✔
1422
                return nil
3✔
1423
        }
3✔
1424
        if err != nil {
120✔
1425
                return err
20✔
1426
        }
20✔
1427

1428
        // Return without cancellation if the invoice state is ContractAccepted.
1429
        if invoice.State == ContractAccepted {
92✔
1430
                log.Debugf("Invoice%v: remains accepted as cancel wasn't"+
12✔
1431
                        "explicitly requested.", ref)
12✔
1432
                return nil
12✔
1433
        }
12✔
1434

1435
        log.Debugf("Invoice%v: canceled", ref)
68✔
1436

68✔
1437
        // In the callback, some htlcs may have been moved to the canceled
68✔
1438
        // state. We now go through all of these and notify links and resolvers
68✔
1439
        // that are waiting for resolution. Any htlcs that were already canceled
68✔
1440
        // before, will be notified again. This isn't necessary but doesn't hurt
68✔
1441
        // either.
68✔
1442
        for key, htlc := range invoice.Htlcs {
93✔
1443
                if htlc.State != HtlcStateCanceled {
25✔
1444
                        continue
×
1445
                }
1446

1447
                i.notifyHodlSubscribers(
25✔
1448
                        NewFailResolution(
25✔
1449
                                key, int32(htlc.AcceptHeight), ResultCanceled,
25✔
1450
                        ),
25✔
1451
                )
25✔
1452
        }
1453
        i.notifyClients(payHash, invoice, nil)
68✔
1454

68✔
1455
        // Attempt to also delete the invoice if requested through the registry
68✔
1456
        // config.
68✔
1457
        if i.cfg.GcCanceledInvoicesOnTheFly {
71✔
1458
                // Assemble the delete reference and attempt to delete through
3✔
1459
                // the invocice from the DB.
3✔
1460
                deleteRef := InvoiceDeleteRef{
3✔
1461
                        PayHash:     payHash,
3✔
1462
                        AddIndex:    invoice.AddIndex,
3✔
1463
                        SettleIndex: invoice.SettleIndex,
3✔
1464
                }
3✔
1465
                if invoice.Terms.PaymentAddr != BlankPayAddr {
3✔
1466
                        deleteRef.PayAddr = &invoice.Terms.PaymentAddr
×
1467
                }
×
1468

1469
                err = i.idb.DeleteInvoice(ctx, []InvoiceDeleteRef{deleteRef})
3✔
1470
                // If by any chance deletion failed, then log it instead of
3✔
1471
                // returning the error, as the invoice itself has already been
3✔
1472
                // canceled.
3✔
1473
                if err != nil {
3✔
1474
                        log.Warnf("Invoice %v could not be deleted: %v", ref,
×
1475
                                err)
×
1476
                }
×
1477
        }
1478

1479
        return nil
68✔
1480
}
1481

1482
// notifyClients notifies all currently registered invoice notification clients
1483
// of a newly added/settled invoice.
1484
func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
1485
        invoice *Invoice, setID *[32]byte) {
1,391✔
1486

1,391✔
1487
        event := &invoiceEvent{
1,391✔
1488
                invoice: invoice,
1,391✔
1489
                hash:    hash,
1,391✔
1490
                setID:   setID,
1,391✔
1491
        }
1,391✔
1492

1,391✔
1493
        select {
1,391✔
1494
        case i.invoiceEvents <- event:
1,391✔
1495
        case <-i.quit:
×
1496
        }
1497
}
1498

1499
// invoiceSubscriptionKit defines that are common to both all invoice
1500
// subscribers and single invoice subscribers.
1501
type invoiceSubscriptionKit struct {
1502
        id uint32 // nolint:structcheck
1503

1504
        // quit is a chan mouted to InvoiceRegistry that signals a shutdown.
1505
        quit chan struct{}
1506

1507
        ntfnQueue *queue.ConcurrentQueue
1508

1509
        canceled   uint32 // To be used atomically.
1510
        cancelChan chan struct{}
1511

1512
        // backlogDelivered is closed when the backlog events have been
1513
        // delivered.
1514
        backlogDelivered chan struct{}
1515
}
1516

1517
// InvoiceSubscription represents an intent to receive updates for newly added
1518
// or settled invoices. For each newly added invoice, a copy of the invoice
1519
// will be sent over the NewInvoices channel. Similarly, for each newly settled
1520
// invoice, a copy of the invoice will be sent over the SettledInvoices
1521
// channel.
1522
type InvoiceSubscription struct {
1523
        invoiceSubscriptionKit
1524

1525
        // NewInvoices is a channel that we'll use to send all newly created
1526
        // invoices with an invoice index greater than the specified
1527
        // StartingInvoiceIndex field.
1528
        NewInvoices chan *Invoice
1529

1530
        // SettledInvoices is a channel that we'll use to send all settled
1531
        // invoices with an invoices index greater than the specified
1532
        // StartingInvoiceIndex field.
1533
        SettledInvoices chan *Invoice
1534

1535
        // addIndex is the highest add index the caller knows of. We'll use
1536
        // this information to send out an event backlog to the notifications
1537
        // subscriber. Any new add events with an index greater than this will
1538
        // be dispatched before any new notifications are sent out.
1539
        addIndex uint64
1540

1541
        // settleIndex is the highest settle index the caller knows of. We'll
1542
        // use this information to send out an event backlog to the
1543
        // notifications subscriber. Any new settle events with an index
1544
        // greater than this will be dispatched before any new notifications
1545
        // are sent out.
1546
        settleIndex uint64
1547
}
1548

1549
// SingleInvoiceSubscription represents an intent to receive updates for a
1550
// specific invoice.
1551
type SingleInvoiceSubscription struct {
1552
        invoiceSubscriptionKit
1553

1554
        invoiceRef InvoiceRef
1555

1556
        // Updates is a channel that we'll use to send all invoice events for
1557
        // the invoice that is subscribed to.
1558
        Updates chan *Invoice
1559
}
1560

1561
// PayHash returns the optional payment hash of the target invoice.
1562
//
1563
// TODO(positiveblue): This method is only supposed to be used in tests. It will
1564
// be deleted as soon as invoiceregistery_test is in the same module.
1565
func (s *SingleInvoiceSubscription) PayHash() *lntypes.Hash {
18✔
1566
        return s.invoiceRef.PayHash()
18✔
1567
}
18✔
1568

1569
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
1570
// resources.
1571
func (i *invoiceSubscriptionKit) Cancel() {
63✔
1572
        if !atomic.CompareAndSwapUint32(&i.canceled, 0, 1) {
63✔
1573
                return
×
1574
        }
×
1575

1576
        i.ntfnQueue.Stop()
63✔
1577
        close(i.cancelChan)
63✔
1578
}
1579

1580
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
102✔
1581
        select {
102✔
1582
        case i.ntfnQueue.ChanIn() <- event:
102✔
1583

1584
        case <-i.cancelChan:
×
1585
                // This can only be triggered by delivery of non-backlog
×
1586
                // events.
×
1587
                return ErrShuttingDown
×
1588
        case <-i.quit:
×
1589
                return ErrShuttingDown
×
1590
        }
1591

1592
        return nil
102✔
1593
}
1594

1595
// SubscribeNotifications returns an InvoiceSubscription which allows the
1596
// caller to receive async notifications when any invoices are settled or
1597
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
1598
// by first sending out all new events with an invoice index _greater_ than
1599
// this value. Afterwards, we'll send out real-time notifications.
1600
func (i *InvoiceRegistry) SubscribeNotifications(ctx context.Context,
1601
        addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
45✔
1602

45✔
1603
        client := &InvoiceSubscription{
45✔
1604
                NewInvoices:     make(chan *Invoice),
45✔
1605
                SettledInvoices: make(chan *Invoice),
45✔
1606
                addIndex:        addIndex,
45✔
1607
                settleIndex:     settleIndex,
45✔
1608
                invoiceSubscriptionKit: invoiceSubscriptionKit{
45✔
1609
                        quit:             i.quit,
45✔
1610
                        ntfnQueue:        queue.NewConcurrentQueue(20),
45✔
1611
                        cancelChan:       make(chan struct{}),
45✔
1612
                        backlogDelivered: make(chan struct{}),
45✔
1613
                },
45✔
1614
        }
45✔
1615
        client.ntfnQueue.Start()
45✔
1616

45✔
1617
        // This notifies other goroutines that the backlog phase is over.
45✔
1618
        defer close(client.backlogDelivered)
45✔
1619

45✔
1620
        // Always increment by 1 first, and our client ID will start with 1,
45✔
1621
        // not 0.
45✔
1622
        client.id = atomic.AddUint32(&i.nextClientID, 1)
45✔
1623

45✔
1624
        // Before we register this new invoice subscription, we'll launch a new
45✔
1625
        // goroutine that will proxy all notifications appended to the end of
45✔
1626
        // the concurrent queue to the two client-side channels the caller will
45✔
1627
        // feed off of.
45✔
1628
        i.wg.Add(1)
45✔
1629
        go func() {
90✔
1630
                defer i.wg.Done()
45✔
1631
                defer i.deleteClient(client.id)
45✔
1632

45✔
1633
                for {
156✔
1634
                        select {
111✔
1635
                        // A new invoice event has been sent by the
1636
                        // invoiceRegistry! We'll figure out if this is an add
1637
                        // event or a settle event, then dispatch the event to
1638
                        // the client.
1639
                        case ntfn := <-client.ntfnQueue.ChanOut():
66✔
1640
                                invoiceEvent := ntfn.(*invoiceEvent)
66✔
1641

66✔
1642
                                var targetChan chan *Invoice
66✔
1643
                                state := invoiceEvent.invoice.State
66✔
1644
                                switch {
66✔
1645
                                // AMP invoices never move to settled, but will
1646
                                // be sent with a set ID if an HTLC set is
1647
                                // being settled.
1648
                                case state == ContractOpen &&
1649
                                        invoiceEvent.setID != nil:
6✔
1650
                                        fallthrough
6✔
1651

1652
                                case state == ContractSettled:
24✔
1653
                                        targetChan = client.SettledInvoices
24✔
1654

1655
                                case state == ContractOpen:
42✔
1656
                                        targetChan = client.NewInvoices
42✔
1657

1658
                                default:
×
1659
                                        log.Errorf("unknown invoice state: %v",
×
1660
                                                state)
×
1661

×
1662
                                        continue
×
1663
                                }
1664

1665
                                select {
66✔
1666
                                case targetChan <- invoiceEvent.invoice:
66✔
1667

1668
                                case <-client.cancelChan:
×
1669
                                        return
×
1670

1671
                                case <-i.quit:
×
1672
                                        return
×
1673
                                }
1674

1675
                        case <-client.cancelChan:
44✔
1676
                                return
44✔
1677

1678
                        case <-i.quit:
1✔
1679
                                return
1✔
1680
                        }
1681
                }
1682
        }()
1683

1684
        i.notificationClientMux.Lock()
45✔
1685
        i.notificationClients[client.id] = client
45✔
1686
        i.notificationClientMux.Unlock()
45✔
1687

45✔
1688
        // Query the database to see if based on the provided addIndex and
45✔
1689
        // settledIndex we need to deliver any backlog notifications.
45✔
1690
        err := i.deliverBacklogEvents(ctx, client)
45✔
1691
        if err != nil {
45✔
1692
                return nil, err
×
1693
        }
×
1694

1695
        log.Infof("New invoice subscription client: id=%v", client.id)
45✔
1696

45✔
1697
        return client, nil
45✔
1698
}
1699

1700
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
1701
// caller to receive async notifications for a specific invoice.
1702
func (i *InvoiceRegistry) SubscribeSingleInvoice(ctx context.Context,
1703
        hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
18✔
1704

18✔
1705
        client := &SingleInvoiceSubscription{
18✔
1706
                Updates: make(chan *Invoice),
18✔
1707
                invoiceSubscriptionKit: invoiceSubscriptionKit{
18✔
1708
                        quit:             i.quit,
18✔
1709
                        ntfnQueue:        queue.NewConcurrentQueue(20),
18✔
1710
                        cancelChan:       make(chan struct{}),
18✔
1711
                        backlogDelivered: make(chan struct{}),
18✔
1712
                },
18✔
1713
                invoiceRef: InvoiceRefByHash(hash),
18✔
1714
        }
18✔
1715
        client.ntfnQueue.Start()
18✔
1716

18✔
1717
        // This notifies other goroutines that the backlog phase is done.
18✔
1718
        defer close(client.backlogDelivered)
18✔
1719

18✔
1720
        // Always increment by 1 first, and our client ID will start with 1,
18✔
1721
        // not 0.
18✔
1722
        client.id = atomic.AddUint32(&i.nextClientID, 1)
18✔
1723

18✔
1724
        // Before we register this new invoice subscription, we'll launch a new
18✔
1725
        // goroutine that will proxy all notifications appended to the end of
18✔
1726
        // the concurrent queue to the two client-side channels the caller will
18✔
1727
        // feed off of.
18✔
1728
        i.wg.Add(1)
18✔
1729
        go func() {
36✔
1730
                defer i.wg.Done()
18✔
1731
                defer i.deleteClient(client.id)
18✔
1732

18✔
1733
                for {
72✔
1734
                        select {
54✔
1735
                        // A new invoice event has been sent by the
1736
                        // invoiceRegistry. We will dispatch the event to the
1737
                        // client.
1738
                        case ntfn := <-client.ntfnQueue.ChanOut():
36✔
1739
                                invoiceEvent := ntfn.(*invoiceEvent)
36✔
1740

36✔
1741
                                select {
36✔
1742
                                case client.Updates <- invoiceEvent.invoice:
36✔
1743

1744
                                case <-client.cancelChan:
×
1745
                                        return
×
1746

1747
                                case <-i.quit:
×
1748
                                        return
×
1749
                                }
1750

1751
                        case <-client.cancelChan:
17✔
1752
                                return
17✔
1753

1754
                        case <-i.quit:
1✔
1755
                                return
1✔
1756
                        }
1757
                }
1758
        }()
1759

1760
        i.notificationClientMux.Lock()
18✔
1761
        i.singleNotificationClients[client.id] = client
18✔
1762
        i.notificationClientMux.Unlock()
18✔
1763

18✔
1764
        err := i.deliverSingleBacklogEvents(ctx, client)
18✔
1765
        if err != nil {
18✔
1766
                return nil, err
×
1767
        }
×
1768

1769
        log.Infof("New single invoice subscription client: id=%v, ref=%v",
18✔
1770
                client.id, client.invoiceRef)
18✔
1771

18✔
1772
        return client, nil
18✔
1773
}
1774

1775
// notifyHodlSubscribers sends out the htlc resolution to all current
1776
// subscribers.
1777
func (i *InvoiceRegistry) notifyHodlSubscribers(htlcResolution HtlcResolution) {
891✔
1778
        i.hodlSubscriptionsMux.Lock()
891✔
1779
        defer i.hodlSubscriptionsMux.Unlock()
891✔
1780

891✔
1781
        subscribers, ok := i.hodlSubscriptions[htlcResolution.CircuitKey()]
891✔
1782
        if !ok {
1,370✔
1783
                return
479✔
1784
        }
479✔
1785

1786
        // Notify all interested subscribers and remove subscription from both
1787
        // maps. The subscription can be removed as there only ever will be a
1788
        // single resolution for each hash.
1789
        for subscriber := range subscribers {
824✔
1790
                select {
412✔
1791
                case subscriber <- htlcResolution:
412✔
1792
                case <-i.quit:
×
1793
                        return
×
1794
                }
1795

1796
                delete(
412✔
1797
                        i.hodlReverseSubscriptions[subscriber],
412✔
1798
                        htlcResolution.CircuitKey(),
412✔
1799
                )
412✔
1800
        }
1801

1802
        delete(i.hodlSubscriptions, htlcResolution.CircuitKey())
412✔
1803
}
1804

1805
// hodlSubscribe adds a new invoice subscription.
1806
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
1807
        circuitKey CircuitKey) {
419✔
1808

419✔
1809
        i.hodlSubscriptionsMux.Lock()
419✔
1810
        defer i.hodlSubscriptionsMux.Unlock()
419✔
1811

419✔
1812
        log.Debugf("Hodl subscribe for %v", circuitKey)
419✔
1813

419✔
1814
        subscriptions, ok := i.hodlSubscriptions[circuitKey]
419✔
1815
        if !ok {
831✔
1816
                subscriptions = make(map[chan<- interface{}]struct{})
412✔
1817
                i.hodlSubscriptions[circuitKey] = subscriptions
412✔
1818
        }
412✔
1819
        subscriptions[subscriber] = struct{}{}
419✔
1820

419✔
1821
        reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
419✔
1822
        if !ok {
776✔
1823
                reverseSubscriptions = make(map[CircuitKey]struct{})
357✔
1824
                i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
357✔
1825
        }
357✔
1826
        reverseSubscriptions[circuitKey] = struct{}{}
419✔
1827
}
1828

1829
// HodlUnsubscribeAll cancels the subscription.
1830
func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
202✔
1831
        i.hodlSubscriptionsMux.Lock()
202✔
1832
        defer i.hodlSubscriptionsMux.Unlock()
202✔
1833

202✔
1834
        hashes := i.hodlReverseSubscriptions[subscriber]
202✔
1835
        for hash := range hashes {
203✔
1836
                delete(i.hodlSubscriptions[hash], subscriber)
1✔
1837
        }
1✔
1838

1839
        delete(i.hodlReverseSubscriptions, subscriber)
202✔
1840
}
1841

1842
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
1843
// useful when we need to iterate the map to send notifications.
1844
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription { //nolint:ll
1,391✔
1845
        i.notificationClientMux.RLock()
1,391✔
1846
        defer i.notificationClientMux.RUnlock()
1,391✔
1847

1,391✔
1848
        clients := make(map[uint32]*SingleInvoiceSubscription)
1,391✔
1849
        for k, v := range i.singleNotificationClients {
1,427✔
1850
                clients[k] = v
36✔
1851
        }
36✔
1852
        return clients
1,391✔
1853
}
1854

1855
// copyClients copies i.notificationClients inside a lock. This is useful when
1856
// we need to iterate the map to send notifications.
1857
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
1,235✔
1858
        i.notificationClientMux.RLock()
1,235✔
1859
        defer i.notificationClientMux.RUnlock()
1,235✔
1860

1,235✔
1861
        clients := make(map[uint32]*InvoiceSubscription)
1,235✔
1862
        for k, v := range i.notificationClients {
1,301✔
1863
                clients[k] = v
66✔
1864
        }
66✔
1865
        return clients
1,235✔
1866
}
1867

1868
// deleteClient removes a client by its ID inside a lock. Noop if the client is
1869
// not found.
1870
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
63✔
1871
        i.notificationClientMux.Lock()
63✔
1872
        defer i.notificationClientMux.Unlock()
63✔
1873

63✔
1874
        log.Infof("Cancelling invoice subscription for client=%v", clientID)
63✔
1875
        delete(i.notificationClients, clientID)
63✔
1876
        delete(i.singleNotificationClients, clientID)
63✔
1877
}
63✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc