• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9915780197

13 Jul 2024 12:30AM UTC coverage: 49.268% (-9.1%) from 58.413%
9915780197

push

github

web-flow
Merge pull request #8653 from ProofOfKeags/fn-prim

DynComms [0/n]: `fn` package additions

92837 of 188433 relevant lines covered (49.27%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.57
/invoices/invoiceregistry.go
1
package invoices
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/clock"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/queue"
15
        "github.com/lightningnetwork/lnd/record"
16
)
17

18
var (
19
        // ErrInvoiceExpiryTooSoon is returned when an invoice is attempted to
20
        // be accepted or settled with not enough blocks remaining.
21
        ErrInvoiceExpiryTooSoon = errors.New("invoice expiry too soon")
22

23
        // ErrInvoiceAmountTooLow is returned when an invoice is attempted to
24
        // be accepted or settled with an amount that is too low.
25
        ErrInvoiceAmountTooLow = errors.New(
26
                "paid amount less than invoice amount",
27
        )
28

29
        // ErrShuttingDown is returned when an operation failed because the
30
        // invoice registry is shutting down.
31
        ErrShuttingDown = errors.New("invoice registry shutting down")
32
)
33

34
const (
35
        // DefaultHtlcHoldDuration defines the default for how long mpp htlcs
36
        // are held while waiting for the other set members to arrive.
37
        DefaultHtlcHoldDuration = 120 * time.Second
38
)
39

40
// RegistryConfig contains the configuration parameters for the invoice
// registry. A pointer to it is handed to NewRegistry and stored on the
// registry as cfg.
41
type RegistryConfig struct {
42
        // FinalCltvRejectDelta defines the number of blocks before the expiry
43
        // of the htlc where we no longer settle it as an exit hop and instead
44
        // cancel it back. Normally this value should be lower than the cltv
45
        // expiry of any invoice we create and the code effectuating this should
46
        // not be hit. processKeySend also uses it as the minimum block delta
        // when pre-checking the expiry of a keysend htlc.
47
        FinalCltvRejectDelta int32
48

49
        // HtlcHoldDuration defines for how long mpp htlcs are held while
50
        // waiting for the other set members to arrive. startHtlcTimer adds it
        // to the htlc accept time to obtain the auto-release time.
51
        HtlcHoldDuration time.Duration
52

53
        // Clock holds the clock implementation that is used to provide
54
        // Now() and TickAfter() and is useful to stub out the clock functions
55
        // during testing.
56
        Clock clock.Clock
57

58
        // AcceptKeySend indicates whether we want to accept spontaneous key
59
        // send payments.
60
        AcceptKeySend bool
61

62
        // AcceptAMP indicates whether we want to accept spontaneous AMP
63
        // payments.
64
        AcceptAMP bool
65

66
        // GcCanceledInvoicesOnStartup if set, we'll attempt to garbage collect
67
        // all canceled invoices upon start. scanInvoicesOnStart performs the
        // deletion through the invoice database.
68
        GcCanceledInvoicesOnStartup bool
69

70
        // GcCanceledInvoicesOnTheFly if set, we'll garbage collect all newly
71
        // canceled invoices on the fly.
72
        GcCanceledInvoicesOnTheFly bool
73

74
        // KeysendHoldTime indicates for how long we want to accept and hold
75
        // spontaneous keysend payments.
76
        KeysendHoldTime time.Duration
77
}
78

79
// htlcReleaseEvent describes an htlc auto-release event. It is used to release
80
// mpp htlcs for which the complete set didn't arrive in time. Events are
// created by startHtlcTimer and ordered by releaseTime in the event loop.
81
type htlcReleaseEvent struct {
82
        // invoiceRef identifies the invoice this htlc belongs to.
83
        invoiceRef InvoiceRef
84

85
        // key is the circuit key of the htlc to release.
86
        key CircuitKey
87

88
        // releaseTime is the time at which to release the htlc.
89
        releaseTime time.Time
90
}
91

92
// Less is used to order PriorityQueueItem's by their release time such that
93
// items with the older release time are at the top of the queue.
94
//
95
// NOTE: Part of the queue.PriorityQueueItem interface.
96
func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
3✔
97
        return r.releaseTime.Before(other.(*htlcReleaseEvent).releaseTime)
3✔
98
}
3✔
99

100
// InvoiceRegistry is a central registry of all the outstanding invoices
101
// created by the daemon. The registry is a thin wrapper around a map in order
102
// to ensure that all updates/reads are thread safe.
103
type InvoiceRegistry struct {
104
        // The embedded mutex is held by AddInvoice while the invoice is
        // written to the database and subscribers are notified.
        sync.RWMutex
105

106
        nextClientID uint32 // must be used atomically
107

108
        // idb is the invoice database all lookups and updates go through.
        idb InvoiceDB
109

110
        // cfg contains the registry's configuration parameters.
111
        cfg *RegistryConfig
112

113
        // notificationClientMux locks notificationClients and
114
        // singleNotificationClients. Using a separate mutex for these maps is
115
        // necessary to avoid deadlocks in the registry when processing invoice
116
        // events.
117
        notificationClientMux sync.RWMutex
118

119
        // notificationClients holds the subscribers to all invoices.
        // NOTE(review): presumably keyed by client id -- confirm.
        notificationClients map[uint32]*InvoiceSubscription
120

121
        // TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
122
        // performance.
123
        singleNotificationClients map[uint32]*SingleInvoiceSubscription
124

125
        // invoiceEvents is a single channel over which invoice updates are
126
        // carried. It is created with a buffer of 100 in NewRegistry and
        // drained by invoiceEventLoop.
127
        invoiceEvents chan *invoiceEvent
128

129
        // hodlSubscriptionsMux locks the hodlSubscriptions and
130
        // hodlReverseSubscriptions. Using a separate mutex for these maps is
131
        // necessary to avoid deadlocks in the registry when processing invoice
132
        // events.
133
        hodlSubscriptionsMux sync.RWMutex
134

135
        // hodlSubscriptions is a map from a circuit key to a list of
136
        // subscribers. It is used for efficient notification of links.
137
        hodlSubscriptions map[CircuitKey]map[chan<- interface{}]struct{}
138

139
        // hodlReverseSubscriptions tracks circuit keys subscribed to per
140
        // subscriber. This is used to unsubscribe from all hashes efficiently.
141
        hodlReverseSubscriptions map[chan<- interface{}]map[CircuitKey]struct{}
142

143
        // htlcAutoReleaseChan contains the new htlcs that need to be
144
        // auto-released. It is unbuffered; startHtlcTimer sends on it and
        // invoiceEventLoop receives from it.
145
        htlcAutoReleaseChan chan *htlcReleaseEvent
146

147
        // expiryWatcher is started in Start and stopped in Stop. Pending
        // invoices are handed to it via AddInvoices.
        expiryWatcher *InvoiceExpiryWatcher
148

149
        // wg tracks the invoiceEventLoop goroutine, and quit is closed by
        // Stop to signal shutdown.
        wg   sync.WaitGroup
150
        quit chan struct{}
151
}
152

153
// NewRegistry creates a new invoice registry. The invoice registry
154
// wraps the persistent on-disk invoice storage with an additional in-memory
155
// layer. The in-memory layer is in place such that debug invoices can be added
156
// which are volatile yet available system wide within the daemon.
157
func NewRegistry(idb InvoiceDB, expiryWatcher *InvoiceExpiryWatcher,
158
        cfg *RegistryConfig) *InvoiceRegistry {
3✔
159

3✔
160
        notificationClients := make(map[uint32]*InvoiceSubscription)
3✔
161
        singleNotificationClients := make(map[uint32]*SingleInvoiceSubscription)
3✔
162
        return &InvoiceRegistry{
3✔
163
                idb:                       idb,
3✔
164
                notificationClients:       notificationClients,
3✔
165
                singleNotificationClients: singleNotificationClients,
3✔
166
                invoiceEvents:             make(chan *invoiceEvent, 100),
3✔
167
                hodlSubscriptions: make(
3✔
168
                        map[CircuitKey]map[chan<- interface{}]struct{},
3✔
169
                ),
3✔
170
                hodlReverseSubscriptions: make(
3✔
171
                        map[chan<- interface{}]map[CircuitKey]struct{},
3✔
172
                ),
3✔
173
                cfg:                 cfg,
3✔
174
                htlcAutoReleaseChan: make(chan *htlcReleaseEvent),
3✔
175
                expiryWatcher:       expiryWatcher,
3✔
176
                quit:                make(chan struct{}),
3✔
177
        }
3✔
178
}
3✔
179

180
// scanInvoicesOnStart will scan all invoices on start and add active invoices
181
// to the invoice expiry watcher while also attempting to delete all canceled
182
// invoices.
183
func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {
3✔
184
        pendingInvoices, err := i.idb.FetchPendingInvoices(ctx)
3✔
185
        if err != nil {
3✔
186
                return err
×
187
        }
×
188

189
        var pending []invoiceExpiry
3✔
190
        for paymentHash, invoice := range pendingInvoices {
6✔
191
                invoice := invoice
3✔
192
                expiryRef := makeInvoiceExpiry(paymentHash, &invoice)
3✔
193
                if expiryRef != nil {
6✔
194
                        pending = append(pending, expiryRef)
3✔
195
                }
3✔
196
        }
197

198
        log.Debugf("Adding %d pending invoices to the expiry watcher",
3✔
199
                len(pending))
3✔
200
        i.expiryWatcher.AddInvoices(pending...)
3✔
201

3✔
202
        if i.cfg.GcCanceledInvoicesOnStartup {
3✔
203
                log.Infof("Deleting canceled invoices")
×
204
                err = i.idb.DeleteCanceledInvoices(ctx)
×
205
                if err != nil {
×
206
                        log.Warnf("Deleting canceled invoices failed: %v", err)
×
207
                        return err
×
208
                }
×
209
        }
210

211
        return nil
3✔
212
}
213

214
// Start starts the registry and all goroutines it needs to carry out its task.
215
func (i *InvoiceRegistry) Start() error {
3✔
216
        // Start InvoiceExpiryWatcher and prepopulate it with existing active
3✔
217
        // invoices.
3✔
218
        err := i.expiryWatcher.Start(func(hash lntypes.Hash, force bool) error {
6✔
219
                return i.cancelInvoiceImpl(context.Background(), hash, force)
3✔
220
        })
3✔
221
        if err != nil {
3✔
222
                return err
×
223
        }
×
224

225
        log.Info("InvoiceRegistry starting")
3✔
226

3✔
227
        i.wg.Add(1)
3✔
228
        go i.invoiceEventLoop()
3✔
229

3✔
230
        // Now scan all pending and removable invoices to the expiry watcher or
3✔
231
        // delete them.
3✔
232
        err = i.scanInvoicesOnStart(context.Background())
3✔
233
        if err != nil {
3✔
234
                _ = i.Stop()
×
235
                return err
×
236
        }
×
237

238
        return nil
3✔
239
}
240

241
// Stop signals the registry for a graceful shutdown.
242
func (i *InvoiceRegistry) Stop() error {
3✔
243
        log.Info("InvoiceRegistry shutting down...")
3✔
244
        defer log.Debug("InvoiceRegistry shutdown complete")
3✔
245

3✔
246
        i.expiryWatcher.Stop()
3✔
247

3✔
248
        close(i.quit)
3✔
249

3✔
250
        i.wg.Wait()
3✔
251
        return nil
3✔
252
}
3✔
253

254
// invoiceEvent represents a new event that has modified an invoice on disk.
255
// Only two event types are currently supported: newly created invoices, and
256
// instances where invoices are settled.
257
type invoiceEvent struct {
258
        // hash is the payment hash of the invoice. Single invoice
        // subscribers are matched against it.
        hash    lntypes.Hash
259
        // invoice is the invoice the event refers to.
        invoice *Invoice
260
        // setID is nil for regular invoice events and is consulted for AMP
        // invoices when tracking a client's settle index.
        setID   *[32]byte
261
}
262

263
// tickAt returns a channel that ticks at the specified time. If the time has
264
// already passed, it will tick immediately.
265
func (i *InvoiceRegistry) tickAt(t time.Time) <-chan time.Time {
3✔
266
        now := i.cfg.Clock.Now()
3✔
267
        return i.cfg.Clock.TickAfter(t.Sub(now))
3✔
268
}
3✔
269

270
// invoiceEventLoop is the dedicated goroutine responsible for accepting
271
// new notification subscriptions, cancelling old subscriptions, and
272
// dispatching new invoice events.
273
func (i *InvoiceRegistry) invoiceEventLoop() {
3✔
274
        defer i.wg.Done()
3✔
275

3✔
276
        // Set up a heap for htlc auto-releases.
3✔
277
        autoReleaseHeap := &queue.PriorityQueue{}
3✔
278

3✔
279
        for {
6✔
280
                // If there is something to release, set up a release tick
3✔
281
                // channel.
3✔
282
                var nextReleaseTick <-chan time.Time
3✔
283
                if autoReleaseHeap.Len() > 0 {
6✔
284
                        head := autoReleaseHeap.Top().(*htlcReleaseEvent)
3✔
285
                        nextReleaseTick = i.tickAt(head.releaseTime)
3✔
286
                }
3✔
287

288
                select {
3✔
289
                // A sub-systems has just modified the invoice state, so we'll
290
                // dispatch notifications to all registered clients.
291
                case event := <-i.invoiceEvents:
3✔
292
                        // For backwards compatibility, do not notify all
3✔
293
                        // invoice subscribers of cancel and accept events.
3✔
294
                        state := event.invoice.State
3✔
295
                        if state != ContractCanceled &&
3✔
296
                                state != ContractAccepted {
6✔
297

3✔
298
                                i.dispatchToClients(event)
3✔
299
                        }
3✔
300
                        i.dispatchToSingleClients(event)
3✔
301

302
                // A new htlc came in for auto-release.
303
                case event := <-i.htlcAutoReleaseChan:
3✔
304
                        log.Debugf("Scheduling auto-release for htlc: "+
3✔
305
                                "ref=%v, key=%v at %v",
3✔
306
                                event.invoiceRef, event.key, event.releaseTime)
3✔
307

3✔
308
                        // We use an independent timer for every htlc rather
3✔
309
                        // than a set timer that is reset with every htlc coming
3✔
310
                        // in. Otherwise the sender could keep resetting the
3✔
311
                        // timer until the broadcast window is entered and our
3✔
312
                        // channel is force closed.
3✔
313
                        autoReleaseHeap.Push(event)
3✔
314

315
                // The htlc at the top of the heap needs to be auto-released.
316
                case <-nextReleaseTick:
×
317
                        event := autoReleaseHeap.Pop().(*htlcReleaseEvent)
×
318
                        err := i.cancelSingleHtlc(
×
319
                                event.invoiceRef, event.key, ResultMppTimeout,
×
320
                        )
×
321
                        if err != nil {
×
322
                                log.Errorf("HTLC timer: %v", err)
×
323
                        }
×
324

325
                case <-i.quit:
3✔
326
                        return
3✔
327
                }
328
        }
329
}
330

331
// dispatchToSingleClients passes the supplied event to all notification
332
// clients that subscribed to all the invoice this event applies to.
333
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
3✔
334
        // Dispatch to single invoice subscribers.
3✔
335
        clients := i.copySingleClients()
3✔
336
        for _, client := range clients {
6✔
337
                payHash := client.invoiceRef.PayHash()
3✔
338

3✔
339
                if payHash == nil || *payHash != event.hash {
6✔
340
                        continue
3✔
341
                }
342

343
                select {
3✔
344
                case <-client.backlogDelivered:
3✔
345
                        // We won't deliver any events until the backlog has
346
                        // went through first.
347
                case <-i.quit:
×
348
                        return
×
349
                }
350

351
                client.notify(event)
3✔
352
        }
353
}
354

355
// dispatchToClients passes the supplied event to all notification clients that
356
// subscribed to all invoices. Add and settle indices are used to make sure
357
// that clients don't receive duplicate or unwanted events.
358
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
3✔
359
        invoice := event.invoice
3✔
360

3✔
361
        clients := i.copyClients()
3✔
362
        for clientID, client := range clients {
6✔
363
                // Before we dispatch this event, we'll check
3✔
364
                // to ensure that this client hasn't already
3✔
365
                // received this notification in order to
3✔
366
                // ensure we don't duplicate any events.
3✔
367

3✔
368
                // TODO(joostjager): Refactor switches.
3✔
369
                state := event.invoice.State
3✔
370
                switch {
3✔
371
                // If we've already sent this settle event to
372
                // the client, then we can skip this.
373
                case state == ContractSettled &&
374
                        client.settleIndex >= invoice.SettleIndex:
×
375
                        continue
×
376

377
                // Similarly, if we've already sent this add to
378
                // the client then we can skip this one, but only if this isn't
379
                // an AMP invoice. AMP invoices always remain in the settle
380
                // state as a base invoice.
381
                case event.setID == nil && state == ContractOpen &&
382
                        client.addIndex >= invoice.AddIndex:
×
383
                        continue
×
384

385
                // These two states should never happen, but we
386
                // log them just in case so we can detect this
387
                // instance.
388
                case state == ContractOpen &&
389
                        client.addIndex+1 != invoice.AddIndex:
3✔
390
                        log.Warnf("client=%v for invoice "+
3✔
391
                                "notifications missed an update, "+
3✔
392
                                "add_index=%v, new add event index=%v",
3✔
393
                                clientID, client.addIndex,
3✔
394
                                invoice.AddIndex)
3✔
395

396
                case state == ContractSettled &&
397
                        client.settleIndex+1 != invoice.SettleIndex:
3✔
398
                        log.Warnf("client=%v for invoice "+
3✔
399
                                "notifications missed an update, "+
3✔
400
                                "settle_index=%v, new settle event index=%v",
3✔
401
                                clientID, client.settleIndex,
3✔
402
                                invoice.SettleIndex)
3✔
403
                }
404

405
                select {
3✔
406
                case <-client.backlogDelivered:
3✔
407
                        // We won't deliver any events until the backlog has
408
                        // been processed.
409
                case <-i.quit:
×
410
                        return
×
411
                }
412

413
                err := client.notify(&invoiceEvent{
3✔
414
                        invoice: invoice,
3✔
415
                        setID:   event.setID,
3✔
416
                })
3✔
417
                if err != nil {
3✔
418
                        log.Errorf("Failed dispatching to client: %v", err)
×
419
                        return
×
420
                }
×
421

422
                // Each time we send a notification to a client, we'll record
423
                // the latest add/settle index it has. We'll use this to ensure
424
                // we don't send a notification twice, which can happen if a new
425
                // event is added while we're catching up a new client.
426
                invState := event.invoice.State
3✔
427
                switch {
3✔
428
                case invState == ContractSettled:
3✔
429
                        client.settleIndex = invoice.SettleIndex
3✔
430

431
                case invState == ContractOpen && event.setID == nil:
3✔
432
                        client.addIndex = invoice.AddIndex
3✔
433

434
                // If this is an AMP invoice, then we'll need to use the set ID
435
                // to keep track of the settle index of the client. AMP
436
                // invoices never go to the open state, but if a setID is
437
                // passed, then we know it was just settled and will track the
438
                // highest settle index so far.
439
                case invState == ContractOpen && event.setID != nil:
3✔
440
                        setID := *event.setID
3✔
441
                        client.settleIndex = invoice.AMPState[setID].SettleIndex
3✔
442

443
                default:
×
444
                        log.Errorf("unexpected invoice state: %v",
×
445
                                event.invoice.State)
×
446
                }
447
        }
448
}
449

450
// deliverBacklogEvents will attempts to query the invoice database for any
451
// notifications that the client has missed since it reconnected last.
452
func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context,
453
        client *InvoiceSubscription) error {
3✔
454

3✔
455
        addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex)
3✔
456
        if err != nil {
3✔
457
                return err
×
458
        }
×
459

460
        settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex)
3✔
461
        if err != nil {
3✔
462
                return err
×
463
        }
×
464

465
        // If we have any to deliver, then we'll append them to the end of the
466
        // notification queue in order to catch up the client before delivering
467
        // any new notifications.
468
        for _, addEvent := range addEvents {
6✔
469
                // We re-bind the loop variable to ensure we don't hold onto
3✔
470
                // the loop reference causing is to point to the same item.
3✔
471
                addEvent := addEvent
3✔
472

3✔
473
                select {
3✔
474
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
475
                        invoice: &addEvent,
476
                }:
3✔
477
                case <-i.quit:
×
478
                        return ErrShuttingDown
×
479
                }
480
        }
481

482
        for _, settleEvent := range settleEvents {
6✔
483
                // We re-bind the loop variable to ensure we don't hold onto
3✔
484
                // the loop reference causing is to point to the same item.
3✔
485
                settleEvent := settleEvent
3✔
486

3✔
487
                select {
3✔
488
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
489
                        invoice: &settleEvent,
490
                }:
3✔
491
                case <-i.quit:
×
492
                        return ErrShuttingDown
×
493
                }
494
        }
495

496
        return nil
3✔
497
}
498

499
// deliverSingleBacklogEvents will attempt to query the invoice database to
500
// retrieve the current invoice state and deliver this to the subscriber. Single
501
// invoice subscribers will always receive the current state right after
502
// subscribing. Only in case the invoice does not yet exist, nothing is sent
503
// yet.
504
func (i *InvoiceRegistry) deliverSingleBacklogEvents(ctx context.Context,
505
        client *SingleInvoiceSubscription) error {
3✔
506

3✔
507
        invoice, err := i.idb.LookupInvoice(ctx, client.invoiceRef)
3✔
508

3✔
509
        // It is possible that the invoice does not exist yet, but the client is
3✔
510
        // already watching it in anticipation.
3✔
511
        isNotFound := errors.Is(err, ErrInvoiceNotFound)
3✔
512
        isNotCreated := errors.Is(err, ErrNoInvoicesCreated)
3✔
513
        if isNotFound || isNotCreated {
6✔
514
                return nil
3✔
515
        }
3✔
516
        if err != nil {
3✔
517
                return err
×
518
        }
×
519

520
        payHash := client.invoiceRef.PayHash()
3✔
521
        if payHash == nil {
3✔
522
                return nil
×
523
        }
×
524

525
        err = client.notify(&invoiceEvent{
3✔
526
                hash:    *payHash,
3✔
527
                invoice: &invoice,
3✔
528
        })
3✔
529
        if err != nil {
3✔
530
                return err
×
531
        }
×
532

533
        log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
3✔
534
                client.id, payHash)
3✔
535

3✔
536
        return nil
3✔
537
}
538

539
// AddInvoice adds a regular invoice for the specified amount, identified by
540
// the passed preimage. Additionally, any memo or receipt data provided will
541
// also be stored on-disk. Once this invoice is added, subsystems within the
542
// daemon add/forward HTLCs are able to obtain the proper preimage required for
543
// redemption in the case that we're the final destination. We also return the
544
// addIndex of the newly created invoice which monotonically increases for each
545
// new invoice added.  A side effect of this function is that it also sets
546
// AddIndex on the invoice argument.
547
func (i *InvoiceRegistry) AddInvoice(ctx context.Context, invoice *Invoice,
548
        paymentHash lntypes.Hash) (uint64, error) {
3✔
549

3✔
550
        i.Lock()
3✔
551

3✔
552
        ref := InvoiceRefByHash(paymentHash)
3✔
553
        log.Debugf("Invoice%v: added with terms %v", ref, invoice.Terms)
3✔
554

3✔
555
        addIndex, err := i.idb.AddInvoice(ctx, invoice, paymentHash)
3✔
556
        if err != nil {
6✔
557
                i.Unlock()
3✔
558
                return 0, err
3✔
559
        }
3✔
560

561
        // Now that we've added the invoice, we'll send dispatch a message to
562
        // notify the clients of this new invoice.
563
        i.notifyClients(paymentHash, invoice, nil)
3✔
564
        i.Unlock()
3✔
565

3✔
566
        // InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry
3✔
567
        // to avoid deadlock when a new invoice is added while an other is being
3✔
568
        // canceled.
3✔
569
        invoiceExpiryRef := makeInvoiceExpiry(paymentHash, invoice)
3✔
570
        if invoiceExpiryRef != nil {
6✔
571
                i.expiryWatcher.AddInvoices(invoiceExpiryRef)
3✔
572
        }
3✔
573

574
        return addIndex, nil
3✔
575
}
576

577
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
578
// then we're able to pull the funds pending within an HTLC.
579
//
580
// TODO(roasbeef): ignore if settled?
581
func (i *InvoiceRegistry) LookupInvoice(ctx context.Context,
582
        rHash lntypes.Hash) (Invoice, error) {
3✔
583

3✔
584
        // We'll check the database to see if there's an existing matching
3✔
585
        // invoice.
3✔
586
        ref := InvoiceRefByHash(rHash)
3✔
587
        return i.idb.LookupInvoice(ctx, ref)
3✔
588
}
3✔
589

590
// LookupInvoiceByRef looks up an invoice by the given reference, if found
591
// then we're able to pull the funds pending within an HTLC.
592
func (i *InvoiceRegistry) LookupInvoiceByRef(ctx context.Context,
593
        ref InvoiceRef) (Invoice, error) {
3✔
594

3✔
595
        return i.idb.LookupInvoice(ctx, ref)
3✔
596
}
3✔
597

598
// startHtlcTimer starts a new timer via the invoice registry main loop that
599
// cancels a single htlc on an invoice when the htlc hold duration has passed.
600
func (i *InvoiceRegistry) startHtlcTimer(invoiceRef InvoiceRef,
601
        key CircuitKey, acceptTime time.Time) error {
3✔
602

3✔
603
        releaseTime := acceptTime.Add(i.cfg.HtlcHoldDuration)
3✔
604
        event := &htlcReleaseEvent{
3✔
605
                invoiceRef:  invoiceRef,
3✔
606
                key:         key,
3✔
607
                releaseTime: releaseTime,
3✔
608
        }
3✔
609

3✔
610
        select {
3✔
611
        case i.htlcAutoReleaseChan <- event:
3✔
612
                return nil
3✔
613

614
        case <-i.quit:
×
615
                return ErrShuttingDown
×
616
        }
617
}
618

619
// cancelSingleHtlc cancels a single accepted htlc on an invoice. It takes
620
// a resolution result which will be used to notify subscribed links and
621
// resolvers of the details of the htlc cancellation.
622
func (i *InvoiceRegistry) cancelSingleHtlc(invoiceRef InvoiceRef,
623
        key CircuitKey, result FailResolutionResult) error {
×
624

×
625
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
×
626
                // Only allow individual htlc cancellation on open invoices.
×
627
                if invoice.State != ContractOpen {
×
628
                        log.Debugf("cancelSingleHtlc: invoice %v no longer "+
×
629
                                "open", invoiceRef)
×
630

×
631
                        return nil, nil
×
632
                }
×
633

634
                // Lookup the current status of the htlc in the database.
635
                var (
×
636
                        htlcState HtlcState
×
637
                        setID     *SetID
×
638
                )
×
639
                htlc, ok := invoice.Htlcs[key]
×
640
                if !ok {
×
641
                        // If this is an AMP invoice, then all the HTLCs won't
×
642
                        // be read out, so we'll consult the other mapping to
×
643
                        // try to find the HTLC state in question here.
×
644
                        var found bool
×
645
                        for ampSetID, htlcSet := range invoice.AMPState {
×
646
                                ampSetID := ampSetID
×
647
                                for htlcKey := range htlcSet.InvoiceKeys {
×
648
                                        if htlcKey == key {
×
649
                                                htlcState = htlcSet.State
×
650
                                                setID = &ampSetID
×
651

×
652
                                                found = true
×
653
                                                break
×
654
                                        }
655
                                }
656
                        }
657

658
                        if !found {
×
659
                                return nil, fmt.Errorf("htlc %v not found", key)
×
660
                        }
×
661
                } else {
×
662
                        htlcState = htlc.State
×
663
                }
×
664

665
                // Cancellation is only possible if the htlc wasn't already
666
                // resolved.
667
                if htlcState != HtlcStateAccepted {
×
668
                        log.Debugf("cancelSingleHtlc: htlc %v on invoice %v "+
×
669
                                "is already resolved", key, invoiceRef)
×
670

×
671
                        return nil, nil
×
672
                }
×
673

674
                log.Debugf("cancelSingleHtlc: cancelling htlc %v on invoice %v",
×
675
                        key, invoiceRef)
×
676

×
677
                // Return an update descriptor that cancels htlc and keeps
×
678
                // invoice open.
×
679
                canceledHtlcs := map[CircuitKey]struct{}{
×
680
                        key: {},
×
681
                }
×
682

×
683
                return &InvoiceUpdateDesc{
×
684
                        UpdateType:  CancelHTLCsUpdate,
×
685
                        CancelHtlcs: canceledHtlcs,
×
686
                        SetID:       setID,
×
687
                }, nil
×
688
        }
689

690
        // Try to mark the specified htlc as canceled in the invoice database.
691
        // Intercept the update descriptor to set the local updated variable. If
692
        // no invoice update is performed, we can return early.
693
        setID := (*SetID)(invoiceRef.SetID())
×
694
        var updated bool
×
695
        invoice, err := i.idb.UpdateInvoice(
×
696
                context.Background(), invoiceRef, setID,
×
697
                func(invoice *Invoice) (
×
698
                        *InvoiceUpdateDesc, error) {
×
699

×
700
                        updateDesc, err := updateInvoice(invoice)
×
701
                        if err != nil {
×
702
                                return nil, err
×
703
                        }
×
704
                        updated = updateDesc != nil
×
705

×
706
                        return updateDesc, err
×
707
                },
708
        )
709
        if err != nil {
×
710
                return err
×
711
        }
×
712
        if !updated {
×
713
                return nil
×
714
        }
×
715

716
        // The invoice has been updated. Notify subscribers of the htlc
717
        // resolution.
718
        htlc, ok := invoice.Htlcs[key]
×
719
        if !ok {
×
720
                return fmt.Errorf("htlc %v not found", key)
×
721
        }
×
722
        if htlc.State == HtlcStateCanceled {
×
723
                resolution := NewFailResolution(
×
724
                        key, int32(htlc.AcceptHeight), result,
×
725
                )
×
726

×
727
                i.notifyHodlSubscribers(resolution)
×
728
        }
×
729
        return nil
×
730
}
731

732
// processKeySend just-in-time inserts an invoice if this htlc is a keysend
733
// htlc.
734
func (i *InvoiceRegistry) processKeySend(ctx invoiceUpdateCtx) error {
3✔
735
        // Retrieve keysend record if present.
3✔
736
        preimageSlice, ok := ctx.customRecords[record.KeySendType]
3✔
737
        if !ok {
6✔
738
                return nil
3✔
739
        }
3✔
740

741
        // Cancel htlc is preimage is invalid.
742
        preimage, err := lntypes.MakePreimage(preimageSlice)
3✔
743
        if err != nil {
3✔
744
                return err
×
745
        }
×
746
        if preimage.Hash() != ctx.hash {
3✔
747
                return fmt.Errorf("invalid keysend preimage %v for hash %v",
×
748
                        preimage, ctx.hash)
×
749
        }
×
750

751
        // Only allow keysend for non-mpp payments.
752
        if ctx.mpp != nil {
3✔
753
                return errors.New("no mpp keysend supported")
×
754
        }
×
755

756
        // Create an invoice for the htlc amount.
757
        amt := ctx.amtPaid
3✔
758

3✔
759
        // Set tlv required feature vector on the invoice. Otherwise we wouldn't
3✔
760
        // be able to pay to it with keysend.
3✔
761
        rawFeatures := lnwire.NewRawFeatureVector(
3✔
762
                lnwire.TLVOnionPayloadRequired,
3✔
763
        )
3✔
764
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
3✔
765

3✔
766
        // Use the minimum block delta that we require for settling htlcs.
3✔
767
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
3✔
768

3✔
769
        // Pre-check expiry here to prevent inserting an invoice that will not
3✔
770
        // be settled.
3✔
771
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
3✔
772
                return errors.New("final expiry too soon")
×
773
        }
×
774

775
        // The invoice database indexes all invoices by payment address, however
776
        // legacy keysend payment do not have one. In order to avoid a new
777
        // payment type on-disk wrt. to indexing, we'll continue to insert a
778
        // blank payment address which is special cased in the insertion logic
779
        // to not be indexed. In the future, once AMP is merged, this should be
780
        // replaced by generating a random payment address on the behalf of the
781
        // sender.
782
        payAddr := BlankPayAddr
3✔
783

3✔
784
        // Create placeholder invoice.
3✔
785
        invoice := &Invoice{
3✔
786
                CreationDate: i.cfg.Clock.Now(),
3✔
787
                Terms: ContractTerm{
3✔
788
                        FinalCltvDelta:  finalCltvDelta,
3✔
789
                        Value:           amt,
3✔
790
                        PaymentPreimage: &preimage,
3✔
791
                        PaymentAddr:     payAddr,
3✔
792
                        Features:        features,
3✔
793
                },
3✔
794
        }
3✔
795

3✔
796
        if i.cfg.KeysendHoldTime != 0 {
3✔
797
                invoice.HodlInvoice = true
×
798
                invoice.Terms.Expiry = i.cfg.KeysendHoldTime
×
799
        }
×
800

801
        // Insert invoice into database. Ignore duplicates, because this
802
        // may be a replay.
803
        _, err = i.AddInvoice(context.Background(), invoice, ctx.hash)
3✔
804
        if err != nil && !errors.Is(err, ErrDuplicateInvoice) {
3✔
805
                return err
×
806
        }
×
807

808
        return nil
3✔
809
}
810

811
// processAMP just-in-time inserts an invoice if this htlc is a keysend
812
// htlc.
813
func (i *InvoiceRegistry) processAMP(ctx invoiceUpdateCtx) error {
3✔
814
        // AMP payments MUST also include an MPP record.
3✔
815
        if ctx.mpp == nil {
3✔
816
                return errors.New("no MPP record for AMP")
×
817
        }
×
818

819
        // Create an invoice for the total amount expected, provided in the MPP
820
        // record.
821
        amt := ctx.mpp.TotalMsat()
3✔
822

3✔
823
        // Set the TLV required and MPP optional features on the invoice. We'll
3✔
824
        // also make the AMP features required so that it can't be paid by
3✔
825
        // legacy or MPP htlcs.
3✔
826
        rawFeatures := lnwire.NewRawFeatureVector(
3✔
827
                lnwire.TLVOnionPayloadRequired,
3✔
828
                lnwire.PaymentAddrOptional,
3✔
829
                lnwire.AMPRequired,
3✔
830
        )
3✔
831
        features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
3✔
832

3✔
833
        // Use the minimum block delta that we require for settling htlcs.
3✔
834
        finalCltvDelta := i.cfg.FinalCltvRejectDelta
3✔
835

3✔
836
        // Pre-check expiry here to prevent inserting an invoice that will not
3✔
837
        // be settled.
3✔
838
        if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
3✔
839
                return errors.New("final expiry too soon")
×
840
        }
×
841

842
        // We'll use the sender-generated payment address provided in the HTLC
843
        // to create our AMP invoice.
844
        payAddr := ctx.mpp.PaymentAddr()
3✔
845

3✔
846
        // Create placeholder invoice.
3✔
847
        invoice := &Invoice{
3✔
848
                CreationDate: i.cfg.Clock.Now(),
3✔
849
                Terms: ContractTerm{
3✔
850
                        FinalCltvDelta:  finalCltvDelta,
3✔
851
                        Value:           amt,
3✔
852
                        PaymentPreimage: nil,
3✔
853
                        PaymentAddr:     payAddr,
3✔
854
                        Features:        features,
3✔
855
                },
3✔
856
        }
3✔
857

3✔
858
        // Insert invoice into database. Ignore duplicates payment hashes and
3✔
859
        // payment addrs, this may be a replay or a different HTLC for the AMP
3✔
860
        // invoice.
3✔
861
        _, err := i.AddInvoice(context.Background(), invoice, ctx.hash)
3✔
862
        isDuplicatedInvoice := errors.Is(err, ErrDuplicateInvoice)
3✔
863
        isDuplicatedPayAddr := errors.Is(err, ErrDuplicatePayAddr)
3✔
864
        switch {
3✔
865
        case isDuplicatedInvoice || isDuplicatedPayAddr:
3✔
866
                return nil
3✔
867
        default:
3✔
868
                return err
3✔
869
        }
870
}
871

872
// NotifyExitHopHtlc attempts to mark an invoice as settled. The return value
873
// describes how the htlc should be resolved.
874
//
875
// When the preimage of the invoice is not yet known (hodl invoice), this
876
// function moves the invoice to the accepted state. When SettleHoldInvoice is
877
// called later, a resolution message will be send back to the caller via the
878
// provided hodlChan. Invoice registry sends on this channel what action needs
879
// to be taken on the htlc (settle or cancel). The caller needs to ensure that
880
// the channel is either buffered or received on from another goroutine to
881
// prevent deadlock.
882
//
883
// In the case that the htlc is part of a larger set of htlcs that pay to the
884
// same invoice (multi-path payment), the htlc is held until the set is
885
// complete. If the set doesn't fully arrive in time, a timer will cancel the
886
// held htlc.
887
func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
888
        amtPaid lnwire.MilliSatoshi, expiry uint32, currentHeight int32,
889
        circuitKey CircuitKey, hodlChan chan<- interface{},
890
        payload Payload) (HtlcResolution, error) {
3✔
891

3✔
892
        // Create the update context containing the relevant details of the
3✔
893
        // incoming htlc.
3✔
894
        ctx := invoiceUpdateCtx{
3✔
895
                hash:                 rHash,
3✔
896
                circuitKey:           circuitKey,
3✔
897
                amtPaid:              amtPaid,
3✔
898
                expiry:               expiry,
3✔
899
                currentHeight:        currentHeight,
3✔
900
                finalCltvRejectDelta: i.cfg.FinalCltvRejectDelta,
3✔
901
                customRecords:        payload.CustomRecords(),
3✔
902
                mpp:                  payload.MultiPath(),
3✔
903
                amp:                  payload.AMPRecord(),
3✔
904
                metadata:             payload.Metadata(),
3✔
905
        }
3✔
906

3✔
907
        switch {
3✔
908
        // If we are accepting spontaneous AMP payments and this payload
909
        // contains an AMP record, create an AMP invoice that will be settled
910
        // below.
911
        case i.cfg.AcceptAMP && ctx.amp != nil:
3✔
912
                err := i.processAMP(ctx)
3✔
913
                if err != nil {
3✔
914
                        ctx.log(fmt.Sprintf("amp error: %v", err))
×
915

×
916
                        return NewFailResolution(
×
917
                                circuitKey, currentHeight, ResultAmpError,
×
918
                        ), nil
×
919
                }
×
920

921
        // If we are accepting spontaneous keysend payments, create a regular
922
        // invoice that will be settled below. We also enforce that this is only
923
        // done when no AMP payload is present since it will only be settle-able
924
        // by regular HTLCs.
925
        case i.cfg.AcceptKeySend && ctx.amp == nil:
3✔
926
                err := i.processKeySend(ctx)
3✔
927
                if err != nil {
3✔
928
                        ctx.log(fmt.Sprintf("keysend error: %v", err))
×
929

×
930
                        return NewFailResolution(
×
931
                                circuitKey, currentHeight, ResultKeySendError,
×
932
                        ), nil
×
933
                }
×
934
        }
935

936
        // Execute locked notify exit hop logic.
937
        i.Lock()
3✔
938
        resolution, invoiceToExpire, err := i.notifyExitHopHtlcLocked(
3✔
939
                &ctx, hodlChan,
3✔
940
        )
3✔
941
        i.Unlock()
3✔
942
        if err != nil {
3✔
943
                return nil, err
×
944
        }
×
945

946
        if invoiceToExpire != nil {
6✔
947
                i.expiryWatcher.AddInvoices(invoiceToExpire)
3✔
948
        }
3✔
949

950
        switch r := resolution.(type) {
3✔
951
        // The htlc is held. Start a timer outside the lock if the htlc should
952
        // be auto-released, because otherwise a deadlock may happen with the
953
        // main event loop.
954
        case *htlcAcceptResolution:
3✔
955
                if r.autoRelease {
6✔
956
                        var invRef InvoiceRef
3✔
957
                        if ctx.amp != nil {
6✔
958
                                invRef = InvoiceRefBySetID(*ctx.setID())
3✔
959
                        } else {
6✔
960
                                invRef = ctx.invoiceRef()
3✔
961
                        }
3✔
962

963
                        err := i.startHtlcTimer(
3✔
964
                                invRef, circuitKey, r.acceptTime,
3✔
965
                        )
3✔
966
                        if err != nil {
3✔
967
                                return nil, err
×
968
                        }
×
969
                }
970

971
                // We return a nil resolution because htlc acceptances are
972
                // represented as nil resolutions externally.
973
                // TODO(carla) update calling code to handle accept resolutions.
974
                return nil, nil
3✔
975

976
        // A direct resolution was received for this htlc.
977
        case HtlcResolution:
3✔
978
                return r, nil
3✔
979

980
        // Fail if an unknown resolution type was received.
981
        default:
×
982
                return nil, errors.New("invalid resolution type")
×
983
        }
984
}
985

986
// notifyExitHopHtlcLocked is the internal implementation of NotifyExitHopHtlc
987
// that should be executed inside the registry lock. The returned invoiceExpiry
988
// (if not nil) needs to be added to the expiry watcher outside of the lock.
989
func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
990
        ctx *invoiceUpdateCtx, hodlChan chan<- interface{}) (
991
        HtlcResolution, invoiceExpiry, error) {
3✔
992

3✔
993
        // We'll attempt to settle an invoice matching this rHash on disk (if
3✔
994
        // one exists). The callback will update the invoice state and/or htlcs.
3✔
995
        var (
3✔
996
                resolution        HtlcResolution
3✔
997
                updateSubscribers bool
3✔
998
        )
3✔
999

3✔
1000
        callback := func(inv *Invoice) (*InvoiceUpdateDesc, error) {
6✔
1001
                updateDesc, res, err := updateInvoice(ctx, inv)
3✔
1002
                if err != nil {
3✔
1003
                        return nil, err
×
1004
                }
×
1005

1006
                // Only send an update if the invoice state was changed.
1007
                updateSubscribers = updateDesc != nil &&
3✔
1008
                        updateDesc.State != nil
3✔
1009

3✔
1010
                // Assign resolution to outer scope variable.
3✔
1011
                resolution = res
3✔
1012

3✔
1013
                return updateDesc, nil
3✔
1014
        }
1015

1016
        invoiceRef := ctx.invoiceRef()
3✔
1017
        setID := (*SetID)(ctx.setID())
3✔
1018
        invoice, err := i.idb.UpdateInvoice(
3✔
1019
                context.Background(), invoiceRef, setID, callback,
3✔
1020
        )
3✔
1021

3✔
1022
        var duplicateSetIDErr ErrDuplicateSetID
3✔
1023
        if errors.As(err, &duplicateSetIDErr) {
3✔
1024
                return NewFailResolution(
×
1025
                        ctx.circuitKey, ctx.currentHeight,
×
1026
                        ResultInvoiceNotFound,
×
1027
                ), nil, nil
×
1028
        }
×
1029

1030
        switch err {
3✔
1031
        case ErrInvoiceNotFound:
3✔
1032
                // If the invoice was not found, return a failure resolution
3✔
1033
                // with an invoice not found result.
3✔
1034
                return NewFailResolution(
3✔
1035
                        ctx.circuitKey, ctx.currentHeight,
3✔
1036
                        ResultInvoiceNotFound,
3✔
1037
                ), nil, nil
3✔
1038

1039
        case ErrInvRefEquivocation:
×
1040
                return NewFailResolution(
×
1041
                        ctx.circuitKey, ctx.currentHeight,
×
1042
                        ResultInvoiceNotFound,
×
1043
                ), nil, nil
×
1044

1045
        case nil:
3✔
1046

1047
        default:
×
1048
                ctx.log(err.Error())
×
1049
                return nil, nil, err
×
1050
        }
1051

1052
        var invoiceToExpire invoiceExpiry
3✔
1053

3✔
1054
        switch res := resolution.(type) {
3✔
1055
        case *HtlcFailResolution:
3✔
1056
                // Inspect latest htlc state on the invoice. If it is found,
3✔
1057
                // we will update the accept height as it was recorded in the
3✔
1058
                // invoice database (which occurs in the case where the htlc
3✔
1059
                // reached the database in a previous call). If the htlc was
3✔
1060
                // not found on the invoice, it was immediately failed so we
3✔
1061
                // send the failure resolution as is, which has the current
3✔
1062
                // height set as the accept height.
3✔
1063
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
3✔
1064
                if ok {
6✔
1065
                        res.AcceptHeight = int32(invoiceHtlc.AcceptHeight)
3✔
1066
                }
3✔
1067

1068
                ctx.log(fmt.Sprintf("failure resolution result "+
3✔
1069
                        "outcome: %v, at accept height: %v",
3✔
1070
                        res.Outcome, res.AcceptHeight))
3✔
1071

3✔
1072
                // Some failures apply to the entire HTLC set. Break here if
3✔
1073
                // this isn't one of them.
3✔
1074
                if !res.Outcome.IsSetFailure() {
6✔
1075
                        break
3✔
1076
                }
1077

1078
                // Also cancel any HTLCs in the HTLC set that are also in the
1079
                // canceled state with the same failure result.
1080
                setID := ctx.setID()
×
1081
                canceledHtlcSet := invoice.HTLCSet(setID, HtlcStateCanceled)
×
1082
                for key, htlc := range canceledHtlcSet {
×
1083
                        htlcFailResolution := NewFailResolution(
×
1084
                                key, int32(htlc.AcceptHeight), res.Outcome,
×
1085
                        )
×
1086

×
1087
                        i.notifyHodlSubscribers(htlcFailResolution)
×
1088
                }
×
1089

1090
        // If the htlc was settled, we will settle any previously accepted
1091
        // htlcs and notify our peer to settle them.
1092
        case *HtlcSettleResolution:
3✔
1093
                ctx.log(fmt.Sprintf("settle resolution result "+
3✔
1094
                        "outcome: %v, at accept height: %v",
3✔
1095
                        res.Outcome, res.AcceptHeight))
3✔
1096

3✔
1097
                // Also settle any previously accepted htlcs. If a htlc is
3✔
1098
                // marked as settled, we should follow now and settle the htlc
3✔
1099
                // with our peer.
3✔
1100
                setID := ctx.setID()
3✔
1101
                settledHtlcSet := invoice.HTLCSet(setID, HtlcStateSettled)
3✔
1102
                for key, htlc := range settledHtlcSet {
6✔
1103
                        preimage := res.Preimage
3✔
1104
                        if htlc.AMP != nil && htlc.AMP.Preimage != nil {
6✔
1105
                                preimage = *htlc.AMP.Preimage
3✔
1106
                        }
3✔
1107

1108
                        // Notify subscribers that the htlcs should be settled
1109
                        // with our peer. Note that the outcome of the
1110
                        // resolution is set based on the outcome of the single
1111
                        // htlc that we just settled, so may not be accurate
1112
                        // for all htlcs.
1113
                        htlcSettleResolution := NewSettleResolution(
3✔
1114
                                preimage, key,
3✔
1115
                                int32(htlc.AcceptHeight), res.Outcome,
3✔
1116
                        )
3✔
1117

3✔
1118
                        // Notify subscribers that the htlc should be settled
3✔
1119
                        // with our peer.
3✔
1120
                        i.notifyHodlSubscribers(htlcSettleResolution)
3✔
1121
                }
1122

1123
                // If concurrent payments were attempted to this invoice before
1124
                // the current one was ultimately settled, cancel back any of
1125
                // the HTLCs immediately. As a result of the settle, the HTLCs
1126
                // in other HTLC sets are automatically converted to a canceled
1127
                // state when updating the invoice.
1128
                //
1129
                // TODO(roasbeef): can remove now??
1130
                canceledHtlcSet := invoice.HTLCSetCompliment(
3✔
1131
                        setID, HtlcStateCanceled,
3✔
1132
                )
3✔
1133
                for key, htlc := range canceledHtlcSet {
3✔
1134
                        htlcFailResolution := NewFailResolution(
×
1135
                                key, int32(htlc.AcceptHeight),
×
1136
                                ResultInvoiceAlreadySettled,
×
1137
                        )
×
1138

×
1139
                        i.notifyHodlSubscribers(htlcFailResolution)
×
1140
                }
×
1141

1142
        // If we accepted the htlc, subscribe to the hodl invoice and return
1143
        // an accept resolution with the htlc's accept time on it.
1144
        case *htlcAcceptResolution:
3✔
1145
                invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
3✔
1146
                if !ok {
3✔
1147
                        return nil, nil, fmt.Errorf("accepted htlc: %v not"+
×
1148
                                " present on invoice: %x", ctx.circuitKey,
×
1149
                                ctx.hash[:])
×
1150
                }
×
1151

1152
                // Determine accepted height of this htlc. If the htlc reached
1153
                // the invoice database (possibly in a previous call to the
1154
                // invoice registry), we'll take the original accepted height
1155
                // as it was recorded in the database.
1156
                acceptHeight := int32(invoiceHtlc.AcceptHeight)
3✔
1157

3✔
1158
                ctx.log(fmt.Sprintf("accept resolution result "+
3✔
1159
                        "outcome: %v, at accept height: %v",
3✔
1160
                        res.outcome, acceptHeight))
3✔
1161

3✔
1162
                // Auto-release the htlc if the invoice is still open. It can
3✔
1163
                // only happen for mpp payments that there are htlcs in state
3✔
1164
                // Accepted while the invoice is Open.
3✔
1165
                if invoice.State == ContractOpen {
6✔
1166
                        res.acceptTime = invoiceHtlc.AcceptTime
3✔
1167
                        res.autoRelease = true
3✔
1168
                }
3✔
1169

1170
                // If we have fully accepted the set of htlcs for this invoice,
1171
                // we can now add it to our invoice expiry watcher. We do not
1172
                // add invoices before they are fully accepted, because it is
1173
                // possible that we MppTimeout the htlcs, and then our relevant
1174
                // expiry height could change.
1175
                if res.outcome == resultAccepted {
6✔
1176
                        invoiceToExpire = makeInvoiceExpiry(ctx.hash, invoice)
3✔
1177
                }
3✔
1178

1179
                i.hodlSubscribe(hodlChan, ctx.circuitKey)
3✔
1180

1181
        default:
×
1182
                panic("unknown action")
×
1183
        }
1184

1185
        // Now that the links have been notified of any state changes to their
1186
        // HTLCs, we'll go ahead and notify any clients wiaiting on the invoice
1187
        // state changes.
1188
        if updateSubscribers {
6✔
1189
                // We'll add a setID onto the notification, but only if this is
3✔
1190
                // an AMP invoice being settled.
3✔
1191
                var setID *[32]byte
3✔
1192
                if _, ok := resolution.(*HtlcSettleResolution); ok {
6✔
1193
                        setID = ctx.setID()
3✔
1194
                }
3✔
1195

1196
                i.notifyClients(ctx.hash, invoice, setID)
3✔
1197
        }
1198

1199
        return resolution, invoiceToExpire, nil
3✔
1200
}
1201

1202
// SettleHodlInvoice sets the preimage of a hodl invoice.
1203
func (i *InvoiceRegistry) SettleHodlInvoice(ctx context.Context,
1204
        preimage lntypes.Preimage) error {
3✔
1205

3✔
1206
        i.Lock()
3✔
1207
        defer i.Unlock()
3✔
1208

3✔
1209
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
6✔
1210
                switch invoice.State {
3✔
1211
                case ContractOpen:
×
1212
                        return nil, ErrInvoiceStillOpen
×
1213

1214
                case ContractCanceled:
×
1215
                        return nil, ErrInvoiceAlreadyCanceled
×
1216

1217
                case ContractSettled:
×
1218
                        return nil, ErrInvoiceAlreadySettled
×
1219
                }
1220

1221
                return &InvoiceUpdateDesc{
3✔
1222
                        UpdateType: SettleHodlInvoiceUpdate,
3✔
1223
                        State: &InvoiceStateUpdateDesc{
3✔
1224
                                NewState: ContractSettled,
3✔
1225
                                Preimage: &preimage,
3✔
1226
                        },
3✔
1227
                }, nil
3✔
1228
        }
1229

1230
        hash := preimage.Hash()
3✔
1231
        invoiceRef := InvoiceRefByHash(hash)
3✔
1232
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
3✔
1233
        if err != nil {
3✔
1234
                log.Errorf("SettleHodlInvoice with preimage %v: %v",
×
1235
                        preimage, err)
×
1236

×
1237
                return err
×
1238
        }
×
1239

1240
        log.Debugf("Invoice%v: settled with preimage %v", invoiceRef,
3✔
1241
                invoice.Terms.PaymentPreimage)
3✔
1242

3✔
1243
        // In the callback, we marked the invoice as settled. UpdateInvoice will
3✔
1244
        // have seen this and should have moved all htlcs that were accepted to
3✔
1245
        // the settled state. In the loop below, we go through all of these and
3✔
1246
        // notify links and resolvers that are waiting for resolution. Any htlcs
3✔
1247
        // that were already settled before, will be notified again. This isn't
3✔
1248
        // necessary but doesn't hurt either.
3✔
1249
        for key, htlc := range invoice.Htlcs {
6✔
1250
                if htlc.State != HtlcStateSettled {
3✔
1251
                        continue
×
1252
                }
1253

1254
                resolution := NewSettleResolution(
3✔
1255
                        preimage, key, int32(htlc.AcceptHeight), ResultSettled,
3✔
1256
                )
3✔
1257

3✔
1258
                i.notifyHodlSubscribers(resolution)
3✔
1259
        }
1260
        i.notifyClients(hash, invoice, nil)
3✔
1261

3✔
1262
        return nil
3✔
1263
}
1264

1265
// CancelInvoice attempts to cancel the invoice corresponding to the passed
1266
// payment hash.
1267
func (i *InvoiceRegistry) CancelInvoice(ctx context.Context,
1268
        payHash lntypes.Hash) error {
3✔
1269

3✔
1270
        return i.cancelInvoiceImpl(ctx, payHash, true)
3✔
1271
}
3✔
1272

1273
// shouldCancel examines the state of an invoice and whether we want to
1274
// cancel already accepted invoices, taking our force cancel boolean into
1275
// account. This is pulled out into its own function so that tests that mock
1276
// cancelInvoiceImpl can reuse this logic.
1277
func shouldCancel(state ContractState, cancelAccepted bool) bool {
3✔
1278
        if state != ContractAccepted {
6✔
1279
                return true
3✔
1280
        }
3✔
1281

1282
        // If the invoice is accepted, we should only cancel if we want to
1283
        // force cancellation of accepted invoices.
1284
        return cancelAccepted
3✔
1285
}
1286

1287
// cancelInvoice attempts to cancel the invoice corresponding to the passed
1288
// payment hash. Accepted invoices will only be canceled if explicitly
1289
// requested to do so. It notifies subscribing links and resolvers that
1290
// the associated htlcs were canceled if they change state.
1291
func (i *InvoiceRegistry) cancelInvoiceImpl(ctx context.Context,
1292
        payHash lntypes.Hash, cancelAccepted bool) error {
3✔
1293

3✔
1294
        i.Lock()
3✔
1295
        defer i.Unlock()
3✔
1296

3✔
1297
        ref := InvoiceRefByHash(payHash)
3✔
1298
        log.Debugf("Invoice%v: canceling invoice", ref)
3✔
1299

3✔
1300
        updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
6✔
1301
                if !shouldCancel(invoice.State, cancelAccepted) {
3✔
1302
                        return nil, nil
×
1303
                }
×
1304

1305
                // Move invoice to the canceled state. Rely on validation in
1306
                // channeldb to return an error if the invoice is already
1307
                // settled or canceled.
1308
                return &InvoiceUpdateDesc{
3✔
1309
                        UpdateType: CancelInvoiceUpdate,
3✔
1310
                        State: &InvoiceStateUpdateDesc{
3✔
1311
                                NewState: ContractCanceled,
3✔
1312
                        },
3✔
1313
                }, nil
3✔
1314
        }
1315

1316
        invoiceRef := InvoiceRefByHash(payHash)
3✔
1317
        invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
3✔
1318

3✔
1319
        // Implement idempotency by returning success if the invoice was already
3✔
1320
        // canceled.
3✔
1321
        if errors.Is(err, ErrInvoiceAlreadyCanceled) {
3✔
1322
                log.Debugf("Invoice%v: already canceled", ref)
×
1323
                return nil
×
1324
        }
×
1325
        if err != nil {
6✔
1326
                return err
3✔
1327
        }
3✔
1328

1329
        // Return without cancellation if the invoice state is ContractAccepted.
1330
        if invoice.State == ContractAccepted {
3✔
1331
                log.Debugf("Invoice%v: remains accepted as cancel wasn't"+
×
1332
                        "explicitly requested.", ref)
×
1333
                return nil
×
1334
        }
×
1335

1336
        log.Debugf("Invoice%v: canceled", ref)
3✔
1337

3✔
1338
        // In the callback, some htlcs may have been moved to the canceled
3✔
1339
        // state. We now go through all of these and notify links and resolvers
3✔
1340
        // that are waiting for resolution. Any htlcs that were already canceled
3✔
1341
        // before, will be notified again. This isn't necessary but doesn't hurt
3✔
1342
        // either.
3✔
1343
        for key, htlc := range invoice.Htlcs {
6✔
1344
                if htlc.State != HtlcStateCanceled {
3✔
1345
                        continue
×
1346
                }
1347

1348
                i.notifyHodlSubscribers(
3✔
1349
                        NewFailResolution(
3✔
1350
                                key, int32(htlc.AcceptHeight), ResultCanceled,
3✔
1351
                        ),
3✔
1352
                )
3✔
1353
        }
1354
        i.notifyClients(payHash, invoice, nil)
3✔
1355

3✔
1356
        // Attempt to also delete the invoice if requested through the registry
3✔
1357
        // config.
3✔
1358
        if i.cfg.GcCanceledInvoicesOnTheFly {
3✔
1359
                // Assemble the delete reference and attempt to delete through
×
1360
                // the invocice from the DB.
×
1361
                deleteRef := InvoiceDeleteRef{
×
1362
                        PayHash:     payHash,
×
1363
                        AddIndex:    invoice.AddIndex,
×
1364
                        SettleIndex: invoice.SettleIndex,
×
1365
                }
×
1366
                if invoice.Terms.PaymentAddr != BlankPayAddr {
×
1367
                        deleteRef.PayAddr = &invoice.Terms.PaymentAddr
×
1368
                }
×
1369

1370
                err = i.idb.DeleteInvoice(ctx, []InvoiceDeleteRef{deleteRef})
×
1371
                // If by any chance deletion failed, then log it instead of
×
1372
                // returning the error, as the invoice itself has already been
×
1373
                // canceled.
×
1374
                if err != nil {
×
1375
                        log.Warnf("Invoice %v could not be deleted: %v", ref,
×
1376
                                err)
×
1377
                }
×
1378
        }
1379

1380
        return nil
3✔
1381
}
1382

1383
// notifyClients notifies all currently registered invoice notification clients
1384
// of a newly added/settled invoice.
1385
func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
1386
        invoice *Invoice, setID *[32]byte) {
3✔
1387

3✔
1388
        event := &invoiceEvent{
3✔
1389
                invoice: invoice,
3✔
1390
                hash:    hash,
3✔
1391
                setID:   setID,
3✔
1392
        }
3✔
1393

3✔
1394
        select {
3✔
1395
        case i.invoiceEvents <- event:
3✔
1396
        case <-i.quit:
×
1397
        }
1398
}
1399

1400
// invoiceSubscriptionKit defines that are common to both all invoice
1401
// subscribers and single invoice subscribers.
1402
type invoiceSubscriptionKit struct {
1403
        id uint32 // nolint:structcheck
1404

1405
        // quit is a chan mouted to InvoiceRegistry that signals a shutdown.
1406
        quit chan struct{}
1407

1408
        ntfnQueue *queue.ConcurrentQueue
1409

1410
        canceled   uint32 // To be used atomically.
1411
        cancelChan chan struct{}
1412

1413
        // backlogDelivered is closed when the backlog events have been
1414
        // delivered.
1415
        backlogDelivered chan struct{}
1416
}
1417

1418
// InvoiceSubscription represents an intent to receive updates for newly added
1419
// or settled invoices. Each newly added (open) invoice is sent over the
1420
// NewInvoices channel. Each settled invoice, as well as an open invoice
1421
// whose event carries a set ID, is sent over the SettledInvoices
1422
// channel.
1423
type InvoiceSubscription struct {
1424
        invoiceSubscriptionKit
1425

1426
        // NewInvoices is a channel that we'll use to send all newly created
1427
        // invoices with an add index greater than the addIndex field
1428
        // below.
1429
        NewInvoices chan *Invoice
1430

1431
        // SettledInvoices is a channel that we'll use to send all settled
1432
        // invoices with a settle index greater than the settleIndex field
1433
        // below.
1434
        SettledInvoices chan *Invoice
1435

1436
        // addIndex is the highest add index the caller knows of. We'll use
1437
        // this information to send out an event backlog to the notifications
1438
        // subscriber. Any new add events with an index greater than this will
1439
        // be dispatched before any new notifications are sent out.
1440
        addIndex uint64
1441

1442
        // settleIndex is the highest settle index the caller knows of. We'll
1443
        // use this information to send out an event backlog to the
1444
        // notifications subscriber. Any new settle events with an index
1445
        // greater than this will be dispatched before any new notifications
1446
        // are sent out.
1447
        settleIndex uint64
1448
}
1449

1450
// SingleInvoiceSubscription represents an intent to receive updates for a
1451
// specific invoice.
1452
type SingleInvoiceSubscription struct {
1453
        invoiceSubscriptionKit
1454

1455
        // invoiceRef identifies the invoice this subscription targets.
        invoiceRef InvoiceRef
1456

1457
        // Updates is a channel that we'll use to send all invoice events for
1458
        // the invoice that is subscribed to.
1459
        Updates chan *Invoice
1460
}
1461

1462
// PayHash returns the optional payment hash of the target invoice.
1463
//
1464
// TODO(positiveblue): This method is only supposed to be used in tests. It will
1465
// be deleted as soon as invoiceregistery_test is in the same module.
1466
func (s *SingleInvoiceSubscription) PayHash() *lntypes.Hash {
×
1467
        return s.invoiceRef.PayHash()
×
1468
}
×
1469

1470
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
1471
// resources.
1472
func (i *invoiceSubscriptionKit) Cancel() {
3✔
1473
        if !atomic.CompareAndSwapUint32(&i.canceled, 0, 1) {
3✔
1474
                return
×
1475
        }
×
1476

1477
        i.ntfnQueue.Stop()
3✔
1478
        close(i.cancelChan)
3✔
1479
}
1480

1481
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
3✔
1482
        select {
3✔
1483
        case i.ntfnQueue.ChanIn() <- event:
3✔
1484

1485
        case <-i.cancelChan:
×
1486
                // This can only be triggered by delivery of non-backlog
×
1487
                // events.
×
1488
                return ErrShuttingDown
×
1489
        case <-i.quit:
×
1490
                return ErrShuttingDown
×
1491
        }
1492

1493
        return nil
3✔
1494
}
1495

1496
// SubscribeNotifications returns an InvoiceSubscription which allows the
1497
// caller to receive async notifications when any invoices are settled or
1498
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
1499
// by first sending out all new events with an invoice index _greater_ than
1500
// this value. Afterwards, we'll send out real-time notifications.
1501
func (i *InvoiceRegistry) SubscribeNotifications(ctx context.Context,
1502
        addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
3✔
1503

3✔
1504
        client := &InvoiceSubscription{
3✔
1505
                NewInvoices:     make(chan *Invoice),
3✔
1506
                SettledInvoices: make(chan *Invoice),
3✔
1507
                addIndex:        addIndex,
3✔
1508
                settleIndex:     settleIndex,
3✔
1509
                invoiceSubscriptionKit: invoiceSubscriptionKit{
3✔
1510
                        quit:             i.quit,
3✔
1511
                        ntfnQueue:        queue.NewConcurrentQueue(20),
3✔
1512
                        cancelChan:       make(chan struct{}),
3✔
1513
                        backlogDelivered: make(chan struct{}),
3✔
1514
                },
3✔
1515
        }
3✔
1516
        client.ntfnQueue.Start()
3✔
1517

3✔
1518
        // This notifies other goroutines that the backlog phase is over.
3✔
1519
        defer close(client.backlogDelivered)
3✔
1520

3✔
1521
        // Always increment by 1 first, and our client ID will start with 1,
3✔
1522
        // not 0.
3✔
1523
        client.id = atomic.AddUint32(&i.nextClientID, 1)
3✔
1524

3✔
1525
        // Before we register this new invoice subscription, we'll launch a new
3✔
1526
        // goroutine that will proxy all notifications appended to the end of
3✔
1527
        // the concurrent queue to the two client-side channels the caller will
3✔
1528
        // feed off of.
3✔
1529
        i.wg.Add(1)
3✔
1530
        go func() {
6✔
1531
                defer i.wg.Done()
3✔
1532
                defer i.deleteClient(client.id)
3✔
1533

3✔
1534
                for {
6✔
1535
                        select {
3✔
1536
                        // A new invoice event has been sent by the
1537
                        // invoiceRegistry! We'll figure out if this is an add
1538
                        // event or a settle event, then dispatch the event to
1539
                        // the client.
1540
                        case ntfn := <-client.ntfnQueue.ChanOut():
3✔
1541
                                invoiceEvent := ntfn.(*invoiceEvent)
3✔
1542

3✔
1543
                                var targetChan chan *Invoice
3✔
1544
                                state := invoiceEvent.invoice.State
3✔
1545
                                switch {
3✔
1546
                                // AMP invoices never move to settled, but will
1547
                                // be sent with a set ID if an HTLC set is
1548
                                // being settled.
1549
                                case state == ContractOpen &&
1550
                                        invoiceEvent.setID != nil:
3✔
1551
                                        fallthrough
3✔
1552

1553
                                case state == ContractSettled:
3✔
1554
                                        targetChan = client.SettledInvoices
3✔
1555

1556
                                case state == ContractOpen:
3✔
1557
                                        targetChan = client.NewInvoices
3✔
1558

1559
                                default:
×
1560
                                        log.Errorf("unknown invoice state: %v",
×
1561
                                                state)
×
1562

×
1563
                                        continue
×
1564
                                }
1565

1566
                                select {
3✔
1567
                                case targetChan <- invoiceEvent.invoice:
3✔
1568

1569
                                case <-client.cancelChan:
×
1570
                                        return
×
1571

1572
                                case <-i.quit:
×
1573
                                        return
×
1574
                                }
1575

1576
                        case <-client.cancelChan:
3✔
1577
                                return
3✔
1578

1579
                        case <-i.quit:
×
1580
                                return
×
1581
                        }
1582
                }
1583
        }()
1584

1585
        i.notificationClientMux.Lock()
3✔
1586
        i.notificationClients[client.id] = client
3✔
1587
        i.notificationClientMux.Unlock()
3✔
1588

3✔
1589
        // Query the database to see if based on the provided addIndex and
3✔
1590
        // settledIndex we need to deliver any backlog notifications.
3✔
1591
        err := i.deliverBacklogEvents(ctx, client)
3✔
1592
        if err != nil {
3✔
1593
                return nil, err
×
1594
        }
×
1595

1596
        log.Infof("New invoice subscription client: id=%v", client.id)
3✔
1597

3✔
1598
        return client, nil
3✔
1599
}
1600

1601
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
1602
// caller to receive async notifications for a specific invoice.
1603
func (i *InvoiceRegistry) SubscribeSingleInvoice(ctx context.Context,
1604
        hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
3✔
1605

3✔
1606
        client := &SingleInvoiceSubscription{
3✔
1607
                Updates: make(chan *Invoice),
3✔
1608
                invoiceSubscriptionKit: invoiceSubscriptionKit{
3✔
1609
                        quit:             i.quit,
3✔
1610
                        ntfnQueue:        queue.NewConcurrentQueue(20),
3✔
1611
                        cancelChan:       make(chan struct{}),
3✔
1612
                        backlogDelivered: make(chan struct{}),
3✔
1613
                },
3✔
1614
                invoiceRef: InvoiceRefByHash(hash),
3✔
1615
        }
3✔
1616
        client.ntfnQueue.Start()
3✔
1617

3✔
1618
        // This notifies other goroutines that the backlog phase is done.
3✔
1619
        defer close(client.backlogDelivered)
3✔
1620

3✔
1621
        // Always increment by 1 first, and our client ID will start with 1,
3✔
1622
        // not 0.
3✔
1623
        client.id = atomic.AddUint32(&i.nextClientID, 1)
3✔
1624

3✔
1625
        // Before we register this new invoice subscription, we'll launch a new
3✔
1626
        // goroutine that will proxy all notifications appended to the end of
3✔
1627
        // the concurrent queue to the two client-side channels the caller will
3✔
1628
        // feed off of.
3✔
1629
        i.wg.Add(1)
3✔
1630
        go func() {
6✔
1631
                defer i.wg.Done()
3✔
1632
                defer i.deleteClient(client.id)
3✔
1633

3✔
1634
                for {
6✔
1635
                        select {
3✔
1636
                        // A new invoice event has been sent by the
1637
                        // invoiceRegistry. We will dispatch the event to the
1638
                        // client.
1639
                        case ntfn := <-client.ntfnQueue.ChanOut():
3✔
1640
                                invoiceEvent := ntfn.(*invoiceEvent)
3✔
1641

3✔
1642
                                select {
3✔
1643
                                case client.Updates <- invoiceEvent.invoice:
3✔
1644

1645
                                case <-client.cancelChan:
×
1646
                                        return
×
1647

1648
                                case <-i.quit:
×
1649
                                        return
×
1650
                                }
1651

1652
                        case <-client.cancelChan:
3✔
1653
                                return
3✔
1654

1655
                        case <-i.quit:
×
1656
                                return
×
1657
                        }
1658
                }
1659
        }()
1660

1661
        i.notificationClientMux.Lock()
3✔
1662
        i.singleNotificationClients[client.id] = client
3✔
1663
        i.notificationClientMux.Unlock()
3✔
1664

3✔
1665
        err := i.deliverSingleBacklogEvents(ctx, client)
3✔
1666
        if err != nil {
3✔
1667
                return nil, err
×
1668
        }
×
1669

1670
        log.Infof("New single invoice subscription client: id=%v, ref=%v",
3✔
1671
                client.id, client.invoiceRef)
3✔
1672

3✔
1673
        return client, nil
3✔
1674
}
1675

1676
// notifyHodlSubscribers sends out the htlc resolution to all current
1677
// subscribers.
1678
func (i *InvoiceRegistry) notifyHodlSubscribers(htlcResolution HtlcResolution) {
3✔
1679
        i.hodlSubscriptionsMux.Lock()
3✔
1680
        defer i.hodlSubscriptionsMux.Unlock()
3✔
1681

3✔
1682
        subscribers, ok := i.hodlSubscriptions[htlcResolution.CircuitKey()]
3✔
1683
        if !ok {
6✔
1684
                return
3✔
1685
        }
3✔
1686

1687
        // Notify all interested subscribers and remove subscription from both
1688
        // maps. The subscription can be removed as there only ever will be a
1689
        // single resolution for each hash.
1690
        for subscriber := range subscribers {
6✔
1691
                select {
3✔
1692
                case subscriber <- htlcResolution:
3✔
1693
                case <-i.quit:
×
1694
                        return
×
1695
                }
1696

1697
                delete(
3✔
1698
                        i.hodlReverseSubscriptions[subscriber],
3✔
1699
                        htlcResolution.CircuitKey(),
3✔
1700
                )
3✔
1701
        }
1702

1703
        delete(i.hodlSubscriptions, htlcResolution.CircuitKey())
3✔
1704
}
1705

1706
// hodlSubscribe adds a new invoice subscription.
1707
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
1708
        circuitKey CircuitKey) {
3✔
1709

3✔
1710
        i.hodlSubscriptionsMux.Lock()
3✔
1711
        defer i.hodlSubscriptionsMux.Unlock()
3✔
1712

3✔
1713
        log.Debugf("Hodl subscribe for %v", circuitKey)
3✔
1714

3✔
1715
        subscriptions, ok := i.hodlSubscriptions[circuitKey]
3✔
1716
        if !ok {
6✔
1717
                subscriptions = make(map[chan<- interface{}]struct{})
3✔
1718
                i.hodlSubscriptions[circuitKey] = subscriptions
3✔
1719
        }
3✔
1720
        subscriptions[subscriber] = struct{}{}
3✔
1721

3✔
1722
        reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
3✔
1723
        if !ok {
6✔
1724
                reverseSubscriptions = make(map[CircuitKey]struct{})
3✔
1725
                i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
3✔
1726
        }
3✔
1727
        reverseSubscriptions[circuitKey] = struct{}{}
3✔
1728
}
1729

1730
// HodlUnsubscribeAll cancels the subscription.
1731
func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
3✔
1732
        i.hodlSubscriptionsMux.Lock()
3✔
1733
        defer i.hodlSubscriptionsMux.Unlock()
3✔
1734

3✔
1735
        hashes := i.hodlReverseSubscriptions[subscriber]
3✔
1736
        for hash := range hashes {
6✔
1737
                delete(i.hodlSubscriptions[hash], subscriber)
3✔
1738
        }
3✔
1739

1740
        delete(i.hodlReverseSubscriptions, subscriber)
3✔
1741
}
1742

1743
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
1744
// useful when we need to iterate the map to send notifications.
1745
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription { //nolint:lll
3✔
1746
        i.notificationClientMux.RLock()
3✔
1747
        defer i.notificationClientMux.RUnlock()
3✔
1748

3✔
1749
        clients := make(map[uint32]*SingleInvoiceSubscription)
3✔
1750
        for k, v := range i.singleNotificationClients {
6✔
1751
                clients[k] = v
3✔
1752
        }
3✔
1753
        return clients
3✔
1754
}
1755

1756
// copyClients copies i.notificationClients inside a lock. This is useful when
1757
// we need to iterate the map to send notifications.
1758
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
3✔
1759
        i.notificationClientMux.RLock()
3✔
1760
        defer i.notificationClientMux.RUnlock()
3✔
1761

3✔
1762
        clients := make(map[uint32]*InvoiceSubscription)
3✔
1763
        for k, v := range i.notificationClients {
6✔
1764
                clients[k] = v
3✔
1765
        }
3✔
1766
        return clients
3✔
1767
}
1768

1769
// deleteClient removes a client by its ID inside a lock. Noop if the client is
1770
// not found.
1771
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
3✔
1772
        i.notificationClientMux.Lock()
3✔
1773
        defer i.notificationClientMux.Unlock()
3✔
1774

3✔
1775
        log.Infof("Cancelling invoice subscription for client=%v", clientID)
3✔
1776
        delete(i.notificationClients, clientID)
3✔
1777
        delete(i.singleNotificationClients, clientID)
3✔
1778
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc