• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%
12986279612

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%)

24884.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.34
/invoices/invoiceregistry.go
1
package invoices
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/queue"
15
        "github.com/lightningnetwork/lnd/record"
16
)
17

18
var (
19
        // ErrInvoiceExpiryTooSoon is returned when an invoice is attempted to
20
        // be accepted or settled with not enough blocks remaining.
21
        ErrInvoiceExpiryTooSoon = errors.New("invoice expiry too soon")
22

23
        // ErrInvoiceAmountTooLow is returned  when an invoice is attempted to
24
        // be accepted or settled with an amount that is too low.
25
        ErrInvoiceAmountTooLow = errors.New(
26
                "paid amount less than invoice amount",
27
        )
28

29
        // ErrShuttingDown is returned when an operation failed because the
30
        // invoice registry is shutting down.
31
        ErrShuttingDown = errors.New("invoice registry shutting down")
32
)
33

34
const (
35
        // DefaultHtlcHoldDuration defines the default for how long mpp htlcs
36
        // are held while waiting for the other set members to arrive.
37
        DefaultHtlcHoldDuration = 120 * time.Second
38
)
39

40
// RegistryConfig contains the configuration parameters for invoice registry.
41
type RegistryConfig struct {
42
        // FinalCltvRejectDelta defines the number of blocks before the expiry
43
        // of the htlc where we no longer settle it as an exit hop and instead
44
        // cancel it back. Normally this value should be lower than the cltv
45
        // expiry of any invoice we create and the code effectuating this should
46
        // not be hit.
47
        FinalCltvRejectDelta int32
48

49
        // HtlcHoldDuration defines for how long mpp htlcs are held while
50
        // waiting for the other set members to arrive.
51
        HtlcHoldDuration time.Duration
52

53
        // Clock holds the clock implementation that is used to provide
54
        // Now() and TickAfter() and is useful to stub out the clock functions
55
        // during testing.
56
        Clock clock.Clock
57

58
        // AcceptKeySend indicates whether we want to accept spontaneous key
59
        // send payments.
60
        AcceptKeySend bool
61

62
        // AcceptAMP indicates whether we want to accept spontaneous AMP
63
        // payments.
64
        AcceptAMP bool
65

66
        // GcCanceledInvoicesOnStartup if set, we'll attempt to garbage collect
67
        // all canceled invoices upon start.
68
        GcCanceledInvoicesOnStartup bool
69

70
        // GcCanceledInvoicesOnTheFly if set, we'll garbage collect all newly
71
        // canceled invoices on the fly.
72
        GcCanceledInvoicesOnTheFly bool
73

74
        // KeysendHoldTime indicates for how long we want to accept and hold
75
        // spontaneous keysend payments.
76
        KeysendHoldTime time.Duration
77

78
        // HtlcInterceptor is an interface that allows the invoice registry to
79
        // let clients intercept invoices before they are settled.
80
        HtlcInterceptor HtlcInterceptor
81
}
82

83
// htlcReleaseEvent describes an htlc auto-release event. It is used to release
84
// mpp htlcs for which the complete set didn't arrive in time.
85
type htlcReleaseEvent struct {
86
        // invoiceRef identifiers the invoice this htlc belongs to.
87
        invoiceRef InvoiceRef
88

89
        // key is the circuit key of the htlc to release.
90
        key CircuitKey
91

92
        // releaseTime is the time at which to release the htlc.
93
        releaseTime time.Time
94
}
95

96
// Less is used to order PriorityQueueItem's by their release time such that
97
// items with the older release time are at the top of the queue.
98
//
99
// NOTE: Part of the queue.PriorityQueueItem interface.
100
func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
6✔
101
        return r.releaseTime.Before(other.(*htlcReleaseEvent).releaseTime)
6✔
102
}
6✔
103

104
// InvoiceRegistry is a central registry of all the outstanding invoices
105
// created by the daemon. The registry is a thin wrapper around a map in order
106
// to ensure that all updates/reads are thread safe.
107
type InvoiceRegistry struct {
108
        started atomic.Bool
109
        stopped atomic.Bool
110

111
        sync.RWMutex
112

113
        nextClientID uint32 // must be used atomically
114

115
        idb InvoiceDB
116

117
        // cfg contains the registry's configuration parameters.
118
        cfg *RegistryConfig
119

120
        // notificationClientMux locks notificationClients and
121
        // singleNotificationClients. Using a separate mutex for these maps is
122
        // necessary to avoid deadlocks in the registry when processing invoice
123
        // events.
124
        notificationClientMux sync.RWMutex
125

126
        notificationClients map[uint32]*InvoiceSubscription
127

128
        // TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
129
        // performance.
130
        singleNotificationClients map[uint32]*SingleInvoiceSubscription
131

132
        // invoiceEvents is a single channel over which invoice updates are
133
        // carried.
134
        invoiceEvents chan *invoiceEvent
135

136
        // hodlSubscriptionsMux locks the hodlSubscriptions and
137
        // hodlReverseSubscriptions. Using a separate mutex for these maps is
138
        // necessary to avoid deadlocks in the registry when processing invoice
139
        // events.
140
        hodlSubscriptionsMux sync.RWMutex
141

142
        // hodlSubscriptions is a map from a circuit key to a list of
143
        // subscribers. It is used for efficient notification of links.
144
        hodlSubscriptions map[CircuitKey]map[chan<- interface{}]struct{}
145

146
        // reverseSubscriptions tracks circuit keys subscribed to per
147
        // subscriber. This is used to unsubscribe from all hashes efficiently.
148
        hodlReverseSubscriptions map[chan<- interface{}]map[CircuitKey]struct{}
149

150
        // htlcAutoReleaseChan contains the new htlcs that need to be
151
        // auto-released.
152
        htlcAutoReleaseChan chan *htlcReleaseEvent
153

154
        expiryWatcher *InvoiceExpiryWatcher
155

156
        wg   sync.WaitGroup
157
        quit chan struct{}
158
}
159

160
// NewRegistry creates a new invoice registry. The invoice registry
161
// wraps the persistent on-disk invoice storage with an additional in-memory
162
// layer. The in-memory layer is in place such that debug invoices can be added
163
// which are volatile yet available system wide within the daemon.
164
func NewRegistry(idb InvoiceDB, expiryWatcher *InvoiceExpiryWatcher,
165
        cfg *RegistryConfig) *InvoiceRegistry {
636✔
166

636✔
167
        notificationClients := make(map[uint32]*InvoiceSubscription)
636✔
168
        singleNotificationClients := make(map[uint32]*SingleInvoiceSubscription)
636✔
169
        return &InvoiceRegistry{
636✔
170
                idb:                       idb,
636✔
171
                notificationClients:       notificationClients,
636✔
172
                singleNotificationClients: singleNotificationClients,
636✔
173
                invoiceEvents:             make(chan *invoiceEvent, 100),
636✔
174
                hodlSubscriptions: make(
636✔
175
                        map[CircuitKey]map[chan<- interface{}]struct{},
636✔
176
                ),
636✔
177
                hodlReverseSubscriptions: make(
636✔
178
                        map[chan<- interface{}]map[CircuitKey]struct{},
636✔
179
                ),
636✔
180
                cfg:                 cfg,
636✔
181
                htlcAutoReleaseChan: make(chan *htlcReleaseEvent),
636✔
182
                expiryWatcher:       expiryWatcher,
636✔
183
                quit:                make(chan struct{}),
636✔
184
        }
636✔
185
}
636✔
186

187
// scanInvoicesOnStart will scan all invoices on start and add active invoices
188
// to the invoice expiry watcher while also attempting to delete all canceled
189
// invoices.
190
func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {
636✔
191
        pendingInvoices, err := i.idb.FetchPendingInvoices(ctx)
636✔
192
        if err != nil {
636✔
193
                return err
×
194
        }
×
195

196
        var pending []invoiceExpiry
636✔
197
        for paymentHash, invoice := range pendingInvoices {
666✔
198
                invoice := invoice
30✔
199
                expiryRef := makeInvoiceExpiry(paymentHash, &invoice)
30✔
200
                if expiryRef != nil {
60✔
201
                        pending = append(pending, expiryRef)
30✔
202
                }
30✔
203
        }
204

205
        log.Debugf("Adding %d pending invoices to the expiry watcher",
636✔
206
                len(pending))
636✔
207
        i.expiryWatcher.AddInvoices(pending...)
636✔
208

636✔
209
        if i.cfg.GcCanceledInvoicesOnStartup {
639✔
210
                log.Infof("Deleting canceled invoices")
3✔
211
                err = i.idb.DeleteCanceledInvoices(ctx)
3✔
212
                if err != nil {
3✔
213
                        log.Warnf("Deleting canceled invoices failed: %v", err)
×
214
                        return err
×
215
                }
×
216
        }
217

218
        return nil
636✔
219
}
220

221
// Start starts the registry and all goroutines it needs to carry out its task.
222
func (i *InvoiceRegistry) Start() error {
636✔
223
        var err error
636✔
224

636✔
225
        log.Info("InvoiceRegistry starting...")
636✔
226

636✔
227
        if i.started.Swap(true) {
636✔
228
                return fmt.Errorf("InvoiceRegistry started more than once")
×
229
        }
×
230
        // Start InvoiceExpiryWatcher and prepopulate it with existing
231
        // active invoices.
232
        err = i.expiryWatcher.Start(
636✔
233
                func(hash lntypes.Hash, force bool) error {
710✔
234
                        return i.cancelInvoiceImpl(
74✔
235
                                context.Background(), hash, force,
74✔
236
                        )
74✔
237
                })
74✔
238
        if err != nil {
636✔
239
                return err
×
240
        }
×
241

242
        i.wg.Add(1)
636✔
243
        go i.invoiceEventLoop()
636✔
244

636✔
245
        // Now scan all pending and removable invoices to the expiry
636✔
246
        // watcher or delete them.
636✔
247
        err = i.scanInvoicesOnStart(context.Background())
636✔
248
        if err != nil {
636✔
249
                _ = i.Stop()
×
250
        }
×
251

252
        log.Debug("InvoiceRegistry started")
636✔
253

636✔
254
        return err
636✔
255
}
256

257
// Stop signals the registry for a graceful shutdown.
258
func (i *InvoiceRegistry) Stop() error {
375✔
259
        log.Info("InvoiceRegistry shutting down...")
375✔
260

375✔
261
        if i.stopped.Swap(true) {
375✔
262
                return fmt.Errorf("InvoiceRegistry stopped more than once")
×
263
        }
×
264

265
        log.Info("InvoiceRegistry shutting down...")
375✔
266
        defer log.Debug("InvoiceRegistry shutdown complete")
375✔
267

375✔
268
        var err error
375✔
269
        if i.expiryWatcher == nil {
375✔
270
                err = fmt.Errorf("InvoiceRegistry expiryWatcher is not " +
×
271
                        "initialized")
×
272
        } else {
375✔
273
                i.expiryWatcher.Stop()
375✔
274
        }
375✔
275

276
        close(i.quit)
375✔
277

375✔
278
        i.wg.Wait()
375✔
279

375✔
280
        log.Debug("InvoiceRegistry shutdown complete")
375✔
281

375✔
282
        return err
375✔
283
}
284

285
// invoiceEvent represents a new event that has modified on invoice on disk.
286
// Only two event types are currently supported: newly created invoices, and
287
// instance where invoices are settled.
288
type invoiceEvent struct {
289
        hash    lntypes.Hash
290
        invoice *Invoice
291
        setID   *[32]byte
292
}
293

294
// tickAt returns a channel that ticks at the specified time. If the time has
295
// already passed, it will tick immediately.
296
func (i *InvoiceRegistry) tickAt(t time.Time) <-chan time.Time {
652✔
297
        now := i.cfg.Clock.Now()
652✔
298
        return i.cfg.Clock.TickAfter(t.Sub(now))
652✔
299
}
652✔
300

301
// invoiceEventLoop is the dedicated goroutine responsible for accepting
302
// new notification subscriptions, cancelling old subscriptions, and
303
// dispatching new invoice events.
304
func (i *InvoiceRegistry) invoiceEventLoop() {
636✔
305
        defer i.wg.Done()
636✔
306

636✔
307
        // Set up a heap for htlc auto-releases.
636✔
308
        autoReleaseHeap := &queue.PriorityQueue{}
636✔
309

636✔
310
        for {
3,005✔
311
                // If there is something to release, set up a release tick
2,369✔
312
                // channel.
2,369✔
313
                var nextReleaseTick <-chan time.Time
2,369✔
314
                if autoReleaseHeap.Len() > 0 {
3,021✔
315
                        head := autoReleaseHeap.Top().(*htlcReleaseEvent)
652✔
316
                        nextReleaseTick = i.tickAt(head.releaseTime)
652✔
317
                }
652✔
318

319
                select {
2,369✔
320
                // A sub-systems has just modified the invoice state, so we'll
321
                // dispatch notifications to all registered clients.
322
                case event := <-i.invoiceEvents:
1,391✔
323
                        // For backwards compatibility, do not notify all
1,391✔
324
                        // invoice subscribers of cancel and accept events.
1,391✔
325
                        state := event.invoice.State
1,391✔
326
                        if state != ContractCanceled &&
1,391✔
327
                                state != ContractAccepted {
2,626✔
328

1,235✔
329
                                i.dispatchToClients(event)
1,235✔
330
                        }
1,235✔
331
                        i.dispatchToSingleClients(event)
1,391✔
332

333
                // A new htlc came in for auto-release.
334
                case event := <-i.htlcAutoReleaseChan:
330✔
335
                        log.Debugf("Scheduling auto-release for htlc: "+
330✔
336
                                "ref=%v, key=%v at %v",
330✔
337
                                event.invoiceRef, event.key, event.releaseTime)
330✔
338

330✔
339
                        // We use an independent timer for every htlc rather
330✔
340
                        // than a set timer that is reset with every htlc coming
330✔
341
                        // in. Otherwise the sender could keep resetting the
330✔
342
                        // timer until the broadcast window is entered and our
330✔
343
                        // channel is force closed.
330✔
344
                        autoReleaseHeap.Push(event)
330✔
345

346
                // The htlc at the top of the heap needs to be auto-released.
347
                case <-nextReleaseTick:
12✔
348
                        event := autoReleaseHeap.Pop().(*htlcReleaseEvent)
12✔
349
                        err := i.cancelSingleHtlc(
12✔
350
                                event.invoiceRef, event.key, ResultMppTimeout,
12✔
351
                        )
12✔
352
                        if err != nil {
12✔
353
                                log.Errorf("HTLC timer: %v", err)
×
354
                        }
×
355

356
                case <-i.quit:
375✔
357
                        return
375✔
358
                }
359
        }
360
}
361

362
// dispatchToSingleClients passes the supplied event to all notification
363
// clients that subscribed to all the invoice this event applies to.
364
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
1,391✔
365
        // Dispatch to single invoice subscribers.
1,391✔
366
        clients := i.copySingleClients()
1,391✔
367
        for _, client := range clients {
1,427✔
368
                payHash := client.invoiceRef.PayHash()
36✔
369

36✔
370
                if payHash == nil || *payHash != event.hash {
36✔
UNCOV
371
                        continue
×
372
                }
373

374
                select {
36✔
375
                case <-client.backlogDelivered:
36✔
376
                        // We won't deliver any events until the backlog has
377
                        // went through first.
378
                case <-i.quit:
×
379
                        return
×
380
                }
381

382
                client.notify(event)
36✔
383
        }
384
}
385

386
// dispatchToClients passes the supplied event to all notification clients that
387
// subscribed to all invoices. Add and settle indices are used to make sure
388
// that clients don't receive duplicate or unwanted events.
389
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
1,235✔
390
        invoice := event.invoice
1,235✔
391

1,235✔
392
        clients := i.copyClients()
1,235✔
393
        for clientID, client := range clients {
1,301✔
394
                // Before we dispatch this event, we'll check
66✔
395
                // to ensure that this client hasn't already
66✔
396
                // received this notification in order to
66✔
397
                // ensure we don't duplicate any events.
66✔
398

66✔
399
                // TODO(joostjager): Refactor switches.
66✔
400
                state := event.invoice.State
66✔
401
                switch {
66✔
402
                // If we've already sent this settle event to
403
                // the client, then we can skip this.
404
                case state == ContractSettled &&
405
                        client.settleIndex >= invoice.SettleIndex:
×
406
                        continue
×
407

408
                // Similarly, if we've already sent this add to
409
                // the client then we can skip this one, but only if this isn't
410
                // an AMP invoice. AMP invoices always remain in the settle
411
                // state as a base invoice.
412
                case event.setID == nil && state == ContractOpen &&
413
                        client.addIndex >= invoice.AddIndex:
×
414
                        continue
×
415

416
                // These two states should never happen, but we
417
                // log them just in case so we can detect this
418
                // instance.
419
                case state == ContractOpen &&
420
                        client.addIndex+1 != invoice.AddIndex:
7✔
421
                        log.Warnf("client=%v for invoice "+
7✔
422
                                "notifications missed an update, "+
7✔
423
                                "add_index=%v, new add event index=%v",
7✔
424
                                clientID, client.addIndex,
7✔
425
                                invoice.AddIndex)
7✔
426

427
                case state == ContractSettled &&
UNCOV
428
                        client.settleIndex+1 != invoice.SettleIndex:
×
UNCOV
429
                        log.Warnf("client=%v for invoice "+
×
UNCOV
430
                                "notifications missed an update, "+
×
UNCOV
431
                                "settle_index=%v, new settle event index=%v",
×
UNCOV
432
                                clientID, client.settleIndex,
×
UNCOV
433
                                invoice.SettleIndex)
×
434
                }
435

436
                select {
66✔
437
                case <-client.backlogDelivered:
66✔
438
                        // We won't deliver any events until the backlog has
439
                        // been processed.
440
                case <-i.quit:
×
441
                        return
×
442
                }
443

444
                err := client.notify(&invoiceEvent{
66✔
445
                        invoice: invoice,
66✔
446
                        setID:   event.setID,
66✔
447
                })
66✔
448
                if err != nil {
66✔
449
                        log.Errorf("Failed dispatching to client: %v", err)
×
450
                        return
×
451
                }
×
452

453
                // Each time we send a notification to a client, we'll record
454
                // the latest add/settle index it has. We'll use this to ensure
455
                // we don't send a notification twice, which can happen if a new
456
                // event is added while we're catching up a new client.
457
                invState := event.invoice.State
66✔
458
                switch {
66✔
459
                case invState == ContractSettled:
18✔
460
                        client.settleIndex = invoice.SettleIndex
18✔
461

462
                case invState == ContractOpen && event.setID == nil:
42✔
463
                        client.addIndex = invoice.AddIndex
42✔
464

465
                // If this is an AMP invoice, then we'll need to use the set ID
466
                // to keep track of the settle index of the client. AMP
467
                // invoices never go to the open state, but if a setID is
468
                // passed, then we know it was just settled and will track the
469
                // highest settle index so far.
470
                case invState == ContractOpen && event.setID != nil:
6✔
471
                        setID := *event.setID
6✔
472
                        client.settleIndex = invoice.AMPState[setID].SettleIndex
6✔
473

474
                default:
×
475
                        log.Errorf("unexpected invoice state: %v",
×
476
                                event.invoice.State)
×
477
                }
478
        }
479
}
480

481
// deliverBacklogEvents will attempts to query the invoice database for any
482
// notifications that the client has missed since it reconnected last.
483
func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context,
484
        client *InvoiceSubscription) error {
45✔
485

45✔
486
        addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex)
45✔
487
        if err != nil {
45✔
488
                return err
×
489
        }
×
490

491
        settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex)
45✔
492
        if err != nil {
45✔
493
                return err
×
494
        }
×
495

496
        // If we have any to deliver, then we'll append them to the end of the
497
        // notification queue in order to catch up the client before delivering
498
        // any new notifications.
499
        for _, addEvent := range addEvents {
45✔
UNCOV
500
                // We re-bind the loop variable to ensure we don't hold onto
×
UNCOV
501
                // the loop reference causing is to point to the same item.
×
UNCOV
502
                addEvent := addEvent
×
UNCOV
503

×
UNCOV
504
                select {
×
505
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
506
                        invoice: &addEvent,
UNCOV
507
                }:
×
508
                case <-i.quit:
×
509
                        return ErrShuttingDown
×
510
                }
511
        }
512

513
        for _, settleEvent := range settleEvents {
45✔
UNCOV
514
                // We re-bind the loop variable to ensure we don't hold onto
×
UNCOV
515
                // the loop reference causing is to point to the same item.
×
UNCOV
516
                settleEvent := settleEvent
×
UNCOV
517

×
UNCOV
518
                select {
×
519
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
520
                        invoice: &settleEvent,
UNCOV
521
                }:
×
522
                case <-i.quit:
×
523
                        return ErrShuttingDown
×
524
                }
525
        }
526

527
        return nil
45✔
528
}
529

530
// deliverSingleBacklogEvents will attempt to query the invoice database to
531
// retrieve the current invoice state and deliver this to the subscriber. Single
532
// invoice subscribers will always receive the current state right after
533
// subscribing. Only in case the invoice does not yet exist, nothing is sent
534
// yet.
535
func (i *InvoiceRegistry) deliverSingleBacklogEvents(ctx context.Context,
536
        client *SingleInvoiceSubscription) error {
18✔
537

18✔
538
        invoice, err := i.idb.LookupInvoice(ctx, client.invoiceRef)
18✔
539

18✔
540
        // It is possible that the invoice does not exist yet, but the client is
18✔
541
        // already watching it in anticipation.
18✔
542
        isNotFound := errors.Is(err, ErrInvoiceNotFound)
18✔
543
        isNotCreated := errors.Is(err, ErrNoInvoicesCreated)
18✔
544
        if isNotFound || isNotCreated {
36✔
545
                return nil
18✔
546
        }
18✔
UNCOV
547
        if err != nil {
×
548
                return err
×
549
        }
×
550

UNCOV
551
        payHash := client.invoiceRef.PayHash()
×
UNCOV
552
        if payHash == nil {
×
553
                return nil
×
554
        }
×
555

UNCOV
556
        err = client.notify(&invoiceEvent{
×
UNCOV
557
                hash:    *payHash,
×
UNCOV
558
                invoice: &invoice,
×
UNCOV
559
        })
×
UNCOV
560
        if err != nil {
×
561
                return err
×
562
        }
×
563

UNCOV
564
        log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
×
UNCOV
565
                client.id, payHash)
×
UNCOV
566

×
UNCOV
567
        return nil
×
568
}
569

570
// AddInvoice adds a regular invoice for the specified amount, identified by
571
// the passed preimage. Additionally, any memo or receipt data provided will
572
// also be stored on-disk. Once this invoice is added, subsystems within the
573
// daemon add/forward HTLCs are able to obtain the proper preimage required for
574
// redemption in the case that we're the final destination. We also return the
575
// addIndex of the newly created invoice which monotonically increases for each
576
// new invoice added.  A side effect of this function is that it also sets
577
// AddIndex on the invoice argument.
578
func (i *InvoiceRegistry) AddInvoice(ctx context.Context, invoice *Invoice,
579
        paymentHash lntypes.Hash) (uint64, error) {
720✔
580

720✔
581
        i.Lock()
720✔
582

720✔
583
        ref := InvoiceRefByHash(paymentHash)
720✔
584
        log.Debugf("Invoice%v: added with terms %v", ref, invoice.Terms)
720✔
585

720✔
586
        addIndex, err := i.idb.AddInvoice(ctx, invoice, paymentHash)
720✔
587
        if err != nil {
735✔
588
                i.Unlock()
15✔
589
                return 0, err
15✔
590
        }
15✔
591

592
        // Now that we've added the invoice, we'll send dispatch a message to
593
        // notify the clients of this new invoice.
594
        i.notifyClients(paymentHash, invoice, nil)
705✔
595
        i.Unlock()
705✔
596

705✔
597
        // InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry
705✔
598
        // to avoid deadlock when a new invoice is added while an other is being
705✔
599
        // canceled.
705✔
600
        invoiceExpiryRef := makeInvoiceExpiry(paymentHash, invoice)
705✔
601
        if invoiceExpiryRef != nil {
1,410✔
602
                i.expiryWatcher.AddInvoices(invoiceExpiryRef)
705✔
603
        }
705✔
604

605
        return addIndex, nil
705✔
606
}
607

608
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
609
// then we're able to pull the funds pending within an HTLC.
610
//
611
// TODO(roasbeef): ignore if settled?
612
func (i *InvoiceRegistry) LookupInvoice(ctx context.Context,
613
        rHash lntypes.Hash) (Invoice, error) {
395✔
614

395✔
615
        // We'll check the database to see if there's an existing matching
395✔
616
        // invoice.
395✔
617
        ref := InvoiceRefByHash(rHash)
395✔
618
        return i.idb.LookupInvoice(ctx, ref)
395✔
619
}
395✔
620

621
// LookupInvoiceByRef looks up an invoice by the given reference, if found
622
// then we're able to pull the funds pending within an HTLC.
623
func (i *InvoiceRegistry) LookupInvoiceByRef(ctx context.Context,
UNCOV
624
        ref InvoiceRef) (Invoice, error) {
×
UNCOV
625

×
UNCOV
626
        return i.idb.LookupInvoice(ctx, ref)
×
UNCOV
627
}
×
628

629
// startHtlcTimer starts a new timer via the invoice registry main loop that
630
// cancels a single htlc on an invoice when the htlc hold duration has passed.
631
func (i *InvoiceRegistry) startHtlcTimer(invoiceRef InvoiceRef,
632
        key CircuitKey, acceptTime time.Time) error {
330✔
633

330✔
634
        releaseTime := acceptTime.Add(i.cfg.HtlcHoldDuration)
330✔
635
        event := &htlcReleaseEvent{
330✔
636
                invoiceRef:  invoiceRef,
330✔
637
                key:         key,
330✔
638
                releaseTime: releaseTime,
330✔
639
        }
330✔
640

330✔
641
        select {
330✔
642
        case i.htlcAutoReleaseChan <- event:
330✔
643
                return nil
330✔
644

645
        case <-i.quit:
×
646
                return ErrShuttingDown
×
647
        }
648
}
649

650
// cancelSingleHtlc cancels a single accepted htlc on an invoice. It takes
651
// a resolution result which will be used to notify subscribed links and
652
// resolvers of the details of the htlc cancellation.
653
func (i *InvoiceRegistry) cancelSingleHtlc(invoiceRef InvoiceRef,
654
        key CircuitKey, result FailResolutionResult) error {
12✔
655

12✔
656
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
24✔
657
                // Only allow individual htlc cancellation on open invoices.
12✔
658
                if invoice.State != ContractOpen {
18✔
659
                        log.Debugf("cancelSingleHtlc: invoice %v no longer "+
6✔
660
                                "open", invoiceRef)
6✔
661

6✔
662
                        return nil, nil
6✔
663
                }
6✔
664

665
                // Lookup the current status of the htlc in the database.
666
                var (
6✔
667
                        htlcState HtlcState
6✔
668
                        setID     *SetID
6✔
669
                )
6✔
670
                htlc, ok := invoice.Htlcs[key]
6✔
671
                if !ok {
6✔
672
                        // If this is an AMP invoice, then all the HTLCs won't
×
673
                        // be read out, so we'll consult the other mapping to
×
674
                        // try to find the HTLC state in question here.
×
675
                        var found bool
×
676
                        for ampSetID, htlcSet := range invoice.AMPState {
×
677
                                ampSetID := ampSetID
×
678
                                for htlcKey := range htlcSet.InvoiceKeys {
×
679
                                        if htlcKey == key {
×
680
                                                htlcState = htlcSet.State
×
681
                                                setID = &ampSetID
×
682

×
683
                                                found = true
×
684
                                                break
×
685
                                        }
686
                                }
687
                        }
688

689
                        if !found {
×
690
                                return nil, fmt.Errorf("htlc %v not found", key)
×
691
                        }
×
692
                } else {
6✔
693
                        htlcState = htlc.State
6✔
694
                }
6✔
695

696
                // Cancellation is only possible if the htlc wasn't already
697
                // resolved.
698
                if htlcState != HtlcStateAccepted {
6✔
699
                        log.Debugf("cancelSingleHtlc: htlc %v on invoice %v "+
×
700
                                "is already resolved", key, invoiceRef)
×
701

×
702
                        return nil, nil
×
703
                }
×
704

705
                log.Debugf("cancelSingleHtlc: cancelling htlc %v on invoice %v",
6✔
706
                        key, invoiceRef)
6✔
707

6✔
708
                // Return an update descriptor that cancels htlc and keeps
6✔
709
                // invoice open.
6✔
710
                canceledHtlcs := map[CircuitKey]struct{}{
6✔
711
                        key: {},
6✔
712
                }
6✔
713

6✔
714
                return &InvoiceUpdateDesc{
6✔
715
                        UpdateType:  CancelHTLCsUpdate,
6✔
716
                        CancelHtlcs: canceledHtlcs,
6✔
717
                        SetID:       setID,
6✔
718
                }, nil
6✔
719
        }
720

721
        // Try to mark the specified htlc as canceled in the invoice database.
722
        // Intercept the update descriptor to set the local updated variable. If
723
        // no invoice update is performed, we can return early.
724
        setID := (*SetID)(invoiceRef.SetID())
12✔
725
        var updated bool
12✔
726
        invoice, err := i.idb.UpdateInvoice(
12✔
727
                context.Background(), invoiceRef, setID,
12✔
728
                func(invoice *Invoice) (
12✔
729
                        *InvoiceUpdateDesc, error) {
24✔
730

12✔
731
                        updateDesc, err := updateInvoice(invoice)
12✔
732
                        if err != nil {
12✔
733
                                return nil, err
×
734
                        }
×
735
                        updated = updateDesc != nil
12✔
736

12✔
737
                        return updateDesc, err
12✔
738
                },
739
        )
740
        if err != nil {
12✔
741
                return err
×
742
        }
×
743
        if !updated {
18✔
744
                return nil
6✔
745
        }
6✔
746

747
        // The invoice has been updated. Notify subscribers of the htlc
748
        // resolution.
749
        htlc, ok := invoice.Htlcs[key]
6✔
750
        if !ok {
6✔
751
                return fmt.Errorf("htlc %v not found", key)
×
752
        }
×
753
        if htlc.State == HtlcStateCanceled {
12✔
754
                resolution := NewFailResolution(
6✔
755
                        key, int32(htlc.AcceptHeight), result,
6✔
756
                )
6✔
757

6✔
758
                i.notifyHodlSubscribers(resolution)
6✔
759
        }
6✔
760
        return nil
6✔
761
}
762

763
// processKeySend just-in-time inserts an invoice if this htlc is a keysend
764
// htlc.
765
func (i *InvoiceRegistry) processKeySend(ctx invoiceUpdateCtx) error {
18✔
766
        // Retrieve keysend record if present.
18✔
767
        preimageSlice, ok := ctx.customRecords[record.KeySendType]
18✔
768
        if !ok {
18✔
UNCOV
769
                return nil
×
UNCOV
770
        }
×
771

772
        // Cancel htlc is preimage is invalid.
773
        preimage, err := lntypes.MakePreimage(preimageSlice)
18✔
774
        if err != nil {
21✔
775
                return err
3✔
776
        }
3✔
777
        if preimage.Hash() != ctx.hash {
15✔
778
                return fmt.Errorf("invalid keysend preimage %v for hash %v",
×
779
                        preimage, ctx.hash)
×
780
        }
×
781

782
        // Only allow keysend for non-mpp payments.
783
        if ctx.mpp != nil {
15✔
784
                return errors.New("no mpp keysend supported")
×
785
        }
×
786

787
        // Create an invoice for the htlc amount.
788
        amt := ctx.amtPaid
15✔
789

15✔
790
        // Set tlv required feature vector on the invoice. Otherwise we wouldn't
15✔
791
        // be able to pay to it with keysend.
15✔
792
        rawFeatures := lnwire.NewRawFeatureVector(
15✔
793
                lnwire.TLVOnionPayloadRequired,
15✔
794
        )
15✔
795
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
15✔
796

15✔
797
        // Use the minimum block delta that we require for settling htlcs.
15✔
798
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
15✔
799

15✔
800
        // Pre-check expiry here to prevent inserting an invoice that will not
15✔
801
        // be settled.
15✔
802
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
15✔
803
                return errors.New("final expiry too soon")
×
804
        }
×
805

806
        // The invoice database indexes all invoices by payment address, however
807
        // legacy keysend payment do not have one. In order to avoid a new
808
        // payment type on-disk wrt. to indexing, we'll continue to insert a
809
        // blank payment address which is special cased in the insertion logic
810
        // to not be indexed. In the future, once AMP is merged, this should be
811
        // replaced by generating a random payment address on the behalf of the
812
        // sender.
813
        payAddr := BlankPayAddr
15✔
814

15✔
815
        // Create placeholder invoice.
15✔
816
        invoice := &Invoice{
15✔
817
                CreationDate: i.cfg.Clock.Now(),
15✔
818
                Terms: ContractTerm{
15✔
819
                        FinalCltvDelta:  finalCltvDelta,
15✔
820
                        Value:           amt,
15✔
821
                        PaymentPreimage: &preimage,
15✔
822
                        PaymentAddr:     payAddr,
15✔
823
                        Features:        features,
15✔
824
                },
15✔
825
        }
15✔
826

15✔
827
        if i.cfg.KeysendHoldTime != 0 {
21✔
828
                invoice.HodlInvoice = true
6✔
829
                invoice.Terms.Expiry = i.cfg.KeysendHoldTime
6✔
830
        }
6✔
831

832
        // Insert invoice into database. Ignore duplicates, because this
833
        // may be a replay.
834
        _, err = i.AddInvoice(context.Background(), invoice, ctx.hash)
15✔
835
        if err != nil && !errors.Is(err, ErrDuplicateInvoice) {
15✔
836
                return err
×
837
        }
×
838

839
        return nil
15✔
840
}
841

842
// processAMP just-in-time inserts an invoice if this htlc is a keysend
843
// htlc.
844
func (i *InvoiceRegistry) processAMP(ctx invoiceUpdateCtx) error {
27✔
845
        // AMP payments MUST also include an MPP record.
27✔
846
        if ctx.mpp == nil {
30✔
847
                return errors.New("no MPP record for AMP")
3✔
848
        }
3✔
849

850
        // Create an invoice for the total amount expected, provided in the MPP
851
        // record.
852
        amt := ctx.mpp.TotalMsat()
24✔
853

24✔
854
        // Set the TLV required and MPP optional features on the invoice. We'll
24✔
855
        // also make the AMP features required so that it can't be paid by
24✔
856
        // legacy or MPP htlcs.
24✔
857
        rawFeatures := lnwire.NewRawFeatureVector(
24✔
858
                lnwire.TLVOnionPayloadRequired,
24✔
859
                lnwire.PaymentAddrOptional,
24✔
860
                lnwire.AMPRequired,
24✔
861
        )
24✔
862
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
24✔
863

24✔
864
        // Use the minimum block delta that we require for settling htlcs.
24✔
865
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
24✔
866

24✔
867
        // Pre-check expiry here to prevent inserting an invoice that will not
24✔
868
        // be settled.
24✔
869
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
24✔
870
                return errors.New("final expiry too soon")
×
871
        }
×
872

873
        // We'll use the sender-generated payment address provided in the HTLC
874
        // to create our AMP invoice.
875
        payAddr := ctx.mpp.PaymentAddr()
24✔
876

24✔
877
        // Create placeholder invoice.
24✔
878
        invoice := &Invoice{
24✔
879
                CreationDate: i.cfg.Clock.Now(),
24✔
880
                Terms: ContractTerm{
24✔
881
                        FinalCltvDelta:  finalCltvDelta,
24✔
882
                        Value:           amt,
24✔
883
                        PaymentPreimage: nil,
24✔
884
                        PaymentAddr:     payAddr,
24✔
885
                        Features:        features,
24✔
886
                },
24✔
887
        }
24✔
888

24✔
889
        // Insert invoice into database. Ignore duplicates payment hashes and
24✔
890
        // payment addrs, this may be a replay or a different HTLC for the AMP
24✔
891
        // invoice.
24✔
892
        _, err := i.AddInvoice(context.Background(), invoice, ctx.hash)
24✔
893
        isDuplicatedInvoice := errors.Is(err, ErrDuplicateInvoice)
24✔
894
        isDuplicatedPayAddr := errors.Is(err, ErrDuplicatePayAddr)
24✔
895
        switch {
24✔
896
        case isDuplicatedInvoice || isDuplicatedPayAddr:
12✔
897
                return nil
12✔
898
        default:
12✔
899
                return err
12✔
900
        }
901
}
902

903
// NotifyExitHopHtlc attempts to mark an invoice as settled. The return value
904
// describes how the htlc should be resolved.
905
//
906
// When the preimage of the invoice is not yet known (hodl invoice), this
907
// function moves the invoice to the accepted state. When SettleHoldInvoice is
908
// called later, a resolution message will be send back to the caller via the
909
// provided hodlChan. Invoice registry sends on this channel what action needs
910
// to be taken on the htlc (settle or cancel). The caller needs to ensure that
911
// the channel is either buffered or received on from another goroutine to
912
// prevent deadlock.
913
//
914
// In the case that the htlc is part of a larger set of htlcs that pay to the
915
// same invoice (multi-path payment), the htlc is held until the set is
916
// complete. If the set doesn't fully arrive in time, a timer will cancel the
917
// held htlc.
918
func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
919
        amtPaid lnwire.MilliSatoshi, expiry uint32, currentHeight int32,
920
        circuitKey CircuitKey, hodlChan chan<- interface{},
921
        wireCustomRecords lnwire.CustomRecords,
922
        payload Payload) (HtlcResolution, error) {
946✔
923

946✔
924
        // Create the update context containing the relevant details of the
946✔
925
        // incoming htlc.
946✔
926
        ctx := invoiceUpdateCtx{
946✔
927
                hash:                 rHash,
946✔
928
                circuitKey:           circuitKey,
946✔
929
                amtPaid:              amtPaid,
946✔
930
                expiry:               expiry,
946✔
931
                currentHeight:        currentHeight,
946✔
932
                finalCltvRejectDelta: i.cfg.FinalCltvRejectDelta,
946✔
933
                wireCustomRecords:    wireCustomRecords,
946✔
934
                customRecords:        payload.CustomRecords(),
946✔
935
                mpp:                  payload.MultiPath(),
946✔
936
                amp:                  payload.AMPRecord(),
946✔
937
                metadata:             payload.Metadata(),
946✔
938
                pathID:               payload.PathID(),
946✔
939
                totalAmtMsat:         payload.TotalAmtMsat(),
946✔
940
        }
946✔
941

946✔
942
        switch {
946✔
943
        // If we are accepting spontaneous AMP payments and this payload
944
        // contains an AMP record, create an AMP invoice that will be settled
945
        // below.
946
        case i.cfg.AcceptAMP && ctx.amp != nil:
27✔
947
                err := i.processAMP(ctx)
27✔
948
                if err != nil {
30✔
949
                        ctx.log(fmt.Sprintf("amp error: %v", err))
3✔
950

3✔
951
                        return NewFailResolution(
3✔
952
                                circuitKey, currentHeight, ResultAmpError,
3✔
953
                        ), nil
3✔
954
                }
3✔
955

956
        // If we are accepting spontaneous keysend payments, create a regular
957
        // invoice that will be settled below. We also enforce that this is only
958
        // done when no AMP payload is present since it will only be settle-able
959
        // by regular HTLCs.
960
        case i.cfg.AcceptKeySend && ctx.amp == nil:
18✔
961
                err := i.processKeySend(ctx)
18✔
962
                if err != nil {
21✔
963
                        ctx.log(fmt.Sprintf("keysend error: %v", err))
3✔
964

3✔
965
                        return NewFailResolution(
3✔
966
                                circuitKey, currentHeight, ResultKeySendError,
3✔
967
                        ), nil
3✔
968
                }
3✔
969
        }
970

971
        // Execute locked notify exit hop logic.
972
        i.Lock()
940✔
973
        resolution, invoiceToExpire, err := i.notifyExitHopHtlcLocked(
940✔
974
                &ctx, hodlChan,
940✔
975
        )
940✔
976
        i.Unlock()
940✔
977
        if err != nil {
940✔
UNCOV
978
                return nil, err
×
UNCOV
979
        }
×
980

981
        if invoiceToExpire != nil {
1,022✔
982
                i.expiryWatcher.AddInvoices(invoiceToExpire)
82✔
983
        }
82✔
984

985
        switch r := resolution.(type) {
940✔
986
        // The htlc is held. Start a timer outside the lock if the htlc should
987
        // be auto-released, because otherwise a deadlock may happen with the
988
        // main event loop.
989
        case *htlcAcceptResolution:
419✔
990
                if r.autoRelease {
749✔
991
                        var invRef InvoiceRef
330✔
992
                        if ctx.amp != nil {
342✔
993
                                invRef = InvoiceRefBySetID(*ctx.setID())
12✔
994
                        } else {
330✔
995
                                invRef = ctx.invoiceRef()
318✔
996
                        }
318✔
997

998
                        err := i.startHtlcTimer(
330✔
999
                                invRef, circuitKey, r.acceptTime,
330✔
1000
                        )
330✔
1001
                        if err != nil {
330✔
1002
                                return nil, err
×
1003
                        }
×
1004
                }
1005

1006
                // We return a nil resolution because htlc acceptances are
1007
                // represented as nil resolutions externally.
1008
                // TODO(carla) update calling code to handle accept resolutions.
1009
                return nil, nil
419✔
1010

1011
        // A direct resolution was received for this htlc.
1012
        case HtlcResolution:
521✔
1013
                return r, nil
521✔
1014

1015
        // Fail if an unknown resolution type was received.
1016
        default:
×
1017
                return nil, errors.New("invalid resolution type")
×
1018
        }
1019
}
1020

1021
// notifyExitHopHtlcLocked is the internal implementation of NotifyExitHopHtlc
1022
// that should be executed inside the registry lock. The returned invoiceExpiry
1023
// (if not nil) needs to be added to the expiry watcher outside of the lock.
1024
func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
1025
        ctx *invoiceUpdateCtx, hodlChan chan<- interface{}) (
1026
        HtlcResolution, invoiceExpiry, error) {
940✔
1027

940✔
1028
        invoiceRef := ctx.invoiceRef()
940✔
1029
        setID := (*SetID)(ctx.setID())
940✔
1030

940✔
1031
        // We need to look up the current state of the invoice in order to send
940✔
1032
        // the previously accepted/settled HTLCs to the interceptor.
940✔
1033
        existingInvoice, err := i.idb.LookupInvoice(
940✔
1034
                context.Background(), invoiceRef,
940✔
1035
        )
940✔
1036
        switch {
940✔
1037
        case errors.Is(err, ErrInvoiceNotFound) ||
1038
                errors.Is(err, ErrNoInvoicesCreated):
22✔
1039

22✔
1040
                // If the invoice was not found, return a failure resolution
22✔
1041
                // with an invoice not found result.
22✔
1042
                return NewFailResolution(
22✔
1043
                        ctx.circuitKey, ctx.currentHeight,
22✔
1044
                        ResultInvoiceNotFound,
22✔
1045
                ), nil, nil
22✔
1046

1047
        case err != nil:
×
1048
                ctx.log(err.Error())
×
1049
                return nil, nil, err
×
1050
        }
1051

1052
        var cancelSet bool
918✔
1053

918✔
1054
        // Provide the invoice to the settlement interceptor to allow
918✔
1055
        // the interceptor's client an opportunity to manipulate the
918✔
1056
        // settlement process.
918✔
1057
        err = i.cfg.HtlcInterceptor.Intercept(HtlcModifyRequest{
918✔
1058
                WireCustomRecords:  ctx.wireCustomRecords,
918✔
1059
                ExitHtlcCircuitKey: ctx.circuitKey,
918✔
1060
                ExitHtlcAmt:        ctx.amtPaid,
918✔
1061
                ExitHtlcExpiry:     ctx.expiry,
918✔
1062
                CurrentHeight:      uint32(ctx.currentHeight),
918✔
1063
                Invoice:            existingInvoice,
918✔
1064
        }, func(resp HtlcModifyResponse) {
918✔
UNCOV
1065
                log.Debugf("Received invoice HTLC interceptor response: %v",
×
UNCOV
1066
                        resp)
×
UNCOV
1067

×
UNCOV
1068
                if resp.AmountPaid != 0 {
×
UNCOV
1069
                        ctx.amtPaid = resp.AmountPaid
×
UNCOV
1070
                }
×
1071

UNCOV
1072
                cancelSet = resp.CancelSet
×
1073
        })
1074
        if err != nil {
918✔
UNCOV
1075
                err := fmt.Errorf("error during invoice HTLC interception: %w",
×
UNCOV
1076
                        err)
×
UNCOV
1077
                ctx.log(err.Error())
×
UNCOV
1078

×
UNCOV
1079
                return nil, nil, err
×
UNCOV
1080
        }
×
1081

1082
        // We'll attempt to settle an invoice matching this rHash on disk (if
1083
        // one exists). The callback will update the invoice state and/or htlcs.
1084
        var (
918✔
1085
                resolution        HtlcResolution
918✔
1086
                updateSubscribers bool
918✔
1087
        )
918✔
1088
        callback := func(inv *Invoice) (*InvoiceUpdateDesc, error) {
1,836✔
1089
                updateDesc, res, err := updateInvoice(ctx, inv)
918✔
1090
                if err != nil {
918✔
1091
                        return nil, err
×
1092
                }
×
1093

1094
                // Only send an update if the invoice state was changed.
1095
                updateSubscribers = updateDesc != nil &&
918✔
1096
                        updateDesc.State != nil
918✔
1097

918✔
1098
                // Assign resolution to outer scope variable.
918✔
1099
                if cancelSet {
918✔
1100
                        // If a cancel signal was set for the htlc set, we set
×
1101
                        // the resolution as a failure with an underpayment
×
1102
                        // indication. Something was wrong with this htlc, so
×
1103
                        // we probably can't settle the invoice at all.
×
1104
                        resolution = NewFailResolution(
×
1105
                                ctx.circuitKey, ctx.currentHeight,
×
1106
                                ResultAmountTooLow,
×
1107
                        )
×
1108
                } else {
918✔
1109
                        resolution = res
918✔
1110
                }
918✔
1111

1112
                return updateDesc, nil
918✔
1113
        }
1114

1115
        invoice, err := i.idb.UpdateInvoice(
918✔
1116
                context.Background(), invoiceRef, setID, callback,
918✔
1117
        )
918✔
1118

918✔
1119
        var duplicateSetIDErr ErrDuplicateSetID
918✔
1120
        if errors.As(err, &duplicateSetIDErr) {
918✔
1121
                return NewFailResolution(
×
1122
                        ctx.circuitKey, ctx.currentHeight,
×
1123
                        ResultInvoiceNotFound,
×
1124
                ), nil, nil
×
1125
        }
×
1126

1127
        switch {
918✔
1128
        case errors.Is(err, ErrInvoiceNotFound):
×
1129
                // If the invoice was not found, return a failure resolution
×
1130
                // with an invoice not found result.
×
1131
                return NewFailResolution(
×
1132
                        ctx.circuitKey, ctx.currentHeight,
×
1133
                        ResultInvoiceNotFound,
×
1134
                ), nil, nil
×
1135

1136
        case errors.Is(err, ErrInvRefEquivocation):
×
1137
                return NewFailResolution(
×
1138
                        ctx.circuitKey, ctx.currentHeight,
×
1139
                        ResultInvoiceNotFound,
×
1140
                ), nil, nil
×
1141

1142
        case err == nil:
918✔
1143

1144
        default:
×
1145
                ctx.log(err.Error())
×
1146
                return nil, nil, err
×
1147
        }
1148

1149
        var invoiceToExpire invoiceExpiry
918✔
1150

918✔
1151
        log.Tracef("Settlement resolution: %T %v", resolution, resolution)
918✔
1152

918✔
1153
        switch res := resolution.(type) {
918✔
1154
        case *HtlcFailResolution:
26✔
1155
                // Inspect latest htlc state on the invoice. If it is found,
26✔
1156
                // we will update the accept height as it was recorded in the
26✔
1157
                // invoice database (which occurs in the case where the htlc
26✔
1158
                // reached the database in a previous call). If the htlc was
26✔
1159
                // not found on the invoice, it was immediately failed so we
26✔
1160
                // send the failure resolution as is, which has the current
26✔
1161
                // height set as the accept height.
26✔
1162
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
26✔
1163
                if ok {
29✔
1164
                        res.AcceptHeight = int32(invoiceHtlc.AcceptHeight)
3✔
1165
                }
3✔
1166

1167
                ctx.log(fmt.Sprintf("failure resolution result "+
26✔
1168
                        "outcome: %v, at accept height: %v",
26✔
1169
                        res.Outcome, res.AcceptHeight))
26✔
1170

26✔
1171
                // Some failures apply to the entire HTLC set. Break here if
26✔
1172
                // this isn't one of them.
26✔
1173
                if !res.Outcome.IsSetFailure() {
46✔
1174
                        break
20✔
1175
                }
1176

1177
                // Also cancel any HTLCs in the HTLC set that are also in the
1178
                // canceled state with the same failure result.
1179
                setID := ctx.setID()
6✔
1180
                canceledHtlcSet := invoice.HTLCSet(setID, HtlcStateCanceled)
6✔
1181
                for key, htlc := range canceledHtlcSet {
12✔
1182
                        htlcFailResolution := NewFailResolution(
6✔
1183
                                key, int32(htlc.AcceptHeight), res.Outcome,
6✔
1184
                        )
6✔
1185

6✔
1186
                        i.notifyHodlSubscribers(htlcFailResolution)
6✔
1187
                }
6✔
1188

1189
        // If the htlc was settled, we will settle any previously accepted
1190
        // htlcs and notify our peer to settle them.
1191
        case *HtlcSettleResolution:
473✔
1192
                ctx.log(fmt.Sprintf("settle resolution result "+
473✔
1193
                        "outcome: %v, at accept height: %v",
473✔
1194
                        res.Outcome, res.AcceptHeight))
473✔
1195

473✔
1196
                // Also settle any previously accepted htlcs. If a htlc is
473✔
1197
                // marked as settled, we should follow now and settle the htlc
473✔
1198
                // with our peer.
473✔
1199
                setID := ctx.setID()
473✔
1200
                settledHtlcSet := invoice.HTLCSet(setID, HtlcStateSettled)
473✔
1201
                for key, htlc := range settledHtlcSet {
1,258✔
1202
                        preimage := res.Preimage
785✔
1203
                        if htlc.AMP != nil && htlc.AMP.Preimage != nil {
797✔
1204
                                preimage = *htlc.AMP.Preimage
12✔
1205
                        }
12✔
1206

1207
                        // Notify subscribers that the htlcs should be settled
1208
                        // with our peer. Note that the outcome of the
1209
                        // resolution is set based on the outcome of the single
1210
                        // htlc that we just settled, so may not be accurate
1211
                        // for all htlcs.
1212
                        htlcSettleResolution := NewSettleResolution(
785✔
1213
                                preimage, key,
785✔
1214
                                int32(htlc.AcceptHeight), res.Outcome,
785✔
1215
                        )
785✔
1216

785✔
1217
                        // Notify subscribers that the htlc should be settled
785✔
1218
                        // with our peer.
785✔
1219
                        i.notifyHodlSubscribers(htlcSettleResolution)
785✔
1220
                }
1221

1222
                // If concurrent payments were attempted to this invoice before
1223
                // the current one was ultimately settled, cancel back any of
1224
                // the HTLCs immediately. As a result of the settle, the HTLCs
1225
                // in other HTLC sets are automatically converted to a canceled
1226
                // state when updating the invoice.
1227
                //
1228
                // TODO(roasbeef): can remove now??
1229
                canceledHtlcSet := invoice.HTLCSetCompliment(
473✔
1230
                        setID, HtlcStateCanceled,
473✔
1231
                )
473✔
1232
                for key, htlc := range canceledHtlcSet {
473✔
1233
                        htlcFailResolution := NewFailResolution(
×
1234
                                key, int32(htlc.AcceptHeight),
×
1235
                                ResultInvoiceAlreadySettled,
×
1236
                        )
×
1237

×
1238
                        i.notifyHodlSubscribers(htlcFailResolution)
×
1239
                }
×
1240

1241
        // If we accepted the htlc, subscribe to the hodl invoice and return
1242
        // an accept resolution with the htlc's accept time on it.
1243
        case *htlcAcceptResolution:
419✔
1244
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
419✔
1245
                if !ok {
419✔
1246
                        return nil, nil, fmt.Errorf("accepted htlc: %v not"+
×
1247
                                " present on invoice: %x", ctx.circuitKey,
×
1248
                                ctx.hash[:])
×
1249
                }
×
1250

1251
                // Determine accepted height of this htlc. If the htlc reached
1252
                // the invoice database (possibly in a previous call to the
1253
                // invoice registry), we'll take the original accepted height
1254
                // as it was recorded in the database.
1255
                acceptHeight := int32(invoiceHtlc.AcceptHeight)
419✔
1256

419✔
1257
                ctx.log(fmt.Sprintf("accept resolution result "+
419✔
1258
                        "outcome: %v, at accept height: %v",
419✔
1259
                        res.outcome, acceptHeight))
419✔
1260

419✔
1261
                // Auto-release the htlc if the invoice is still open. It can
419✔
1262
                // only happen for mpp payments that there are htlcs in state
419✔
1263
                // Accepted while the invoice is Open.
419✔
1264
                if invoice.State == ContractOpen {
749✔
1265
                        res.acceptTime = invoiceHtlc.AcceptTime
330✔
1266
                        res.autoRelease = true
330✔
1267
                }
330✔
1268

1269
                // If we have fully accepted the set of htlcs for this invoice,
1270
                // we can now add it to our invoice expiry watcher. We do not
1271
                // add invoices before they are fully accepted, because it is
1272
                // possible that we MppTimeout the htlcs, and then our relevant
1273
                // expiry height could change.
1274
                if res.outcome == resultAccepted {
501✔
1275
                        invoiceToExpire = makeInvoiceExpiry(ctx.hash, invoice)
82✔
1276
                }
82✔
1277

1278
                // Subscribe to the resolution if the caller specified a
1279
                // notification channel.
1280
                if hodlChan != nil {
838✔
1281
                        i.hodlSubscribe(hodlChan, ctx.circuitKey)
419✔
1282
                }
419✔
1283

1284
        default:
×
1285
                panic("unknown action")
×
1286
        }
1287

1288
        // Now that the links have been notified of any state changes to their
1289
        // HTLCs, we'll go ahead and notify any clients waiting on the invoice
1290
        // state changes.
1291
        if updateSubscribers {
1,470✔
1292
                // We'll add a setID onto the notification, but only if this is
552✔
1293
                // an AMP invoice being settled.
552✔
1294
                var setID *[32]byte
552✔
1295
                if _, ok := resolution.(*HtlcSettleResolution); ok {
1,016✔
1296
                        setID = ctx.setID()
464✔
1297
                }
464✔
1298

1299
                i.notifyClients(ctx.hash, invoice, setID)
552✔
1300
        }
1301

1302
        return resolution, invoiceToExpire, nil
918✔
1303
}
1304

1305
// SettleHodlInvoice sets the preimage of a hodl invoice.
1306
func (i *InvoiceRegistry) SettleHodlInvoice(ctx context.Context,
1307
        preimage lntypes.Preimage) error {
69✔
1308

69✔
1309
        i.Lock()
69✔
1310
        defer i.Unlock()
69✔
1311

69✔
1312
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
138✔
1313
                switch invoice.State {
69✔
1314
                case ContractOpen:
×
1315
                        return nil, ErrInvoiceStillOpen
×
1316

1317
                case ContractCanceled:
×
1318
                        return nil, ErrInvoiceAlreadyCanceled
×
1319

1320
                case ContractSettled:
3✔
1321
                        return nil, ErrInvoiceAlreadySettled
3✔
1322
                }
1323

1324
                return &InvoiceUpdateDesc{
66✔
1325
                        UpdateType: SettleHodlInvoiceUpdate,
66✔
1326
                        State: &InvoiceStateUpdateDesc{
66✔
1327
                                NewState: ContractSettled,
66✔
1328
                                Preimage: &preimage,
66✔
1329
                        },
66✔
1330
                }, nil
66✔
1331
        }
1332

1333
        hash := preimage.Hash()
69✔
1334
        invoiceRef := InvoiceRefByHash(hash)
69✔
1335
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
69✔
1336
        if err != nil {
72✔
1337
                log.Errorf("SettleHodlInvoice with preimage %v: %v",
3✔
1338
                        preimage, err)
3✔
1339

3✔
1340
                return err
3✔
1341
        }
3✔
1342

1343
        log.Debugf("Invoice%v: settled with preimage %v", invoiceRef,
66✔
1344
                invoice.Terms.PaymentPreimage)
66✔
1345

66✔
1346
        // In the callback, we marked the invoice as settled. UpdateInvoice will
66✔
1347
        // have seen this and should have moved all htlcs that were accepted to
66✔
1348
        // the settled state. In the loop below, we go through all of these and
66✔
1349
        // notify links and resolvers that are waiting for resolution. Any htlcs
66✔
1350
        // that were already settled before, will be notified again. This isn't
66✔
1351
        // necessary but doesn't hurt either.
66✔
1352
        for key, htlc := range invoice.Htlcs {
135✔
1353
                if htlc.State != HtlcStateSettled {
69✔
1354
                        continue
×
1355
                }
1356

1357
                resolution := NewSettleResolution(
69✔
1358
                        preimage, key, int32(htlc.AcceptHeight), ResultSettled,
69✔
1359
                )
69✔
1360

69✔
1361
                i.notifyHodlSubscribers(resolution)
69✔
1362
        }
1363
        i.notifyClients(hash, invoice, nil)
66✔
1364

66✔
1365
        return nil
66✔
1366
}
1367

1368
// CancelInvoice attempts to cancel the invoice corresponding to the passed
1369
// payment hash.
1370
func (i *InvoiceRegistry) CancelInvoice(ctx context.Context,
1371
        payHash lntypes.Hash) error {
29✔
1372

29✔
1373
        return i.cancelInvoiceImpl(ctx, payHash, true)
29✔
1374
}
29✔
1375

1376
// shouldCancel examines the state of an invoice and whether we want to
1377
// cancel already accepted invoices, taking our force cancel boolean into
1378
// account. This is pulled out into its own function so that tests that mock
1379
// cancelInvoiceImpl can reuse this logic.
1380
func shouldCancel(state ContractState, cancelAccepted bool) bool {
94✔
1381
        if state != ContractAccepted {
160✔
1382
                return true
66✔
1383
        }
66✔
1384

1385
        // If the invoice is accepted, we should only cancel if we want to
1386
        // force cancellation of accepted invoices.
1387
        return cancelAccepted
28✔
1388
}
1389

1390
// cancelInvoice attempts to cancel the invoice corresponding to the passed
1391
// payment hash. Accepted invoices will only be canceled if explicitly
1392
// requested to do so. It notifies subscribing links and resolvers that
1393
// the associated htlcs were canceled if they change state.
1394
func (i *InvoiceRegistry) cancelInvoiceImpl(ctx context.Context,
1395
        payHash lntypes.Hash, cancelAccepted bool) error {
103✔
1396

103✔
1397
        i.Lock()
103✔
1398
        defer i.Unlock()
103✔
1399

103✔
1400
        ref := InvoiceRefByHash(payHash)
103✔
1401
        log.Debugf("Invoice%v: canceling invoice", ref)
103✔
1402

103✔
1403
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
197✔
1404
                if !shouldCancel(invoice.State, cancelAccepted) {
106✔
1405
                        return nil, nil
12✔
1406
                }
12✔
1407

1408
                // Move invoice to the canceled state. Rely on validation in
1409
                // channeldb to return an error if the invoice is already
1410
                // settled or canceled.
1411
                return &InvoiceUpdateDesc{
82✔
1412
                        UpdateType: CancelInvoiceUpdate,
82✔
1413
                        State: &InvoiceStateUpdateDesc{
82✔
1414
                                NewState: ContractCanceled,
82✔
1415
                        },
82✔
1416
                }, nil
82✔
1417
        }
1418

1419
        invoiceRef := InvoiceRefByHash(payHash)
103✔
1420
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
103✔
1421

103✔
1422
        // Implement idempotency by returning success if the invoice was already
103✔
1423
        // canceled.
103✔
1424
        if errors.Is(err, ErrInvoiceAlreadyCanceled) {
106✔
1425
                log.Debugf("Invoice%v: already canceled", ref)
3✔
1426
                return nil
3✔
1427
        }
3✔
1428
        if err != nil {
120✔
1429
                return err
20✔
1430
        }
20✔
1431

1432
        // Return without cancellation if the invoice state is ContractAccepted.
1433
        if invoice.State == ContractAccepted {
92✔
1434
                log.Debugf("Invoice%v: remains accepted as cancel wasn't"+
12✔
1435
                        "explicitly requested.", ref)
12✔
1436
                return nil
12✔
1437
        }
12✔
1438

1439
        log.Debugf("Invoice%v: canceled", ref)
68✔
1440

68✔
1441
        // In the callback, some htlcs may have been moved to the canceled
68✔
1442
        // state. We now go through all of these and notify links and resolvers
68✔
1443
        // that are waiting for resolution. Any htlcs that were already canceled
68✔
1444
        // before, will be notified again. This isn't necessary but doesn't hurt
68✔
1445
        // either.
68✔
1446
        for key, htlc := range invoice.Htlcs {
93✔
1447
                if htlc.State != HtlcStateCanceled {
25✔
1448
                        continue
×
1449
                }
1450

1451
                i.notifyHodlSubscribers(
25✔
1452
                        NewFailResolution(
25✔
1453
                                key, int32(htlc.AcceptHeight), ResultCanceled,
25✔
1454
                        ),
25✔
1455
                )
25✔
1456
        }
1457
        i.notifyClients(payHash, invoice, nil)
68✔
1458

68✔
1459
        // Attempt to also delete the invoice if requested through the registry
68✔
1460
        // config.
68✔
1461
        if i.cfg.GcCanceledInvoicesOnTheFly {
71✔
1462
                // Assemble the delete reference and attempt to delete through
3✔
1463
                // the invocice from the DB.
3✔
1464
                deleteRef := InvoiceDeleteRef{
3✔
1465
                        PayHash:     payHash,
3✔
1466
                        AddIndex:    invoice.AddIndex,
3✔
1467
                        SettleIndex: invoice.SettleIndex,
3✔
1468
                }
3✔
1469
                if invoice.Terms.PaymentAddr != BlankPayAddr {
3✔
1470
                        deleteRef.PayAddr = &invoice.Terms.PaymentAddr
×
1471
                }
×
1472

1473
                err = i.idb.DeleteInvoice(ctx, []InvoiceDeleteRef{deleteRef})
3✔
1474
                // If by any chance deletion failed, then log it instead of
3✔
1475
                // returning the error, as the invoice itself has already been
3✔
1476
                // canceled.
3✔
1477
                if err != nil {
3✔
1478
                        log.Warnf("Invoice %v could not be deleted: %v", ref,
×
1479
                                err)
×
1480
                }
×
1481
        }
1482

1483
        return nil
68✔
1484
}
1485

1486
// notifyClients notifies all currently registered invoice notification clients
1487
// of a newly added/settled invoice.
1488
func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
1489
        invoice *Invoice, setID *[32]byte) {
1,391✔
1490

1,391✔
1491
        event := &invoiceEvent{
1,391✔
1492
                invoice: invoice,
1,391✔
1493
                hash:    hash,
1,391✔
1494
                setID:   setID,
1,391✔
1495
        }
1,391✔
1496

1,391✔
1497
        select {
1,391✔
1498
        case i.invoiceEvents <- event:
1,391✔
1499
        case <-i.quit:
×
1500
        }
1501
}
1502

1503
// invoiceSubscriptionKit defines that are common to both all invoice
1504
// subscribers and single invoice subscribers.
1505
type invoiceSubscriptionKit struct {
1506
        id uint32 // nolint:structcheck
1507

1508
        // quit is a chan mouted to InvoiceRegistry that signals a shutdown.
1509
        quit chan struct{}
1510

1511
        ntfnQueue *queue.ConcurrentQueue
1512

1513
        canceled   uint32 // To be used atomically.
1514
        cancelChan chan struct{}
1515

1516
        // backlogDelivered is closed when the backlog events have been
1517
        // delivered.
1518
        backlogDelivered chan struct{}
1519
}
1520

1521
// InvoiceSubscription represents an intent to receive updates for newly added
1522
// or settled invoices. For each newly added invoice, a copy of the invoice
1523
// will be sent over the NewInvoices channel. Similarly, for each newly settled
1524
// invoice, a copy of the invoice will be sent over the SettledInvoices
1525
// channel.
1526
type InvoiceSubscription struct {
1527
        invoiceSubscriptionKit
1528

1529
        // NewInvoices is a channel that we'll use to send all newly created
1530
        // invoices with an invoice index greater than the specified
1531
        // StartingInvoiceIndex field.
1532
        NewInvoices chan *Invoice
1533

1534
        // SettledInvoices is a channel that we'll use to send all settled
1535
        // invoices with an invoices index greater than the specified
1536
        // StartingInvoiceIndex field.
1537
        SettledInvoices chan *Invoice
1538

1539
        // addIndex is the highest add index the caller knows of. We'll use
1540
        // this information to send out an event backlog to the notifications
1541
        // subscriber. Any new add events with an index greater than this will
1542
        // be dispatched before any new notifications are sent out.
1543
        addIndex uint64
1544

1545
        // settleIndex is the highest settle index the caller knows of. We'll
1546
        // use this information to send out an event backlog to the
1547
        // notifications subscriber. Any new settle events with an index
1548
        // greater than this will be dispatched before any new notifications
1549
        // are sent out.
1550
        settleIndex uint64
1551
}
1552

1553
// SingleInvoiceSubscription represents an intent to receive updates for a
1554
// specific invoice.
1555
type SingleInvoiceSubscription struct {
1556
        invoiceSubscriptionKit
1557

1558
        invoiceRef InvoiceRef
1559

1560
        // Updates is a channel that we'll use to send all invoice events for
1561
        // the invoice that is subscribed to.
1562
        Updates chan *Invoice
1563
}
1564

1565
// PayHash returns the optional payment hash of the target invoice.
1566
//
1567
// TODO(positiveblue): This method is only supposed to be used in tests. It will
1568
// be deleted as soon as invoiceregistery_test is in the same module.
1569
func (s *SingleInvoiceSubscription) PayHash() *lntypes.Hash {
18✔
1570
        return s.invoiceRef.PayHash()
18✔
1571
}
18✔
1572

1573
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
1574
// resources.
1575
func (i *invoiceSubscriptionKit) Cancel() {
63✔
1576
        if !atomic.CompareAndSwapUint32(&i.canceled, 0, 1) {
63✔
1577
                return
×
1578
        }
×
1579

1580
        i.ntfnQueue.Stop()
63✔
1581
        close(i.cancelChan)
63✔
1582
}
1583

1584
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
102✔
1585
        select {
102✔
1586
        case i.ntfnQueue.ChanIn() <- event:
102✔
1587

1588
        case <-i.cancelChan:
×
1589
                // This can only be triggered by delivery of non-backlog
×
1590
                // events.
×
1591
                return ErrShuttingDown
×
1592
        case <-i.quit:
×
1593
                return ErrShuttingDown
×
1594
        }
1595

1596
        return nil
102✔
1597
}
1598

1599
// SubscribeNotifications returns an InvoiceSubscription which allows the
1600
// caller to receive async notifications when any invoices are settled or
1601
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
1602
// by first sending out all new events with an invoice index _greater_ than
1603
// this value. Afterwards, we'll send out real-time notifications.
1604
func (i *InvoiceRegistry) SubscribeNotifications(ctx context.Context,
1605
        addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
45✔
1606

45✔
1607
        client := &InvoiceSubscription{
45✔
1608
                NewInvoices:     make(chan *Invoice),
45✔
1609
                SettledInvoices: make(chan *Invoice),
45✔
1610
                addIndex:        addIndex,
45✔
1611
                settleIndex:     settleIndex,
45✔
1612
                invoiceSubscriptionKit: invoiceSubscriptionKit{
45✔
1613
                        quit:             i.quit,
45✔
1614
                        ntfnQueue:        queue.NewConcurrentQueue(20),
45✔
1615
                        cancelChan:       make(chan struct{}),
45✔
1616
                        backlogDelivered: make(chan struct{}),
45✔
1617
                },
45✔
1618
        }
45✔
1619
        client.ntfnQueue.Start()
45✔
1620

45✔
1621
        // This notifies other goroutines that the backlog phase is over.
45✔
1622
        defer close(client.backlogDelivered)
45✔
1623

45✔
1624
        // Always increment by 1 first, and our client ID will start with 1,
45✔
1625
        // not 0.
45✔
1626
        client.id = atomic.AddUint32(&i.nextClientID, 1)
45✔
1627

45✔
1628
        // Before we register this new invoice subscription, we'll launch a new
45✔
1629
        // goroutine that will proxy all notifications appended to the end of
45✔
1630
        // the concurrent queue to the two client-side channels the caller will
45✔
1631
        // feed off of.
45✔
1632
        i.wg.Add(1)
45✔
1633
        go func() {
90✔
1634
                defer i.wg.Done()
45✔
1635
                defer i.deleteClient(client.id)
45✔
1636

45✔
1637
                for {
156✔
1638
                        select {
111✔
1639
                        // A new invoice event has been sent by the
1640
                        // invoiceRegistry! We'll figure out if this is an add
1641
                        // event or a settle event, then dispatch the event to
1642
                        // the client.
1643
                        case ntfn := <-client.ntfnQueue.ChanOut():
66✔
1644
                                invoiceEvent := ntfn.(*invoiceEvent)
66✔
1645

66✔
1646
                                var targetChan chan *Invoice
66✔
1647
                                state := invoiceEvent.invoice.State
66✔
1648
                                switch {
66✔
1649
                                // AMP invoices never move to settled, but will
1650
                                // be sent with a set ID if an HTLC set is
1651
                                // being settled.
1652
                                case state == ContractOpen &&
1653
                                        invoiceEvent.setID != nil:
6✔
1654
                                        fallthrough
6✔
1655

1656
                                case state == ContractSettled:
24✔
1657
                                        targetChan = client.SettledInvoices
24✔
1658

1659
                                case state == ContractOpen:
42✔
1660
                                        targetChan = client.NewInvoices
42✔
1661

1662
                                default:
×
1663
                                        log.Errorf("unknown invoice state: %v",
×
1664
                                                state)
×
1665

×
1666
                                        continue
×
1667
                                }
1668

1669
                                select {
66✔
1670
                                case targetChan <- invoiceEvent.invoice:
66✔
1671

1672
                                case <-client.cancelChan:
×
1673
                                        return
×
1674

1675
                                case <-i.quit:
×
1676
                                        return
×
1677
                                }
1678

1679
                        case <-client.cancelChan:
44✔
1680
                                return
44✔
1681

1682
                        case <-i.quit:
1✔
1683
                                return
1✔
1684
                        }
1685
                }
1686
        }()
1687

1688
        i.notificationClientMux.Lock()
45✔
1689
        i.notificationClients[client.id] = client
45✔
1690
        i.notificationClientMux.Unlock()
45✔
1691

45✔
1692
        // Query the database to see if based on the provided addIndex and
45✔
1693
        // settledIndex we need to deliver any backlog notifications.
45✔
1694
        err := i.deliverBacklogEvents(ctx, client)
45✔
1695
        if err != nil {
45✔
1696
                return nil, err
×
1697
        }
×
1698

1699
        log.Infof("New invoice subscription client: id=%v", client.id)
45✔
1700

45✔
1701
        return client, nil
45✔
1702
}
1703

1704
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
1705
// caller to receive async notifications for a specific invoice.
1706
func (i *InvoiceRegistry) SubscribeSingleInvoice(ctx context.Context,
1707
        hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
18✔
1708

18✔
1709
        client := &SingleInvoiceSubscription{
18✔
1710
                Updates: make(chan *Invoice),
18✔
1711
                invoiceSubscriptionKit: invoiceSubscriptionKit{
18✔
1712
                        quit:             i.quit,
18✔
1713
                        ntfnQueue:        queue.NewConcurrentQueue(20),
18✔
1714
                        cancelChan:       make(chan struct{}),
18✔
1715
                        backlogDelivered: make(chan struct{}),
18✔
1716
                },
18✔
1717
                invoiceRef: InvoiceRefByHash(hash),
18✔
1718
        }
18✔
1719
        client.ntfnQueue.Start()
18✔
1720

18✔
1721
        // This notifies other goroutines that the backlog phase is done.
18✔
1722
        defer close(client.backlogDelivered)
18✔
1723

18✔
1724
        // Always increment by 1 first, and our client ID will start with 1,
18✔
1725
        // not 0.
18✔
1726
        client.id = atomic.AddUint32(&i.nextClientID, 1)
18✔
1727

18✔
1728
        // Before we register this new invoice subscription, we'll launch a new
18✔
1729
        // goroutine that will proxy all notifications appended to the end of
18✔
1730
        // the concurrent queue to the two client-side channels the caller will
18✔
1731
        // feed off of.
18✔
1732
        i.wg.Add(1)
18✔
1733
        go func() {
36✔
1734
                defer i.wg.Done()
18✔
1735
                defer i.deleteClient(client.id)
18✔
1736

18✔
1737
                for {
72✔
1738
                        select {
54✔
1739
                        // A new invoice event has been sent by the
1740
                        // invoiceRegistry. We will dispatch the event to the
1741
                        // client.
1742
                        case ntfn := <-client.ntfnQueue.ChanOut():
36✔
1743
                                invoiceEvent := ntfn.(*invoiceEvent)
36✔
1744

36✔
1745
                                select {
36✔
1746
                                case client.Updates <- invoiceEvent.invoice:
36✔
1747

1748
                                case <-client.cancelChan:
×
1749
                                        return
×
1750

1751
                                case <-i.quit:
×
1752
                                        return
×
1753
                                }
1754

1755
                        case <-client.cancelChan:
18✔
1756
                                return
18✔
1757

1758
                        case <-i.quit:
×
1759
                                return
×
1760
                        }
1761
                }
1762
        }()
1763

1764
        i.notificationClientMux.Lock()
18✔
1765
        i.singleNotificationClients[client.id] = client
18✔
1766
        i.notificationClientMux.Unlock()
18✔
1767

18✔
1768
        err := i.deliverSingleBacklogEvents(ctx, client)
18✔
1769
        if err != nil {
18✔
1770
                return nil, err
×
1771
        }
×
1772

1773
        log.Infof("New single invoice subscription client: id=%v, ref=%v",
18✔
1774
                client.id, client.invoiceRef)
18✔
1775

18✔
1776
        return client, nil
18✔
1777
}
1778

1779
// notifyHodlSubscribers sends out the htlc resolution to all current
1780
// subscribers.
1781
func (i *InvoiceRegistry) notifyHodlSubscribers(htlcResolution HtlcResolution) {
891✔
1782
        i.hodlSubscriptionsMux.Lock()
891✔
1783
        defer i.hodlSubscriptionsMux.Unlock()
891✔
1784

891✔
1785
        subscribers, ok := i.hodlSubscriptions[htlcResolution.CircuitKey()]
891✔
1786
        if !ok {
1,370✔
1787
                return
479✔
1788
        }
479✔
1789

1790
        // Notify all interested subscribers and remove subscription from both
1791
        // maps. The subscription can be removed as there only ever will be a
1792
        // single resolution for each hash.
1793
        for subscriber := range subscribers {
824✔
1794
                select {
412✔
1795
                case subscriber <- htlcResolution:
412✔
1796
                case <-i.quit:
×
1797
                        return
×
1798
                }
1799

1800
                delete(
412✔
1801
                        i.hodlReverseSubscriptions[subscriber],
412✔
1802
                        htlcResolution.CircuitKey(),
412✔
1803
                )
412✔
1804
        }
1805

1806
        delete(i.hodlSubscriptions, htlcResolution.CircuitKey())
412✔
1807
}
1808

1809
// hodlSubscribe adds a new invoice subscription.
1810
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
1811
        circuitKey CircuitKey) {
419✔
1812

419✔
1813
        i.hodlSubscriptionsMux.Lock()
419✔
1814
        defer i.hodlSubscriptionsMux.Unlock()
419✔
1815

419✔
1816
        log.Debugf("Hodl subscribe for %v", circuitKey)
419✔
1817

419✔
1818
        subscriptions, ok := i.hodlSubscriptions[circuitKey]
419✔
1819
        if !ok {
831✔
1820
                subscriptions = make(map[chan<- interface{}]struct{})
412✔
1821
                i.hodlSubscriptions[circuitKey] = subscriptions
412✔
1822
        }
412✔
1823
        subscriptions[subscriber] = struct{}{}
419✔
1824

419✔
1825
        reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
419✔
1826
        if !ok {
776✔
1827
                reverseSubscriptions = make(map[CircuitKey]struct{})
357✔
1828
                i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
357✔
1829
        }
357✔
1830
        reverseSubscriptions[circuitKey] = struct{}{}
419✔
1831
}
1832

1833
// HodlUnsubscribeAll cancels the subscription.
1834
func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
202✔
1835
        i.hodlSubscriptionsMux.Lock()
202✔
1836
        defer i.hodlSubscriptionsMux.Unlock()
202✔
1837

202✔
1838
        hashes := i.hodlReverseSubscriptions[subscriber]
202✔
1839
        for hash := range hashes {
203✔
1840
                delete(i.hodlSubscriptions[hash], subscriber)
1✔
1841
        }
1✔
1842

1843
        delete(i.hodlReverseSubscriptions, subscriber)
202✔
1844
}
1845

1846
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
1847
// useful when we need to iterate the map to send notifications.
1848
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription { //nolint:ll
1,391✔
1849
        i.notificationClientMux.RLock()
1,391✔
1850
        defer i.notificationClientMux.RUnlock()
1,391✔
1851

1,391✔
1852
        clients := make(map[uint32]*SingleInvoiceSubscription)
1,391✔
1853
        for k, v := range i.singleNotificationClients {
1,427✔
1854
                clients[k] = v
36✔
1855
        }
36✔
1856
        return clients
1,391✔
1857
}
1858

1859
// copyClients copies i.notificationClients inside a lock. This is useful when
1860
// we need to iterate the map to send notifications.
1861
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
1,235✔
1862
        i.notificationClientMux.RLock()
1,235✔
1863
        defer i.notificationClientMux.RUnlock()
1,235✔
1864

1,235✔
1865
        clients := make(map[uint32]*InvoiceSubscription)
1,235✔
1866
        for k, v := range i.notificationClients {
1,301✔
1867
                clients[k] = v
66✔
1868
        }
66✔
1869
        return clients
1,235✔
1870
}
1871

1872
// deleteClient removes a client by its ID inside a lock. Noop if the client is
1873
// not found.
1874
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
63✔
1875
        i.notificationClientMux.Lock()
63✔
1876
        defer i.notificationClientMux.Unlock()
63✔
1877

63✔
1878
        log.Infof("Cancelling invoice subscription for client=%v", clientID)
63✔
1879
        delete(i.notificationClients, clientID)
63✔
1880
        delete(i.singleNotificationClients, clientID)
63✔
1881
}
63✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc