• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 10207481183

01 Aug 2024 11:52PM UTC coverage: 58.679% (+0.09%) from 58.591%
10207481183

push

github

web-flow
Merge pull request #8836 from hieblmi/payment-failure-reason-cancel

routing: add payment failure reason `FailureReasonCancel`

7 of 30 new or added lines in 5 files covered. (23.33%)

1662 existing lines in 21 files now uncovered.

125454 of 213798 relevant lines covered (58.68%)

28679.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.31
/invoices/invoiceregistry.go
1
package invoices
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/queue"
15
        "github.com/lightningnetwork/lnd/record"
16
)
17

18
var (
	// ErrInvoiceExpiryTooSoon is returned when an invoice is attempted to
	// be accepted or settled with not enough blocks remaining.
	ErrInvoiceExpiryTooSoon = errors.New("invoice expiry too soon")

	// ErrInvoiceAmountTooLow is returned when an invoice is attempted to
	// be accepted or settled with an amount that is too low.
	ErrInvoiceAmountTooLow = errors.New(
		"paid amount less than invoice amount",
	)

	// ErrShuttingDown is returned when an operation failed because the
	// invoice registry is shutting down.
	ErrShuttingDown = errors.New("invoice registry shutting down")
)
33

34
const (
	// DefaultHtlcHoldDuration defines the default (120 seconds) for how
	// long mpp htlcs are held while waiting for the other set members to
	// arrive.
	DefaultHtlcHoldDuration = 120 * time.Second
)
39

40
// RegistryConfig contains the configuration parameters for the invoice
// registry.
type RegistryConfig struct {
	// FinalCltvRejectDelta defines the number of blocks before the expiry
	// of the htlc where we no longer settle it as an exit hop and instead
	// cancel it back. Normally this value should be lower than the cltv
	// expiry of any invoice we create and the code effectuating this should
	// not be hit.
	FinalCltvRejectDelta int32

	// HtlcHoldDuration defines for how long mpp htlcs are held while
	// waiting for the other set members to arrive. It is added to an
	// htlc's accept time to compute the moment it is auto-released.
	HtlcHoldDuration time.Duration

	// Clock holds the clock implementation that is used to provide
	// Now() and TickAfter() and is useful to stub out the clock functions
	// during testing.
	Clock clock.Clock

	// AcceptKeySend indicates whether we want to accept spontaneous key
	// send payments.
	AcceptKeySend bool

	// AcceptAMP indicates whether we want to accept spontaneous AMP
	// payments.
	AcceptAMP bool

	// GcCanceledInvoicesOnStartup if set, we'll attempt to garbage collect
	// all canceled invoices upon start.
	GcCanceledInvoicesOnStartup bool

	// GcCanceledInvoicesOnTheFly if set, we'll garbage collect all newly
	// canceled invoices on the fly.
	GcCanceledInvoicesOnTheFly bool

	// KeysendHoldTime indicates for how long we want to accept and hold
	// spontaneous keysend payments.
	KeysendHoldTime time.Duration
}
78

79
// htlcReleaseEvent describes an htlc auto-release event. It is used to release
// mpp htlcs for which the complete set didn't arrive in time.
type htlcReleaseEvent struct {
	// invoiceRef identifies the invoice this htlc belongs to.
	invoiceRef InvoiceRef

	// key is the circuit key of the htlc to release.
	key CircuitKey

	// releaseTime is the time at which to release the htlc.
	releaseTime time.Time
}
91

92
// Less is used to order PriorityQueueItem's by their release time such that
93
// items with the older release time are at the top of the queue.
94
//
95
// NOTE: Part of the queue.PriorityQueueItem interface.
96
func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
10✔
97
        return r.releaseTime.Before(other.(*htlcReleaseEvent).releaseTime)
10✔
98
}
10✔
99

100
// InvoiceRegistry is a central registry of all the outstanding invoices
// created by the daemon. The registry is a thin wrapper around a map in order
// to ensure that all updates/reads are thread safe.
type InvoiceRegistry struct {
	// started and stopped record whether Start and Stop have already been
	// called, so that a repeated call can be rejected with an error.
	started atomic.Bool
	stopped atomic.Bool

	// The embedded mutex is held by AddInvoice while the invoice is
	// persisted and subscribers are notified.
	sync.RWMutex

	nextClientID uint32 // must be used atomically

	// idb is the invoice database that backs the registry.
	idb InvoiceDB

	// cfg contains the registry's configuration parameters.
	cfg *RegistryConfig

	// notificationClientMux locks notificationClients and
	// singleNotificationClients. Using a separate mutex for these maps is
	// necessary to avoid deadlocks in the registry when processing invoice
	// events.
	notificationClientMux sync.RWMutex

	notificationClients map[uint32]*InvoiceSubscription

	// TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
	// performance.
	singleNotificationClients map[uint32]*SingleInvoiceSubscription

	// invoiceEvents is a single channel over which invoice updates are
	// carried.
	invoiceEvents chan *invoiceEvent

	// hodlSubscriptionsMux locks the hodlSubscriptions and
	// hodlReverseSubscriptions. Using a separate mutex for these maps is
	// necessary to avoid deadlocks in the registry when processing invoice
	// events.
	hodlSubscriptionsMux sync.RWMutex

	// hodlSubscriptions is a map from a circuit key to a list of
	// subscribers. It is used for efficient notification of links.
	hodlSubscriptions map[CircuitKey]map[chan<- interface{}]struct{}

	// hodlReverseSubscriptions tracks circuit keys subscribed to per
	// subscriber. This is used to unsubscribe from all hashes efficiently.
	hodlReverseSubscriptions map[chan<- interface{}]map[CircuitKey]struct{}

	// htlcAutoReleaseChan contains the new htlcs that need to be
	// auto-released.
	htlcAutoReleaseChan chan *htlcReleaseEvent

	// expiryWatcher is started by Start, stopped by Stop and is handed
	// every pending or newly added invoice that has an expiry reference.
	expiryWatcher *InvoiceExpiryWatcher

	// wg tracks the event loop goroutine and quit is closed by Stop to
	// signal it, and any blocked senders, to exit.
	wg   sync.WaitGroup
	quit chan struct{}
}
155

156
// NewRegistry creates a new invoice registry. The invoice registry
157
// wraps the persistent on-disk invoice storage with an additional in-memory
158
// layer. The in-memory layer is in place such that debug invoices can be added
159
// which are volatile yet available system wide within the daemon.
160
func NewRegistry(idb InvoiceDB, expiryWatcher *InvoiceExpiryWatcher,
161
        cfg *RegistryConfig) *InvoiceRegistry {
638✔
162

638✔
163
        notificationClients := make(map[uint32]*InvoiceSubscription)
638✔
164
        singleNotificationClients := make(map[uint32]*SingleInvoiceSubscription)
638✔
165
        return &InvoiceRegistry{
638✔
166
                idb:                       idb,
638✔
167
                notificationClients:       notificationClients,
638✔
168
                singleNotificationClients: singleNotificationClients,
638✔
169
                invoiceEvents:             make(chan *invoiceEvent, 100),
638✔
170
                hodlSubscriptions: make(
638✔
171
                        map[CircuitKey]map[chan<- interface{}]struct{},
638✔
172
                ),
638✔
173
                hodlReverseSubscriptions: make(
638✔
174
                        map[chan<- interface{}]map[CircuitKey]struct{},
638✔
175
                ),
638✔
176
                cfg:                 cfg,
638✔
177
                htlcAutoReleaseChan: make(chan *htlcReleaseEvent),
638✔
178
                expiryWatcher:       expiryWatcher,
638✔
179
                quit:                make(chan struct{}),
638✔
180
        }
638✔
181
}
638✔
182

183
// scanInvoicesOnStart will scan all invoices on start and add active invoices
184
// to the invoice expiry watcher while also attempting to delete all canceled
185
// invoices.
186
func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {
638✔
187
        pendingInvoices, err := i.idb.FetchPendingInvoices(ctx)
638✔
188
        if err != nil {
638✔
UNCOV
189
                return err
×
UNCOV
190
        }
×
191

192
        var pending []invoiceExpiry
638✔
193
        for paymentHash, invoice := range pendingInvoices {
672✔
194
                invoice := invoice
34✔
195
                expiryRef := makeInvoiceExpiry(paymentHash, &invoice)
34✔
196
                if expiryRef != nil {
68✔
197
                        pending = append(pending, expiryRef)
34✔
198
                }
34✔
199
        }
200

201
        log.Debugf("Adding %d pending invoices to the expiry watcher",
638✔
202
                len(pending))
638✔
203
        i.expiryWatcher.AddInvoices(pending...)
638✔
204

638✔
205
        if i.cfg.GcCanceledInvoicesOnStartup {
641✔
206
                log.Infof("Deleting canceled invoices")
3✔
207
                err = i.idb.DeleteCanceledInvoices(ctx)
3✔
208
                if err != nil {
3✔
UNCOV
209
                        log.Warnf("Deleting canceled invoices failed: %v", err)
×
UNCOV
210
                        return err
×
UNCOV
211
                }
×
212
        }
213

214
        return nil
638✔
215
}
216

217
// Start starts the registry and all goroutines it needs to carry out its task.
218
func (i *InvoiceRegistry) Start() error {
638✔
219
        var err error
638✔
220

638✔
221
        log.Info("InvoiceRegistry starting...")
638✔
222

638✔
223
        if i.started.Swap(true) {
638✔
UNCOV
224
                return fmt.Errorf("InvoiceRegistry started more than once")
×
UNCOV
225
        }
×
226
        // Start InvoiceExpiryWatcher and prepopulate it with existing
227
        // active invoices.
228
        err = i.expiryWatcher.Start(
638✔
229
                func(hash lntypes.Hash, force bool) error {
715✔
230
                        return i.cancelInvoiceImpl(
77✔
231
                                context.Background(), hash, force,
77✔
232
                        )
77✔
233
                })
77✔
234
        if err != nil {
638✔
235
                return err
×
236
        }
×
237

238
        i.wg.Add(1)
638✔
239
        go i.invoiceEventLoop()
638✔
240

638✔
241
        // Now scan all pending and removable invoices to the expiry
638✔
242
        // watcher or delete them.
638✔
243
        err = i.scanInvoicesOnStart(context.Background())
638✔
244
        if err != nil {
638✔
UNCOV
245
                _ = i.Stop()
×
UNCOV
246
        }
×
247

248
        log.Debug("InvoiceRegistry started")
638✔
249

638✔
250
        return err
638✔
251
}
252

253
// Stop signals the registry for a graceful shutdown.
254
func (i *InvoiceRegistry) Stop() error {
379✔
255
        log.Info("InvoiceRegistry shutting down...")
379✔
256

379✔
257
        if i.stopped.Swap(true) {
379✔
UNCOV
258
                return fmt.Errorf("InvoiceRegistry stopped more than once")
×
UNCOV
259
        }
×
260

261
        log.Info("InvoiceRegistry shutting down...")
379✔
262
        defer log.Debug("InvoiceRegistry shutdown complete")
379✔
263

379✔
264
        var err error
379✔
265
        if i.expiryWatcher == nil {
379✔
UNCOV
266
                err = fmt.Errorf("InvoiceRegistry expiryWatcher is not " +
×
UNCOV
267
                        "initialized")
×
268
        } else {
379✔
269
                i.expiryWatcher.Stop()
379✔
270
        }
379✔
271

272
        close(i.quit)
379✔
273

379✔
274
        i.wg.Wait()
379✔
275

379✔
276
        log.Debug("InvoiceRegistry shutdown complete")
379✔
277

379✔
278
        return err
379✔
279
}
280

281
// invoiceEvent represents a new event that has modified an invoice on disk.
// Only two event types are currently supported: newly created invoices, and
// instances where invoices are settled.
type invoiceEvent struct {
	// hash is matched against a single-invoice subscriber's payment hash
	// when the event is dispatched.
	hash lntypes.Hash

	// invoice is the invoice the event refers to.
	invoice *Invoice

	// setID is non-nil when the event concerns an AMP htlc set.
	setID *[32]byte
}
289

290
// tickAt returns a channel that ticks at the specified time. If the time has
291
// already passed, it will tick immediately.
292
func (i *InvoiceRegistry) tickAt(t time.Time) <-chan time.Time {
655✔
293
        now := i.cfg.Clock.Now()
655✔
294
        return i.cfg.Clock.TickAfter(t.Sub(now))
655✔
295
}
655✔
296

297
// invoiceEventLoop is the dedicated goroutine responsible for accepting
298
// new notification subscriptions, cancelling old subscriptions, and
299
// dispatching new invoice events.
300
func (i *InvoiceRegistry) invoiceEventLoop() {
638✔
301
        defer i.wg.Done()
638✔
302

638✔
303
        // Set up a heap for htlc auto-releases.
638✔
304
        autoReleaseHeap := &queue.PriorityQueue{}
638✔
305

638✔
306
        for {
4,306✔
307
                // If there is something to release, set up a release tick
3,668✔
308
                // channel.
3,668✔
309
                var nextReleaseTick <-chan time.Time
3,668✔
310
                if autoReleaseHeap.Len() > 0 {
4,323✔
311
                        head := autoReleaseHeap.Top().(*htlcReleaseEvent)
655✔
312
                        nextReleaseTick = i.tickAt(head.releaseTime)
655✔
313
                }
655✔
314

315
                select {
3,668✔
316
                // A sub-systems has just modified the invoice state, so we'll
317
                // dispatch notifications to all registered clients.
318
                case event := <-i.invoiceEvents:
2,692✔
319
                        // For backwards compatibility, do not notify all
2,692✔
320
                        // invoice subscribers of cancel and accept events.
2,692✔
321
                        state := event.invoice.State
2,692✔
322
                        if state != ContractCanceled &&
2,692✔
323
                                state != ContractAccepted {
4,795✔
324

2,103✔
325
                                i.dispatchToClients(event)
2,103✔
326
                        }
2,103✔
327
                        i.dispatchToSingleClients(event)
2,692✔
328

329
                // A new htlc came in for auto-release.
330
                case event := <-i.htlcAutoReleaseChan:
334✔
331
                        log.Debugf("Scheduling auto-release for htlc: "+
334✔
332
                                "ref=%v, key=%v at %v",
334✔
333
                                event.invoiceRef, event.key, event.releaseTime)
334✔
334

334✔
335
                        // We use an independent timer for every htlc rather
334✔
336
                        // than a set timer that is reset with every htlc coming
334✔
337
                        // in. Otherwise the sender could keep resetting the
334✔
338
                        // timer until the broadcast window is entered and our
334✔
339
                        // channel is force closed.
334✔
340
                        autoReleaseHeap.Push(event)
334✔
341

342
                // The htlc at the top of the heap needs to be auto-released.
343
                case <-nextReleaseTick:
12✔
344
                        event := autoReleaseHeap.Pop().(*htlcReleaseEvent)
12✔
345
                        err := i.cancelSingleHtlc(
12✔
346
                                event.invoiceRef, event.key, ResultMppTimeout,
12✔
347
                        )
12✔
348
                        if err != nil {
12✔
UNCOV
349
                                log.Errorf("HTLC timer: %v", err)
×
UNCOV
350
                        }
×
351

352
                case <-i.quit:
379✔
353
                        return
379✔
354
                }
355
        }
356
}
357

358
// dispatchToSingleClients passes the supplied event to all notification
359
// clients that subscribed to all the invoice this event applies to.
360
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
2,692✔
361
        // Dispatch to single invoice subscribers.
2,692✔
362
        clients := i.copySingleClients()
2,692✔
363
        for _, client := range clients {
2,732✔
364
                payHash := client.invoiceRef.PayHash()
40✔
365

40✔
366
                if payHash == nil || *payHash != event.hash {
44✔
367
                        continue
4✔
368
                }
369

370
                select {
40✔
371
                case <-client.backlogDelivered:
40✔
372
                        // We won't deliver any events until the backlog has
373
                        // went through first.
374
                case <-i.quit:
×
375
                        return
×
376
                }
377

378
                client.notify(event)
40✔
379
        }
380
}
381

382
// dispatchToClients passes the supplied event to all notification clients that
383
// subscribed to all invoices. Add and settle indices are used to make sure
384
// that clients don't receive duplicate or unwanted events.
385
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
2,103✔
386
        invoice := event.invoice
2,103✔
387

2,103✔
388
        clients := i.copyClients()
2,103✔
389
        for clientID, client := range clients {
2,173✔
390
                // Before we dispatch this event, we'll check
70✔
391
                // to ensure that this client hasn't already
70✔
392
                // received this notification in order to
70✔
393
                // ensure we don't duplicate any events.
70✔
394

70✔
395
                // TODO(joostjager): Refactor switches.
70✔
396
                state := event.invoice.State
70✔
397
                switch {
70✔
398
                // If we've already sent this settle event to
399
                // the client, then we can skip this.
400
                case state == ContractSettled &&
UNCOV
401
                        client.settleIndex >= invoice.SettleIndex:
×
UNCOV
402
                        continue
×
403

404
                // Similarly, if we've already sent this add to
405
                // the client then we can skip this one, but only if this isn't
406
                // an AMP invoice. AMP invoices always remain in the settle
407
                // state as a base invoice.
408
                case event.setID == nil && state == ContractOpen &&
409
                        client.addIndex >= invoice.AddIndex:
×
410
                        continue
×
411

412
                // These two states should never happen, but we
413
                // log them just in case so we can detect this
414
                // instance.
415
                case state == ContractOpen &&
416
                        client.addIndex+1 != invoice.AddIndex:
11✔
417
                        log.Warnf("client=%v for invoice "+
11✔
418
                                "notifications missed an update, "+
11✔
419
                                "add_index=%v, new add event index=%v",
11✔
420
                                clientID, client.addIndex,
11✔
421
                                invoice.AddIndex)
11✔
422

423
                case state == ContractSettled &&
424
                        client.settleIndex+1 != invoice.SettleIndex:
4✔
425
                        log.Warnf("client=%v for invoice "+
4✔
426
                                "notifications missed an update, "+
4✔
427
                                "settle_index=%v, new settle event index=%v",
4✔
428
                                clientID, client.settleIndex,
4✔
429
                                invoice.SettleIndex)
4✔
430
                }
431

432
                select {
70✔
433
                case <-client.backlogDelivered:
70✔
434
                        // We won't deliver any events until the backlog has
435
                        // been processed.
UNCOV
436
                case <-i.quit:
×
UNCOV
437
                        return
×
438
                }
439

440
                err := client.notify(&invoiceEvent{
70✔
441
                        invoice: invoice,
70✔
442
                        setID:   event.setID,
70✔
443
                })
70✔
444
                if err != nil {
70✔
445
                        log.Errorf("Failed dispatching to client: %v", err)
×
UNCOV
446
                        return
×
UNCOV
447
                }
×
448

449
                // Each time we send a notification to a client, we'll record
450
                // the latest add/settle index it has. We'll use this to ensure
451
                // we don't send a notification twice, which can happen if a new
452
                // event is added while we're catching up a new client.
453
                invState := event.invoice.State
70✔
454
                switch {
70✔
455
                case invState == ContractSettled:
22✔
456
                        client.settleIndex = invoice.SettleIndex
22✔
457

458
                case invState == ContractOpen && event.setID == nil:
46✔
459
                        client.addIndex = invoice.AddIndex
46✔
460

461
                // If this is an AMP invoice, then we'll need to use the set ID
462
                // to keep track of the settle index of the client. AMP
463
                // invoices never go to the open state, but if a setID is
464
                // passed, then we know it was just settled and will track the
465
                // highest settle index so far.
466
                case invState == ContractOpen && event.setID != nil:
10✔
467
                        setID := *event.setID
10✔
468
                        client.settleIndex = invoice.AMPState[setID].SettleIndex
10✔
469

UNCOV
470
                default:
×
UNCOV
471
                        log.Errorf("unexpected invoice state: %v",
×
UNCOV
472
                                event.invoice.State)
×
473
                }
474
        }
475
}
476

477
// deliverBacklogEvents will attempts to query the invoice database for any
478
// notifications that the client has missed since it reconnected last.
479
func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context,
480
        client *InvoiceSubscription) error {
49✔
481

49✔
482
        addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex)
49✔
483
        if err != nil {
49✔
UNCOV
484
                return err
×
UNCOV
485
        }
×
486

487
        settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex)
49✔
488
        if err != nil {
49✔
UNCOV
489
                return err
×
UNCOV
490
        }
×
491

492
        // If we have any to deliver, then we'll append them to the end of the
493
        // notification queue in order to catch up the client before delivering
494
        // any new notifications.
495
        for _, addEvent := range addEvents {
53✔
496
                // We re-bind the loop variable to ensure we don't hold onto
4✔
497
                // the loop reference causing is to point to the same item.
4✔
498
                addEvent := addEvent
4✔
499

4✔
500
                select {
4✔
501
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
502
                        invoice: &addEvent,
503
                }:
4✔
UNCOV
504
                case <-i.quit:
×
UNCOV
505
                        return ErrShuttingDown
×
506
                }
507
        }
508

509
        for _, settleEvent := range settleEvents {
53✔
510
                // We re-bind the loop variable to ensure we don't hold onto
4✔
511
                // the loop reference causing is to point to the same item.
4✔
512
                settleEvent := settleEvent
4✔
513

4✔
514
                select {
4✔
515
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
516
                        invoice: &settleEvent,
517
                }:
4✔
518
                case <-i.quit:
×
UNCOV
519
                        return ErrShuttingDown
×
520
                }
521
        }
522

523
        return nil
49✔
524
}
525

526
// deliverSingleBacklogEvents will attempt to query the invoice database to
527
// retrieve the current invoice state and deliver this to the subscriber. Single
528
// invoice subscribers will always receive the current state right after
529
// subscribing. Only in case the invoice does not yet exist, nothing is sent
530
// yet.
531
func (i *InvoiceRegistry) deliverSingleBacklogEvents(ctx context.Context,
532
        client *SingleInvoiceSubscription) error {
22✔
533

22✔
534
        invoice, err := i.idb.LookupInvoice(ctx, client.invoiceRef)
22✔
535

22✔
536
        // It is possible that the invoice does not exist yet, but the client is
22✔
537
        // already watching it in anticipation.
22✔
538
        isNotFound := errors.Is(err, ErrInvoiceNotFound)
22✔
539
        isNotCreated := errors.Is(err, ErrNoInvoicesCreated)
22✔
540
        if isNotFound || isNotCreated {
44✔
541
                return nil
22✔
542
        }
22✔
543
        if err != nil {
4✔
UNCOV
544
                return err
×
UNCOV
545
        }
×
546

547
        payHash := client.invoiceRef.PayHash()
4✔
548
        if payHash == nil {
4✔
UNCOV
549
                return nil
×
UNCOV
550
        }
×
551

552
        err = client.notify(&invoiceEvent{
4✔
553
                hash:    *payHash,
4✔
554
                invoice: &invoice,
4✔
555
        })
4✔
556
        if err != nil {
4✔
UNCOV
557
                return err
×
UNCOV
558
        }
×
559

560
        log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
4✔
561
                client.id, payHash)
4✔
562

4✔
563
        return nil
4✔
564
}
565

566
// AddInvoice adds a regular invoice for the specified amount, identified by
567
// the passed preimage. Additionally, any memo or receipt data provided will
568
// also be stored on-disk. Once this invoice is added, subsystems within the
569
// daemon add/forward HTLCs are able to obtain the proper preimage required for
570
// redemption in the case that we're the final destination. We also return the
571
// addIndex of the newly created invoice which monotonically increases for each
572
// new invoice added.  A side effect of this function is that it also sets
573
// AddIndex on the invoice argument.
574
func (i *InvoiceRegistry) AddInvoice(ctx context.Context, invoice *Invoice,
575
        paymentHash lntypes.Hash) (uint64, error) {
1,156✔
576

1,156✔
577
        i.Lock()
1,156✔
578

1,156✔
579
        ref := InvoiceRefByHash(paymentHash)
1,156✔
580
        log.Debugf("Invoice%v: added with terms %v", ref, invoice.Terms)
1,156✔
581

1,156✔
582
        addIndex, err := i.idb.AddInvoice(ctx, invoice, paymentHash)
1,156✔
583
        if err != nil {
1,175✔
584
                i.Unlock()
19✔
585
                return 0, err
19✔
586
        }
19✔
587

588
        // Now that we've added the invoice, we'll send dispatch a message to
589
        // notify the clients of this new invoice.
590
        i.notifyClients(paymentHash, invoice, nil)
1,141✔
591
        i.Unlock()
1,141✔
592

1,141✔
593
        // InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry
1,141✔
594
        // to avoid deadlock when a new invoice is added while an other is being
1,141✔
595
        // canceled.
1,141✔
596
        invoiceExpiryRef := makeInvoiceExpiry(paymentHash, invoice)
1,141✔
597
        if invoiceExpiryRef != nil {
2,282✔
598
                i.expiryWatcher.AddInvoices(invoiceExpiryRef)
1,141✔
599
        }
1,141✔
600

601
        return addIndex, nil
1,141✔
602
}
603

604
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
605
// then we're able to pull the funds pending within an HTLC.
606
//
607
// TODO(roasbeef): ignore if settled?
608
func (i *InvoiceRegistry) LookupInvoice(ctx context.Context,
609
        rHash lntypes.Hash) (Invoice, error) {
399✔
610

399✔
611
        // We'll check the database to see if there's an existing matching
399✔
612
        // invoice.
399✔
613
        ref := InvoiceRefByHash(rHash)
399✔
614
        return i.idb.LookupInvoice(ctx, ref)
399✔
615
}
399✔
616

617
// LookupInvoiceByRef looks up an invoice by the given reference, if found
618
// then we're able to pull the funds pending within an HTLC.
619
func (i *InvoiceRegistry) LookupInvoiceByRef(ctx context.Context,
620
        ref InvoiceRef) (Invoice, error) {
4✔
621

4✔
622
        return i.idb.LookupInvoice(ctx, ref)
4✔
623
}
4✔
624

625
// startHtlcTimer starts a new timer via the invoice registry main loop that
626
// cancels a single htlc on an invoice when the htlc hold duration has passed.
627
func (i *InvoiceRegistry) startHtlcTimer(invoiceRef InvoiceRef,
628
        key CircuitKey, acceptTime time.Time) error {
334✔
629

334✔
630
        releaseTime := acceptTime.Add(i.cfg.HtlcHoldDuration)
334✔
631
        event := &htlcReleaseEvent{
334✔
632
                invoiceRef:  invoiceRef,
334✔
633
                key:         key,
334✔
634
                releaseTime: releaseTime,
334✔
635
        }
334✔
636

334✔
637
        select {
334✔
638
        case i.htlcAutoReleaseChan <- event:
334✔
639
                return nil
334✔
640

641
        case <-i.quit:
×
642
                return ErrShuttingDown
×
643
        }
644
}
645

646
// cancelSingleHtlc cancels a single accepted htlc on an invoice. It takes
647
// a resolution result which will be used to notify subscribed links and
648
// resolvers of the details of the htlc cancellation.
649
func (i *InvoiceRegistry) cancelSingleHtlc(invoiceRef InvoiceRef,
650
        key CircuitKey, result FailResolutionResult) error {
12✔
651

12✔
652
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
24✔
653
                // Only allow individual htlc cancellation on open invoices.
12✔
654
                if invoice.State != ContractOpen {
18✔
655
                        log.Debugf("cancelSingleHtlc: invoice %v no longer "+
6✔
656
                                "open", invoiceRef)
6✔
657

6✔
658
                        return nil, nil
6✔
659
                }
6✔
660

661
                // Lookup the current status of the htlc in the database.
662
                var (
6✔
663
                        htlcState HtlcState
6✔
664
                        setID     *SetID
6✔
665
                )
6✔
666
                htlc, ok := invoice.Htlcs[key]
6✔
667
                if !ok {
6✔
668
                        // If this is an AMP invoice, then all the HTLCs won't
×
669
                        // be read out, so we'll consult the other mapping to
×
670
                        // try to find the HTLC state in question here.
×
671
                        var found bool
×
672
                        for ampSetID, htlcSet := range invoice.AMPState {
×
UNCOV
673
                                ampSetID := ampSetID
×
UNCOV
674
                                for htlcKey := range htlcSet.InvoiceKeys {
×
UNCOV
675
                                        if htlcKey == key {
×
UNCOV
676
                                                htlcState = htlcSet.State
×
UNCOV
677
                                                setID = &ampSetID
×
UNCOV
678

×
UNCOV
679
                                                found = true
×
UNCOV
680
                                                break
×
681
                                        }
682
                                }
683
                        }
684

UNCOV
685
                        if !found {
×
UNCOV
686
                                return nil, fmt.Errorf("htlc %v not found", key)
×
UNCOV
687
                        }
×
688
                } else {
6✔
689
                        htlcState = htlc.State
6✔
690
                }
6✔
691

692
                // Cancellation is only possible if the htlc wasn't already
693
                // resolved.
694
                if htlcState != HtlcStateAccepted {
6✔
UNCOV
695
                        log.Debugf("cancelSingleHtlc: htlc %v on invoice %v "+
×
UNCOV
696
                                "is already resolved", key, invoiceRef)
×
UNCOV
697

×
UNCOV
698
                        return nil, nil
×
UNCOV
699
                }
×
700

701
                log.Debugf("cancelSingleHtlc: cancelling htlc %v on invoice %v",
6✔
702
                        key, invoiceRef)
6✔
703

6✔
704
                // Return an update descriptor that cancels htlc and keeps
6✔
705
                // invoice open.
6✔
706
                canceledHtlcs := map[CircuitKey]struct{}{
6✔
707
                        key: {},
6✔
708
                }
6✔
709

6✔
710
                return &InvoiceUpdateDesc{
6✔
711
                        UpdateType:  CancelHTLCsUpdate,
6✔
712
                        CancelHtlcs: canceledHtlcs,
6✔
713
                        SetID:       setID,
6✔
714
                }, nil
6✔
715
        }
716

717
        // Try to mark the specified htlc as canceled in the invoice database.
718
        // Intercept the update descriptor to set the local updated variable. If
719
        // no invoice update is performed, we can return early.
720
        setID := (*SetID)(invoiceRef.SetID())
12✔
721
        var updated bool
12✔
722
        invoice, err := i.idb.UpdateInvoice(
12✔
723
                context.Background(), invoiceRef, setID,
12✔
724
                func(invoice *Invoice) (
12✔
725
                        *InvoiceUpdateDesc, error) {
24✔
726

12✔
727
                        updateDesc, err := updateInvoice(invoice)
12✔
728
                        if err != nil {
12✔
UNCOV
729
                                return nil, err
×
UNCOV
730
                        }
×
731
                        updated = updateDesc != nil
12✔
732

12✔
733
                        return updateDesc, err
12✔
734
                },
735
        )
736
        if err != nil {
12✔
UNCOV
737
                return err
×
UNCOV
738
        }
×
739
        if !updated {
18✔
740
                return nil
6✔
741
        }
6✔
742

743
        // The invoice has been updated. Notify subscribers of the htlc
744
        // resolution.
745
        htlc, ok := invoice.Htlcs[key]
6✔
746
        if !ok {
6✔
747
                return fmt.Errorf("htlc %v not found", key)
×
748
        }
×
749
        if htlc.State == HtlcStateCanceled {
12✔
750
                resolution := NewFailResolution(
6✔
751
                        key, int32(htlc.AcceptHeight), result,
6✔
752
                )
6✔
753

6✔
754
                i.notifyHodlSubscribers(resolution)
6✔
755
        }
6✔
756
        return nil
6✔
757
}
758

759
// processKeySend just-in-time inserts an invoice if this htlc is a keysend
760
// htlc.
761
func (i *InvoiceRegistry) processKeySend(ctx invoiceUpdateCtx) error {
22✔
762
        // Retrieve keysend record if present.
22✔
763
        preimageSlice, ok := ctx.customRecords[record.KeySendType]
22✔
764
        if !ok {
26✔
765
                return nil
4✔
766
        }
4✔
767

768
        // Cancel htlc is preimage is invalid.
769
        preimage, err := lntypes.MakePreimage(preimageSlice)
22✔
770
        if err != nil {
25✔
771
                return err
3✔
772
        }
3✔
773
        if preimage.Hash() != ctx.hash {
19✔
UNCOV
774
                return fmt.Errorf("invalid keysend preimage %v for hash %v",
×
UNCOV
775
                        preimage, ctx.hash)
×
UNCOV
776
        }
×
777

778
        // Only allow keysend for non-mpp payments.
779
        if ctx.mpp != nil {
19✔
UNCOV
780
                return errors.New("no mpp keysend supported")
×
UNCOV
781
        }
×
782

783
        // Create an invoice for the htlc amount.
784
        amt := ctx.amtPaid
19✔
785

19✔
786
        // Set tlv required feature vector on the invoice. Otherwise we wouldn't
19✔
787
        // be able to pay to it with keysend.
19✔
788
        rawFeatures := lnwire.NewRawFeatureVector(
19✔
789
                lnwire.TLVOnionPayloadRequired,
19✔
790
        )
19✔
791
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
19✔
792

19✔
793
        // Use the minimum block delta that we require for settling htlcs.
19✔
794
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
19✔
795

19✔
796
        // Pre-check expiry here to prevent inserting an invoice that will not
19✔
797
        // be settled.
19✔
798
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
19✔
UNCOV
799
                return errors.New("final expiry too soon")
×
UNCOV
800
        }
×
801

802
        // The invoice database indexes all invoices by payment address, however
803
        // legacy keysend payment do not have one. In order to avoid a new
804
        // payment type on-disk wrt. to indexing, we'll continue to insert a
805
        // blank payment address which is special cased in the insertion logic
806
        // to not be indexed. In the future, once AMP is merged, this should be
807
        // replaced by generating a random payment address on the behalf of the
808
        // sender.
809
        payAddr := BlankPayAddr
19✔
810

19✔
811
        // Create placeholder invoice.
19✔
812
        invoice := &Invoice{
19✔
813
                CreationDate: i.cfg.Clock.Now(),
19✔
814
                Terms: ContractTerm{
19✔
815
                        FinalCltvDelta:  finalCltvDelta,
19✔
816
                        Value:           amt,
19✔
817
                        PaymentPreimage: &preimage,
19✔
818
                        PaymentAddr:     payAddr,
19✔
819
                        Features:        features,
19✔
820
                },
19✔
821
        }
19✔
822

19✔
823
        if i.cfg.KeysendHoldTime != 0 {
25✔
824
                invoice.HodlInvoice = true
6✔
825
                invoice.Terms.Expiry = i.cfg.KeysendHoldTime
6✔
826
        }
6✔
827

828
        // Insert invoice into database. Ignore duplicates, because this
829
        // may be a replay.
830
        _, err = i.AddInvoice(context.Background(), invoice, ctx.hash)
19✔
831
        if err != nil && !errors.Is(err, ErrDuplicateInvoice) {
19✔
UNCOV
832
                return err
×
UNCOV
833
        }
×
834

835
        return nil
19✔
836
}
837

838
// processAMP just-in-time inserts an invoice if this htlc is a keysend
839
// htlc.
840
func (i *InvoiceRegistry) processAMP(ctx invoiceUpdateCtx) error {
31✔
841
        // AMP payments MUST also include an MPP record.
31✔
842
        if ctx.mpp == nil {
34✔
843
                return errors.New("no MPP record for AMP")
3✔
844
        }
3✔
845

846
        // Create an invoice for the total amount expected, provided in the MPP
847
        // record.
848
        amt := ctx.mpp.TotalMsat()
28✔
849

28✔
850
        // Set the TLV required and MPP optional features on the invoice. We'll
28✔
851
        // also make the AMP features required so that it can't be paid by
28✔
852
        // legacy or MPP htlcs.
28✔
853
        rawFeatures := lnwire.NewRawFeatureVector(
28✔
854
                lnwire.TLVOnionPayloadRequired,
28✔
855
                lnwire.PaymentAddrOptional,
28✔
856
                lnwire.AMPRequired,
28✔
857
        )
28✔
858
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
28✔
859

28✔
860
        // Use the minimum block delta that we require for settling htlcs.
28✔
861
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
28✔
862

28✔
863
        // Pre-check expiry here to prevent inserting an invoice that will not
28✔
864
        // be settled.
28✔
865
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
28✔
UNCOV
866
                return errors.New("final expiry too soon")
×
UNCOV
867
        }
×
868

869
        // We'll use the sender-generated payment address provided in the HTLC
870
        // to create our AMP invoice.
871
        payAddr := ctx.mpp.PaymentAddr()
28✔
872

28✔
873
        // Create placeholder invoice.
28✔
874
        invoice := &Invoice{
28✔
875
                CreationDate: i.cfg.Clock.Now(),
28✔
876
                Terms: ContractTerm{
28✔
877
                        FinalCltvDelta:  finalCltvDelta,
28✔
878
                        Value:           amt,
28✔
879
                        PaymentPreimage: nil,
28✔
880
                        PaymentAddr:     payAddr,
28✔
881
                        Features:        features,
28✔
882
                },
28✔
883
        }
28✔
884

28✔
885
        // Insert invoice into database. Ignore duplicates payment hashes and
28✔
886
        // payment addrs, this may be a replay or a different HTLC for the AMP
28✔
887
        // invoice.
28✔
888
        _, err := i.AddInvoice(context.Background(), invoice, ctx.hash)
28✔
889
        isDuplicatedInvoice := errors.Is(err, ErrDuplicateInvoice)
28✔
890
        isDuplicatedPayAddr := errors.Is(err, ErrDuplicatePayAddr)
28✔
891
        switch {
28✔
892
        case isDuplicatedInvoice || isDuplicatedPayAddr:
16✔
893
                return nil
16✔
894
        default:
16✔
895
                return err
16✔
896
        }
897
}
898

899
// NotifyExitHopHtlc attempts to mark an invoice as settled. The return value
900
// describes how the htlc should be resolved.
901
//
902
// When the preimage of the invoice is not yet known (hodl invoice), this
903
// function moves the invoice to the accepted state. When SettleHoldInvoice is
904
// called later, a resolution message will be send back to the caller via the
905
// provided hodlChan. Invoice registry sends on this channel what action needs
906
// to be taken on the htlc (settle or cancel). The caller needs to ensure that
907
// the channel is either buffered or received on from another goroutine to
908
// prevent deadlock.
909
//
910
// In the case that the htlc is part of a larger set of htlcs that pay to the
911
// same invoice (multi-path payment), the htlc is held until the set is
912
// complete. If the set doesn't fully arrive in time, a timer will cancel the
913
// held htlc.
914
func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
915
        amtPaid lnwire.MilliSatoshi, expiry uint32, currentHeight int32,
916
        circuitKey CircuitKey, hodlChan chan<- interface{},
917
        payload Payload) (HtlcResolution, error) {
1,383✔
918

1,383✔
919
        // Create the update context containing the relevant details of the
1,383✔
920
        // incoming htlc.
1,383✔
921
        ctx := invoiceUpdateCtx{
1,383✔
922
                hash:                 rHash,
1,383✔
923
                circuitKey:           circuitKey,
1,383✔
924
                amtPaid:              amtPaid,
1,383✔
925
                expiry:               expiry,
1,383✔
926
                currentHeight:        currentHeight,
1,383✔
927
                finalCltvRejectDelta: i.cfg.FinalCltvRejectDelta,
1,383✔
928
                customRecords:        payload.CustomRecords(),
1,383✔
929
                mpp:                  payload.MultiPath(),
1,383✔
930
                amp:                  payload.AMPRecord(),
1,383✔
931
                metadata:             payload.Metadata(),
1,383✔
932
                pathID:               payload.PathID(),
1,383✔
933
                totalAmtMsat:         payload.TotalAmtMsat(),
1,383✔
934
        }
1,383✔
935

1,383✔
936
        switch {
1,383✔
937
        // If we are accepting spontaneous AMP payments and this payload
938
        // contains an AMP record, create an AMP invoice that will be settled
939
        // below.
940
        case i.cfg.AcceptAMP && ctx.amp != nil:
31✔
941
                err := i.processAMP(ctx)
31✔
942
                if err != nil {
34✔
943
                        ctx.log(fmt.Sprintf("amp error: %v", err))
3✔
944

3✔
945
                        return NewFailResolution(
3✔
946
                                circuitKey, currentHeight, ResultAmpError,
3✔
947
                        ), nil
3✔
948
                }
3✔
949

950
        // If we are accepting spontaneous keysend payments, create a regular
951
        // invoice that will be settled below. We also enforce that this is only
952
        // done when no AMP payload is present since it will only be settle-able
953
        // by regular HTLCs.
954
        case i.cfg.AcceptKeySend && ctx.amp == nil:
22✔
955
                err := i.processKeySend(ctx)
22✔
956
                if err != nil {
25✔
957
                        ctx.log(fmt.Sprintf("keysend error: %v", err))
3✔
958

3✔
959
                        return NewFailResolution(
3✔
960
                                circuitKey, currentHeight, ResultKeySendError,
3✔
961
                        ), nil
3✔
962
                }
3✔
963
        }
964

965
        // Execute locked notify exit hop logic.
966
        i.Lock()
1,377✔
967
        resolution, invoiceToExpire, err := i.notifyExitHopHtlcLocked(
1,377✔
968
                &ctx, hodlChan,
1,377✔
969
        )
1,377✔
970
        i.Unlock()
1,377✔
971
        if err != nil {
1,377✔
UNCOV
972
                return nil, err
×
UNCOV
973
        }
×
974

975
        if invoiceToExpire != nil {
1,896✔
976
                i.expiryWatcher.AddInvoices(invoiceToExpire)
519✔
977
        }
519✔
978

979
        switch r := resolution.(type) {
1,377✔
980
        // The htlc is held. Start a timer outside the lock if the htlc should
981
        // be auto-released, because otherwise a deadlock may happen with the
982
        // main event loop.
983
        case *htlcAcceptResolution:
856✔
984
                if r.autoRelease {
1,190✔
985
                        var invRef InvoiceRef
334✔
986
                        if ctx.amp != nil {
350✔
987
                                invRef = InvoiceRefBySetID(*ctx.setID())
16✔
988
                        } else {
338✔
989
                                invRef = ctx.invoiceRef()
322✔
990
                        }
322✔
991

992
                        err := i.startHtlcTimer(
334✔
993
                                invRef, circuitKey, r.acceptTime,
334✔
994
                        )
334✔
995
                        if err != nil {
334✔
UNCOV
996
                                return nil, err
×
UNCOV
997
                        }
×
998
                }
999

1000
                // We return a nil resolution because htlc acceptances are
1001
                // represented as nil resolutions externally.
1002
                // TODO(carla) update calling code to handle accept resolutions.
1003
                return nil, nil
856✔
1004

1005
        // A direct resolution was received for this htlc.
1006
        case HtlcResolution:
525✔
1007
                return r, nil
525✔
1008

1009
        // Fail if an unknown resolution type was received.
UNCOV
1010
        default:
×
UNCOV
1011
                return nil, errors.New("invalid resolution type")
×
1012
        }
1013
}
1014

1015
// notifyExitHopHtlcLocked is the internal implementation of NotifyExitHopHtlc
1016
// that should be executed inside the registry lock. The returned invoiceExpiry
1017
// (if not nil) needs to be added to the expiry watcher outside of the lock.
1018
func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
1019
        ctx *invoiceUpdateCtx, hodlChan chan<- interface{}) (
1020
        HtlcResolution, invoiceExpiry, error) {
1,377✔
1021

1,377✔
1022
        // We'll attempt to settle an invoice matching this rHash on disk (if
1,377✔
1023
        // one exists). The callback will update the invoice state and/or htlcs.
1,377✔
1024
        var (
1,377✔
1025
                resolution        HtlcResolution
1,377✔
1026
                updateSubscribers bool
1,377✔
1027
        )
1,377✔
1028

1,377✔
1029
        callback := func(inv *Invoice) (*InvoiceUpdateDesc, error) {
2,732✔
1030
                updateDesc, res, err := updateInvoice(ctx, inv)
1,355✔
1031
                if err != nil {
1,355✔
UNCOV
1032
                        return nil, err
×
UNCOV
1033
                }
×
1034

1035
                // Only send an update if the invoice state was changed.
1036
                updateSubscribers = updateDesc != nil &&
1,355✔
1037
                        updateDesc.State != nil
1,355✔
1038

1,355✔
1039
                // Assign resolution to outer scope variable.
1,355✔
1040
                resolution = res
1,355✔
1041

1,355✔
1042
                return updateDesc, nil
1,355✔
1043
        }
1044

1045
        invoiceRef := ctx.invoiceRef()
1,377✔
1046
        setID := (*SetID)(ctx.setID())
1,377✔
1047
        invoice, err := i.idb.UpdateInvoice(
1,377✔
1048
                context.Background(), invoiceRef, setID, callback,
1,377✔
1049
        )
1,377✔
1050

1,377✔
1051
        var duplicateSetIDErr ErrDuplicateSetID
1,377✔
1052
        if errors.As(err, &duplicateSetIDErr) {
1,377✔
UNCOV
1053
                return NewFailResolution(
×
UNCOV
1054
                        ctx.circuitKey, ctx.currentHeight,
×
UNCOV
1055
                        ResultInvoiceNotFound,
×
UNCOV
1056
                ), nil, nil
×
UNCOV
1057
        }
×
1058

1059
        switch err {
1,377✔
1060
        case ErrInvoiceNotFound:
26✔
1061
                // If the invoice was not found, return a failure resolution
26✔
1062
                // with an invoice not found result.
26✔
1063
                return NewFailResolution(
26✔
1064
                        ctx.circuitKey, ctx.currentHeight,
26✔
1065
                        ResultInvoiceNotFound,
26✔
1066
                ), nil, nil
26✔
1067

UNCOV
1068
        case ErrInvRefEquivocation:
×
UNCOV
1069
                return NewFailResolution(
×
UNCOV
1070
                        ctx.circuitKey, ctx.currentHeight,
×
UNCOV
1071
                        ResultInvoiceNotFound,
×
UNCOV
1072
                ), nil, nil
×
1073

1074
        case nil:
1,355✔
1075

UNCOV
1076
        default:
×
UNCOV
1077
                ctx.log(err.Error())
×
UNCOV
1078
                return nil, nil, err
×
1079
        }
1080

1081
        var invoiceToExpire invoiceExpiry
1,355✔
1082

1,355✔
1083
        switch res := resolution.(type) {
1,355✔
1084
        case *HtlcFailResolution:
30✔
1085
                // Inspect latest htlc state on the invoice. If it is found,
30✔
1086
                // we will update the accept height as it was recorded in the
30✔
1087
                // invoice database (which occurs in the case where the htlc
30✔
1088
                // reached the database in a previous call). If the htlc was
30✔
1089
                // not found on the invoice, it was immediately failed so we
30✔
1090
                // send the failure resolution as is, which has the current
30✔
1091
                // height set as the accept height.
30✔
1092
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
30✔
1093
                if ok {
37✔
1094
                        res.AcceptHeight = int32(invoiceHtlc.AcceptHeight)
7✔
1095
                }
7✔
1096

1097
                ctx.log(fmt.Sprintf("failure resolution result "+
30✔
1098
                        "outcome: %v, at accept height: %v",
30✔
1099
                        res.Outcome, res.AcceptHeight))
30✔
1100

30✔
1101
                // Some failures apply to the entire HTLC set. Break here if
30✔
1102
                // this isn't one of them.
30✔
1103
                if !res.Outcome.IsSetFailure() {
54✔
1104
                        break
24✔
1105
                }
1106

1107
                // Also cancel any HTLCs in the HTLC set that are also in the
1108
                // canceled state with the same failure result.
1109
                setID := ctx.setID()
6✔
1110
                canceledHtlcSet := invoice.HTLCSet(setID, HtlcStateCanceled)
6✔
1111
                for key, htlc := range canceledHtlcSet {
12✔
1112
                        htlcFailResolution := NewFailResolution(
6✔
1113
                                key, int32(htlc.AcceptHeight), res.Outcome,
6✔
1114
                        )
6✔
1115

6✔
1116
                        i.notifyHodlSubscribers(htlcFailResolution)
6✔
1117
                }
6✔
1118

1119
        // If the htlc was settled, we will settle any previously accepted
1120
        // htlcs and notify our peer to settle them.
1121
        case *HtlcSettleResolution:
477✔
1122
                ctx.log(fmt.Sprintf("settle resolution result "+
477✔
1123
                        "outcome: %v, at accept height: %v",
477✔
1124
                        res.Outcome, res.AcceptHeight))
477✔
1125

477✔
1126
                // Also settle any previously accepted htlcs. If a htlc is
477✔
1127
                // marked as settled, we should follow now and settle the htlc
477✔
1128
                // with our peer.
477✔
1129
                setID := ctx.setID()
477✔
1130
                settledHtlcSet := invoice.HTLCSet(setID, HtlcStateSettled)
477✔
1131
                for key, htlc := range settledHtlcSet {
1,266✔
1132
                        preimage := res.Preimage
789✔
1133
                        if htlc.AMP != nil && htlc.AMP.Preimage != nil {
805✔
1134
                                preimage = *htlc.AMP.Preimage
16✔
1135
                        }
16✔
1136

1137
                        // Notify subscribers that the htlcs should be settled
1138
                        // with our peer. Note that the outcome of the
1139
                        // resolution is set based on the outcome of the single
1140
                        // htlc that we just settled, so may not be accurate
1141
                        // for all htlcs.
1142
                        htlcSettleResolution := NewSettleResolution(
789✔
1143
                                preimage, key,
789✔
1144
                                int32(htlc.AcceptHeight), res.Outcome,
789✔
1145
                        )
789✔
1146

789✔
1147
                        // Notify subscribers that the htlc should be settled
789✔
1148
                        // with our peer.
789✔
1149
                        i.notifyHodlSubscribers(htlcSettleResolution)
789✔
1150
                }
1151

1152
                // If concurrent payments were attempted to this invoice before
1153
                // the current one was ultimately settled, cancel back any of
1154
                // the HTLCs immediately. As a result of the settle, the HTLCs
1155
                // in other HTLC sets are automatically converted to a canceled
1156
                // state when updating the invoice.
1157
                //
1158
                // TODO(roasbeef): can remove now??
1159
                canceledHtlcSet := invoice.HTLCSetCompliment(
477✔
1160
                        setID, HtlcStateCanceled,
477✔
1161
                )
477✔
1162
                for key, htlc := range canceledHtlcSet {
477✔
UNCOV
1163
                        htlcFailResolution := NewFailResolution(
×
UNCOV
1164
                                key, int32(htlc.AcceptHeight),
×
UNCOV
1165
                                ResultInvoiceAlreadySettled,
×
UNCOV
1166
                        )
×
UNCOV
1167

×
UNCOV
1168
                        i.notifyHodlSubscribers(htlcFailResolution)
×
UNCOV
1169
                }
×
1170

1171
        // If we accepted the htlc, subscribe to the hodl invoice and return
1172
        // an accept resolution with the htlc's accept time on it.
1173
        case *htlcAcceptResolution:
856✔
1174
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
856✔
1175
                if !ok {
856✔
UNCOV
1176
                        return nil, nil, fmt.Errorf("accepted htlc: %v not"+
×
UNCOV
1177
                                " present on invoice: %x", ctx.circuitKey,
×
UNCOV
1178
                                ctx.hash[:])
×
UNCOV
1179
                }
×
1180

1181
                // Determine accepted height of this htlc. If the htlc reached
1182
                // the invoice database (possibly in a previous call to the
1183
                // invoice registry), we'll take the original accepted height
1184
                // as it was recorded in the database.
1185
                acceptHeight := int32(invoiceHtlc.AcceptHeight)
856✔
1186

856✔
1187
                ctx.log(fmt.Sprintf("accept resolution result "+
856✔
1188
                        "outcome: %v, at accept height: %v",
856✔
1189
                        res.outcome, acceptHeight))
856✔
1190

856✔
1191
                // Auto-release the htlc if the invoice is still open. It can
856✔
1192
                // only happen for mpp payments that there are htlcs in state
856✔
1193
                // Accepted while the invoice is Open.
856✔
1194
                if invoice.State == ContractOpen {
1,190✔
1195
                        res.acceptTime = invoiceHtlc.AcceptTime
334✔
1196
                        res.autoRelease = true
334✔
1197
                }
334✔
1198

1199
                // If we have fully accepted the set of htlcs for this invoice,
1200
                // we can now add it to our invoice expiry watcher. We do not
1201
                // add invoices before they are fully accepted, because it is
1202
                // possible that we MppTimeout the htlcs, and then our relevant
1203
                // expiry height could change.
1204
                if res.outcome == resultAccepted {
1,375✔
1205
                        invoiceToExpire = makeInvoiceExpiry(ctx.hash, invoice)
519✔
1206
                }
519✔
1207

1208
                i.hodlSubscribe(hodlChan, ctx.circuitKey)
856✔
1209

UNCOV
1210
        default:
×
UNCOV
1211
                panic("unknown action")
×
1212
        }
1213

1214
        // Now that the links have been notified of any state changes to their
1215
        // HTLCs, we'll go ahead and notify any clients wiaiting on the invoice
1216
        // state changes.
1217
        if updateSubscribers {
2,344✔
1218
                // We'll add a setID onto the notification, but only if this is
989✔
1219
                // an AMP invoice being settled.
989✔
1220
                var setID *[32]byte
989✔
1221
                if _, ok := resolution.(*HtlcSettleResolution); ok {
1,457✔
1222
                        setID = ctx.setID()
468✔
1223
                }
468✔
1224

1225
                i.notifyClients(ctx.hash, invoice, setID)
989✔
1226
        }
1227

1228
        return resolution, invoiceToExpire, nil
1,355✔
1229
}
1230

1231
// SettleHodlInvoice sets the preimage of a hodl invoice.
1232
func (i *InvoiceRegistry) SettleHodlInvoice(ctx context.Context,
1233
        preimage lntypes.Preimage) error {
506✔
1234

506✔
1235
        i.Lock()
506✔
1236
        defer i.Unlock()
506✔
1237

506✔
1238
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
1,012✔
1239
                switch invoice.State {
506✔
UNCOV
1240
                case ContractOpen:
×
UNCOV
1241
                        return nil, ErrInvoiceStillOpen
×
1242

UNCOV
1243
                case ContractCanceled:
×
UNCOV
1244
                        return nil, ErrInvoiceAlreadyCanceled
×
1245

1246
                case ContractSettled:
3✔
1247
                        return nil, ErrInvoiceAlreadySettled
3✔
1248
                }
1249

1250
                return &InvoiceUpdateDesc{
503✔
1251
                        UpdateType: SettleHodlInvoiceUpdate,
503✔
1252
                        State: &InvoiceStateUpdateDesc{
503✔
1253
                                NewState: ContractSettled,
503✔
1254
                                Preimage: &preimage,
503✔
1255
                        },
503✔
1256
                }, nil
503✔
1257
        }
1258

1259
        hash := preimage.Hash()
506✔
1260
        invoiceRef := InvoiceRefByHash(hash)
506✔
1261
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
506✔
1262
        if err != nil {
509✔
1263
                log.Errorf("SettleHodlInvoice with preimage %v: %v",
3✔
1264
                        preimage, err)
3✔
1265

3✔
1266
                return err
3✔
1267
        }
3✔
1268

1269
        log.Debugf("Invoice%v: settled with preimage %v", invoiceRef,
503✔
1270
                invoice.Terms.PaymentPreimage)
503✔
1271

503✔
1272
        // In the callback, we marked the invoice as settled. UpdateInvoice will
503✔
1273
        // have seen this and should have moved all htlcs that were accepted to
503✔
1274
        // the settled state. In the loop below, we go through all of these and
503✔
1275
        // notify links and resolvers that are waiting for resolution. Any htlcs
503✔
1276
        // that were already settled before, will be notified again. This isn't
503✔
1277
        // necessary but doesn't hurt either.
503✔
1278
        for key, htlc := range invoice.Htlcs {
1,009✔
1279
                if htlc.State != HtlcStateSettled {
506✔
UNCOV
1280
                        continue
×
1281
                }
1282

1283
                resolution := NewSettleResolution(
506✔
1284
                        preimage, key, int32(htlc.AcceptHeight), ResultSettled,
506✔
1285
                )
506✔
1286

506✔
1287
                i.notifyHodlSubscribers(resolution)
506✔
1288
        }
1289
        i.notifyClients(hash, invoice, nil)
503✔
1290

503✔
1291
        return nil
503✔
1292
}
1293

1294
// CancelInvoice attempts to cancel the invoice corresponding to the passed
1295
// payment hash.
1296
func (i *InvoiceRegistry) CancelInvoice(ctx context.Context,
1297
        payHash lntypes.Hash) error {
33✔
1298

33✔
1299
        return i.cancelInvoiceImpl(ctx, payHash, true)
33✔
1300
}
33✔
1301

1302
// shouldCancel examines the state of an invoice and whether we want to
1303
// cancel already accepted invoices, taking our force cancel boolean into
1304
// account. This is pulled out into its own function so that tests that mock
1305
// cancelInvoiceImpl can reuse this logic.
1306
func shouldCancel(state ContractState, cancelAccepted bool) bool {
97✔
1307
        if state != ContractAccepted {
166✔
1308
                return true
69✔
1309
        }
69✔
1310

1311
        // If the invoice is accepted, we should only cancel if we want to
1312
        // force cancellation of accepted invoices.
1313
        return cancelAccepted
32✔
1314
}
1315

1316
// cancelInvoice attempts to cancel the invoice corresponding to the passed
1317
// payment hash. Accepted invoices will only be canceled if explicitly
1318
// requested to do so. It notifies subscribing links and resolvers that
1319
// the associated htlcs were canceled if they change state.
1320
func (i *InvoiceRegistry) cancelInvoiceImpl(ctx context.Context,
1321
        payHash lntypes.Hash, cancelAccepted bool) error {
106✔
1322

106✔
1323
        i.Lock()
106✔
1324
        defer i.Unlock()
106✔
1325

106✔
1326
        ref := InvoiceRefByHash(payHash)
106✔
1327
        log.Debugf("Invoice%v: canceling invoice", ref)
106✔
1328

106✔
1329
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
203✔
1330
                if !shouldCancel(invoice.State, cancelAccepted) {
109✔
1331
                        return nil, nil
12✔
1332
                }
12✔
1333

1334
                // Move invoice to the canceled state. Rely on validation in
1335
                // channeldb to return an error if the invoice is already
1336
                // settled or canceled.
1337
                return &InvoiceUpdateDesc{
85✔
1338
                        UpdateType: CancelInvoiceUpdate,
85✔
1339
                        State: &InvoiceStateUpdateDesc{
85✔
1340
                                NewState: ContractCanceled,
85✔
1341
                        },
85✔
1342
                }, nil
85✔
1343
        }
1344

1345
        invoiceRef := InvoiceRefByHash(payHash)
106✔
1346
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
106✔
1347

106✔
1348
        // Implement idempotency by returning success if the invoice was already
106✔
1349
        // canceled.
106✔
1350
        if errors.Is(err, ErrInvoiceAlreadyCanceled) {
109✔
1351
                log.Debugf("Invoice%v: already canceled", ref)
3✔
1352
                return nil
3✔
1353
        }
3✔
1354
        if err != nil {
126✔
1355
                return err
23✔
1356
        }
23✔
1357

1358
        // Return without cancellation if the invoice state is ContractAccepted.
1359
        if invoice.State == ContractAccepted {
96✔
1360
                log.Debugf("Invoice%v: remains accepted as cancel wasn't"+
12✔
1361
                        "explicitly requested.", ref)
12✔
1362
                return nil
12✔
1363
        }
12✔
1364

1365
        log.Debugf("Invoice%v: canceled", ref)
72✔
1366

72✔
1367
        // In the callback, some htlcs may have been moved to the canceled
72✔
1368
        // state. We now go through all of these and notify links and resolvers
72✔
1369
        // that are waiting for resolution. Any htlcs that were already canceled
72✔
1370
        // before, will be notified again. This isn't necessary but doesn't hurt
72✔
1371
        // either.
72✔
1372
        for key, htlc := range invoice.Htlcs {
101✔
1373
                if htlc.State != HtlcStateCanceled {
29✔
UNCOV
1374
                        continue
×
1375
                }
1376

1377
                i.notifyHodlSubscribers(
29✔
1378
                        NewFailResolution(
29✔
1379
                                key, int32(htlc.AcceptHeight), ResultCanceled,
29✔
1380
                        ),
29✔
1381
                )
29✔
1382
        }
1383
        i.notifyClients(payHash, invoice, nil)
72✔
1384

72✔
1385
        // Attempt to also delete the invoice if requested through the registry
72✔
1386
        // config.
72✔
1387
        if i.cfg.GcCanceledInvoicesOnTheFly {
75✔
1388
                // Assemble the delete reference and attempt to delete through
3✔
1389
                // the invocice from the DB.
3✔
1390
                deleteRef := InvoiceDeleteRef{
3✔
1391
                        PayHash:     payHash,
3✔
1392
                        AddIndex:    invoice.AddIndex,
3✔
1393
                        SettleIndex: invoice.SettleIndex,
3✔
1394
                }
3✔
1395
                if invoice.Terms.PaymentAddr != BlankPayAddr {
3✔
UNCOV
1396
                        deleteRef.PayAddr = &invoice.Terms.PaymentAddr
×
UNCOV
1397
                }
×
1398

1399
                err = i.idb.DeleteInvoice(ctx, []InvoiceDeleteRef{deleteRef})
3✔
1400
                // If by any chance deletion failed, then log it instead of
3✔
1401
                // returning the error, as the invoice itself has already been
3✔
1402
                // canceled.
3✔
1403
                if err != nil {
3✔
UNCOV
1404
                        log.Warnf("Invoice %v could not be deleted: %v", ref,
×
UNCOV
1405
                                err)
×
UNCOV
1406
                }
×
1407
        }
1408

1409
        return nil
72✔
1410
}
1411

1412
// notifyClients notifies all currently registered invoice notification clients
1413
// of a newly added/settled invoice.
1414
func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
1415
        invoice *Invoice, setID *[32]byte) {
2,693✔
1416

2,693✔
1417
        event := &invoiceEvent{
2,693✔
1418
                invoice: invoice,
2,693✔
1419
                hash:    hash,
2,693✔
1420
                setID:   setID,
2,693✔
1421
        }
2,693✔
1422

2,693✔
1423
        select {
2,693✔
1424
        case i.invoiceEvents <- event:
2,693✔
UNCOV
1425
        case <-i.quit:
×
1426
        }
1427
}
1428

1429
// invoiceSubscriptionKit defines that are common to both all invoice
1430
// subscribers and single invoice subscribers.
1431
type invoiceSubscriptionKit struct {
1432
        id uint32 // nolint:structcheck
1433

1434
        // quit is a chan mouted to InvoiceRegistry that signals a shutdown.
1435
        quit chan struct{}
1436

1437
        ntfnQueue *queue.ConcurrentQueue
1438

1439
        canceled   uint32 // To be used atomically.
1440
        cancelChan chan struct{}
1441

1442
        // backlogDelivered is closed when the backlog events have been
1443
        // delivered.
1444
        backlogDelivered chan struct{}
1445
}
1446

1447
// InvoiceSubscription represents an intent to receive updates for newly added
1448
// or settled invoices. For each newly added invoice, a copy of the invoice
1449
// will be sent over the NewInvoices channel. Similarly, for each newly settled
1450
// invoice, a copy of the invoice will be sent over the SettledInvoices
1451
// channel.
1452
type InvoiceSubscription struct {
1453
        invoiceSubscriptionKit
1454

1455
        // NewInvoices is a channel that we'll use to send all newly created
1456
        // invoices with an invoice index greater than the specified
1457
        // StartingInvoiceIndex field.
1458
        NewInvoices chan *Invoice
1459

1460
        // SettledInvoices is a channel that we'll use to send all settled
1461
        // invoices with an invoices index greater than the specified
1462
        // StartingInvoiceIndex field.
1463
        SettledInvoices chan *Invoice
1464

1465
        // addIndex is the highest add index the caller knows of. We'll use
1466
        // this information to send out an event backlog to the notifications
1467
        // subscriber. Any new add events with an index greater than this will
1468
        // be dispatched before any new notifications are sent out.
1469
        addIndex uint64
1470

1471
        // settleIndex is the highest settle index the caller knows of. We'll
1472
        // use this information to send out an event backlog to the
1473
        // notifications subscriber. Any new settle events with an index
1474
        // greater than this will be dispatched before any new notifications
1475
        // are sent out.
1476
        settleIndex uint64
1477
}
1478

1479
// SingleInvoiceSubscription represents an intent to receive updates for a
1480
// specific invoice.
1481
type SingleInvoiceSubscription struct {
1482
        invoiceSubscriptionKit
1483

1484
        invoiceRef InvoiceRef
1485

1486
        // Updates is a channel that we'll use to send all invoice events for
1487
        // the invoice that is subscribed to.
1488
        Updates chan *Invoice
1489
}
1490

1491
// PayHash returns the optional payment hash of the target invoice.
1492
//
1493
// TODO(positiveblue): This method is only supposed to be used in tests. It will
1494
// be deleted as soon as invoiceregistery_test is in the same module.
1495
func (s *SingleInvoiceSubscription) PayHash() *lntypes.Hash {
18✔
1496
        return s.invoiceRef.PayHash()
18✔
1497
}
18✔
1498

1499
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
1500
// resources.
1501
func (i *invoiceSubscriptionKit) Cancel() {
67✔
1502
        if !atomic.CompareAndSwapUint32(&i.canceled, 0, 1) {
67✔
UNCOV
1503
                return
×
UNCOV
1504
        }
×
1505

1506
        i.ntfnQueue.Stop()
67✔
1507
        close(i.cancelChan)
67✔
1508
}
1509

1510
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
106✔
1511
        select {
106✔
1512
        case i.ntfnQueue.ChanIn() <- event:
106✔
1513

UNCOV
1514
        case <-i.cancelChan:
×
UNCOV
1515
                // This can only be triggered by delivery of non-backlog
×
UNCOV
1516
                // events.
×
UNCOV
1517
                return ErrShuttingDown
×
UNCOV
1518
        case <-i.quit:
×
UNCOV
1519
                return ErrShuttingDown
×
1520
        }
1521

1522
        return nil
106✔
1523
}
1524

1525
// SubscribeNotifications returns an InvoiceSubscription which allows the
1526
// caller to receive async notifications when any invoices are settled or
1527
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
1528
// by first sending out all new events with an invoice index _greater_ than
1529
// this value. Afterwards, we'll send out real-time notifications.
1530
func (i *InvoiceRegistry) SubscribeNotifications(ctx context.Context,
1531
        addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
49✔
1532

49✔
1533
        client := &InvoiceSubscription{
49✔
1534
                NewInvoices:     make(chan *Invoice),
49✔
1535
                SettledInvoices: make(chan *Invoice),
49✔
1536
                addIndex:        addIndex,
49✔
1537
                settleIndex:     settleIndex,
49✔
1538
                invoiceSubscriptionKit: invoiceSubscriptionKit{
49✔
1539
                        quit:             i.quit,
49✔
1540
                        ntfnQueue:        queue.NewConcurrentQueue(20),
49✔
1541
                        cancelChan:       make(chan struct{}),
49✔
1542
                        backlogDelivered: make(chan struct{}),
49✔
1543
                },
49✔
1544
        }
49✔
1545
        client.ntfnQueue.Start()
49✔
1546

49✔
1547
        // This notifies other goroutines that the backlog phase is over.
49✔
1548
        defer close(client.backlogDelivered)
49✔
1549

49✔
1550
        // Always increment by 1 first, and our client ID will start with 1,
49✔
1551
        // not 0.
49✔
1552
        client.id = atomic.AddUint32(&i.nextClientID, 1)
49✔
1553

49✔
1554
        // Before we register this new invoice subscription, we'll launch a new
49✔
1555
        // goroutine that will proxy all notifications appended to the end of
49✔
1556
        // the concurrent queue to the two client-side channels the caller will
49✔
1557
        // feed off of.
49✔
1558
        i.wg.Add(1)
49✔
1559
        go func() {
98✔
1560
                defer i.wg.Done()
49✔
1561
                defer i.deleteClient(client.id)
49✔
1562

49✔
1563
                for {
164✔
1564
                        select {
115✔
1565
                        // A new invoice event has been sent by the
1566
                        // invoiceRegistry! We'll figure out if this is an add
1567
                        // event or a settle event, then dispatch the event to
1568
                        // the client.
1569
                        case ntfn := <-client.ntfnQueue.ChanOut():
70✔
1570
                                invoiceEvent := ntfn.(*invoiceEvent)
70✔
1571

70✔
1572
                                var targetChan chan *Invoice
70✔
1573
                                state := invoiceEvent.invoice.State
70✔
1574
                                switch {
70✔
1575
                                // AMP invoices never move to settled, but will
1576
                                // be sent with a set ID if an HTLC set is
1577
                                // being settled.
1578
                                case state == ContractOpen &&
1579
                                        invoiceEvent.setID != nil:
10✔
1580
                                        fallthrough
10✔
1581

1582
                                case state == ContractSettled:
28✔
1583
                                        targetChan = client.SettledInvoices
28✔
1584

1585
                                case state == ContractOpen:
46✔
1586
                                        targetChan = client.NewInvoices
46✔
1587

UNCOV
1588
                                default:
×
UNCOV
1589
                                        log.Errorf("unknown invoice state: %v",
×
UNCOV
1590
                                                state)
×
UNCOV
1591

×
UNCOV
1592
                                        continue
×
1593
                                }
1594

1595
                                select {
70✔
1596
                                case targetChan <- invoiceEvent.invoice:
70✔
1597

UNCOV
1598
                                case <-client.cancelChan:
×
UNCOV
1599
                                        return
×
1600

UNCOV
1601
                                case <-i.quit:
×
UNCOV
1602
                                        return
×
1603
                                }
1604

1605
                        case <-client.cancelChan:
49✔
1606
                                return
49✔
1607

UNCOV
1608
                        case <-i.quit:
×
UNCOV
1609
                                return
×
1610
                        }
1611
                }
1612
        }()
1613

1614
        i.notificationClientMux.Lock()
49✔
1615
        i.notificationClients[client.id] = client
49✔
1616
        i.notificationClientMux.Unlock()
49✔
1617

49✔
1618
        // Query the database to see if based on the provided addIndex and
49✔
1619
        // settledIndex we need to deliver any backlog notifications.
49✔
1620
        err := i.deliverBacklogEvents(ctx, client)
49✔
1621
        if err != nil {
49✔
UNCOV
1622
                return nil, err
×
UNCOV
1623
        }
×
1624

1625
        log.Infof("New invoice subscription client: id=%v", client.id)
49✔
1626

49✔
1627
        return client, nil
49✔
1628
}
1629

1630
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
1631
// caller to receive async notifications for a specific invoice.
1632
func (i *InvoiceRegistry) SubscribeSingleInvoice(ctx context.Context,
1633
        hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
22✔
1634

22✔
1635
        client := &SingleInvoiceSubscription{
22✔
1636
                Updates: make(chan *Invoice),
22✔
1637
                invoiceSubscriptionKit: invoiceSubscriptionKit{
22✔
1638
                        quit:             i.quit,
22✔
1639
                        ntfnQueue:        queue.NewConcurrentQueue(20),
22✔
1640
                        cancelChan:       make(chan struct{}),
22✔
1641
                        backlogDelivered: make(chan struct{}),
22✔
1642
                },
22✔
1643
                invoiceRef: InvoiceRefByHash(hash),
22✔
1644
        }
22✔
1645
        client.ntfnQueue.Start()
22✔
1646

22✔
1647
        // This notifies other goroutines that the backlog phase is done.
22✔
1648
        defer close(client.backlogDelivered)
22✔
1649

22✔
1650
        // Always increment by 1 first, and our client ID will start with 1,
22✔
1651
        // not 0.
22✔
1652
        client.id = atomic.AddUint32(&i.nextClientID, 1)
22✔
1653

22✔
1654
        // Before we register this new invoice subscription, we'll launch a new
22✔
1655
        // goroutine that will proxy all notifications appended to the end of
22✔
1656
        // the concurrent queue to the two client-side channels the caller will
22✔
1657
        // feed off of.
22✔
1658
        i.wg.Add(1)
22✔
1659
        go func() {
44✔
1660
                defer i.wg.Done()
22✔
1661
                defer i.deleteClient(client.id)
22✔
1662

22✔
1663
                for {
80✔
1664
                        select {
58✔
1665
                        // A new invoice event has been sent by the
1666
                        // invoiceRegistry. We will dispatch the event to the
1667
                        // client.
1668
                        case ntfn := <-client.ntfnQueue.ChanOut():
40✔
1669
                                invoiceEvent := ntfn.(*invoiceEvent)
40✔
1670

40✔
1671
                                select {
40✔
1672
                                case client.Updates <- invoiceEvent.invoice:
40✔
1673

UNCOV
1674
                                case <-client.cancelChan:
×
UNCOV
1675
                                        return
×
1676

UNCOV
1677
                                case <-i.quit:
×
UNCOV
1678
                                        return
×
1679
                                }
1680

1681
                        case <-client.cancelChan:
22✔
1682
                                return
22✔
1683

UNCOV
1684
                        case <-i.quit:
×
UNCOV
1685
                                return
×
1686
                        }
1687
                }
1688
        }()
1689

1690
        i.notificationClientMux.Lock()
22✔
1691
        i.singleNotificationClients[client.id] = client
22✔
1692
        i.notificationClientMux.Unlock()
22✔
1693

22✔
1694
        err := i.deliverSingleBacklogEvents(ctx, client)
22✔
1695
        if err != nil {
22✔
1696
                return nil, err
×
UNCOV
1697
        }
×
1698

1699
        log.Infof("New single invoice subscription client: id=%v, ref=%v",
22✔
1700
                client.id, client.invoiceRef)
22✔
1701

22✔
1702
        return client, nil
22✔
1703
}
1704

1705
// notifyHodlSubscribers sends out the htlc resolution to all current
1706
// subscribers.
1707
func (i *InvoiceRegistry) notifyHodlSubscribers(htlcResolution HtlcResolution) {
1,328✔
1708
        i.hodlSubscriptionsMux.Lock()
1,328✔
1709
        defer i.hodlSubscriptionsMux.Unlock()
1,328✔
1710

1,328✔
1711
        subscribers, ok := i.hodlSubscriptions[htlcResolution.CircuitKey()]
1,328✔
1712
        if !ok {
1,811✔
1713
                return
483✔
1714
        }
483✔
1715

1716
        // Notify all interested subscribers and remove subscription from both
1717
        // maps. The subscription can be removed as there only ever will be a
1718
        // single resolution for each hash.
1719
        for subscriber := range subscribers {
1,698✔
1720
                select {
849✔
1721
                case subscriber <- htlcResolution:
849✔
UNCOV
1722
                case <-i.quit:
×
UNCOV
1723
                        return
×
1724
                }
1725

1726
                delete(
849✔
1727
                        i.hodlReverseSubscriptions[subscriber],
849✔
1728
                        htlcResolution.CircuitKey(),
849✔
1729
                )
849✔
1730
        }
1731

1732
        delete(i.hodlSubscriptions, htlcResolution.CircuitKey())
849✔
1733
}
1734

1735
// hodlSubscribe adds a new invoice subscription.
1736
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
1737
        circuitKey CircuitKey) {
856✔
1738

856✔
1739
        i.hodlSubscriptionsMux.Lock()
856✔
1740
        defer i.hodlSubscriptionsMux.Unlock()
856✔
1741

856✔
1742
        log.Debugf("Hodl subscribe for %v", circuitKey)
856✔
1743

856✔
1744
        subscriptions, ok := i.hodlSubscriptions[circuitKey]
856✔
1745
        if !ok {
1,705✔
1746
                subscriptions = make(map[chan<- interface{}]struct{})
849✔
1747
                i.hodlSubscriptions[circuitKey] = subscriptions
849✔
1748
        }
849✔
1749
        subscriptions[subscriber] = struct{}{}
856✔
1750

856✔
1751
        reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
856✔
1752
        if !ok {
1,217✔
1753
                reverseSubscriptions = make(map[CircuitKey]struct{})
361✔
1754
                i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
361✔
1755
        }
361✔
1756
        reverseSubscriptions[circuitKey] = struct{}{}
856✔
1757
}
1758

1759
// HodlUnsubscribeAll cancels the subscription.
1760
func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
204✔
1761
        i.hodlSubscriptionsMux.Lock()
204✔
1762
        defer i.hodlSubscriptionsMux.Unlock()
204✔
1763

204✔
1764
        hashes := i.hodlReverseSubscriptions[subscriber]
204✔
1765
        for hash := range hashes {
209✔
1766
                delete(i.hodlSubscriptions[hash], subscriber)
5✔
1767
        }
5✔
1768

1769
        delete(i.hodlReverseSubscriptions, subscriber)
204✔
1770
}
1771

1772
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
1773
// useful when we need to iterate the map to send notifications.
1774
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription { //nolint:lll
2,692✔
1775
        i.notificationClientMux.RLock()
2,692✔
1776
        defer i.notificationClientMux.RUnlock()
2,692✔
1777

2,692✔
1778
        clients := make(map[uint32]*SingleInvoiceSubscription)
2,692✔
1779
        for k, v := range i.singleNotificationClients {
2,732✔
1780
                clients[k] = v
40✔
1781
        }
40✔
1782
        return clients
2,692✔
1783
}
1784

1785
// copyClients copies i.notificationClients inside a lock. This is useful when
1786
// we need to iterate the map to send notifications.
1787
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
2,103✔
1788
        i.notificationClientMux.RLock()
2,103✔
1789
        defer i.notificationClientMux.RUnlock()
2,103✔
1790

2,103✔
1791
        clients := make(map[uint32]*InvoiceSubscription)
2,103✔
1792
        for k, v := range i.notificationClients {
2,173✔
1793
                clients[k] = v
70✔
1794
        }
70✔
1795
        return clients
2,103✔
1796
}
1797

1798
// deleteClient removes a client by its ID inside a lock. Noop if the client is
1799
// not found.
1800
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
67✔
1801
        i.notificationClientMux.Lock()
67✔
1802
        defer i.notificationClientMux.Unlock()
67✔
1803

67✔
1804
        log.Infof("Cancelling invoice subscription for client=%v", clientID)
67✔
1805
        delete(i.notificationClients, clientID)
67✔
1806
        delete(i.singleNotificationClients, clientID)
67✔
1807
}
67✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc