lightningnetwork / lnd · build 12301186252
12 Dec 2024 05:01PM UTC · coverage: 48.92% (-9.7%) from 58.642%
Merge pull request #9309 from yyforyongyu/fix-unit-test
chainntnfs: fix `TestHistoricalConfDetailsTxIndex`
99121 of 202617 relevant lines covered (48.92%) · 1.54 hits per line

Source file: /chainntnfs/neutrinonotify/neutrino.go
package neutrinonotify

import (
        "errors"
        "fmt"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcjson"
        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/btcutil/gcs/builder"
        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/btcsuite/btcd/rpcclient"
        "github.com/btcsuite/btcd/txscript"
        "github.com/btcsuite/btcd/wire"
        "github.com/lightninglabs/neutrino"
        "github.com/lightninglabs/neutrino/headerfs"
        "github.com/lightningnetwork/lnd/blockcache"
        "github.com/lightningnetwork/lnd/chainntnfs"
        "github.com/lightningnetwork/lnd/lntypes"
        "github.com/lightningnetwork/lnd/queue"
)

const (
        // notifierType uniquely identifies this concrete implementation of the
        // ChainNotifier interface.
        notifierType = "neutrino"
)

// NeutrinoNotifier is a version of ChainNotifier that's backed by the neutrino
// Bitcoin light client. Unlike other implementations, this implementation
// speaks directly to the p2p network. As a result, this implementation of the
// ChainNotifier interface is much more lightweight than other implementations
// which rely on receiving notifications over an RPC interface backed by a
// running full node.
//
// TODO(roasbeef): heavily consolidate with NeutrinoNotifier code
//   - maybe combine into single package?
type NeutrinoNotifier struct {
        epochClientCounter uint64 // To be used atomically.

        start   sync.Once
        active  int32 // To be used atomically.
        stopped int32 // To be used atomically.

        bestBlockMtx sync.RWMutex
        bestBlock    chainntnfs.BlockEpoch

        p2pNode   *neutrino.ChainService
        chainView *neutrino.Rescan

        chainConn *NeutrinoChainConn

        notificationCancels  chan interface{}
        notificationRegistry chan interface{}

        txNotifier *chainntnfs.TxNotifier

        blockEpochClients map[uint64]*blockEpochRegistration

        rescanErr <-chan error

        chainUpdates chan *filteredBlock

        txUpdates *queue.ConcurrentQueue

        // spendHintCache is a cache used to query and update the latest height
        // hints for an outpoint. Each height hint represents the earliest
        // height at which the outpoint could have been spent within the chain.
        spendHintCache chainntnfs.SpendHintCache

        // confirmHintCache is a cache used to query the latest height hints for
        // a transaction. Each height hint represents the earliest height at
        // which the transaction could have confirmed within the chain.
        confirmHintCache chainntnfs.ConfirmHintCache

        // blockCache is an LRU block cache.
        blockCache *blockcache.BlockCache

        wg   sync.WaitGroup
        quit chan struct{}
}

// Ensure NeutrinoNotifier implements the ChainNotifier interface at compile time.
var _ chainntnfs.ChainNotifier = (*NeutrinoNotifier)(nil)

// New creates a new instance of the NeutrinoNotifier concrete implementation
// of the ChainNotifier interface.
//
// NOTE: The passed neutrino node should already be running and active before
// being passed into this function.
func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache,
        confirmHintCache chainntnfs.ConfirmHintCache,
        blockCache *blockcache.BlockCache) *NeutrinoNotifier {

        return &NeutrinoNotifier{
                notificationCancels:  make(chan interface{}),
                notificationRegistry: make(chan interface{}),

                blockEpochClients: make(map[uint64]*blockEpochRegistration),

                p2pNode:   node,
                chainConn: &NeutrinoChainConn{node},

                rescanErr: make(chan error),

                chainUpdates: make(chan *filteredBlock, 100),

                txUpdates: queue.NewConcurrentQueue(10),

                spendHintCache:   spendHintCache,
                confirmHintCache: confirmHintCache,

                blockCache: blockCache,

                quit: make(chan struct{}),
        }
}
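
// A minimal lifecycle sketch, assuming an already-running
// *neutrino.ChainService plus the height-hint caches and shared block cache
// described above; the variable names are illustrative, not part of this
// package:
//
//        notifier := New(chainService, spendHints, confHints, cache)
//        if err := notifier.Start(); err != nil {
//                // Handle startup failure.
//        }
//        defer notifier.Stop()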

// Start contacts the running neutrino light client and kicks off an initial
// empty rescan.
func (n *NeutrinoNotifier) Start() error {
        var startErr error
        n.start.Do(func() {
                startErr = n.startNotifier()
        })
        return startErr
}

// Stop shuts down the NeutrinoNotifier.
func (n *NeutrinoNotifier) Stop() error {
        // Already shutting down?
        if atomic.AddInt32(&n.stopped, 1) != 1 {
                return nil
        }

        chainntnfs.Log.Info("neutrino notifier shutting down...")
        defer chainntnfs.Log.Debug("neutrino notifier shutdown complete")

        close(n.quit)
        n.wg.Wait()

        n.txUpdates.Stop()

        // Notify all pending clients of our shutdown by closing the related
        // notification channels.
        for _, epochClient := range n.blockEpochClients {
                close(epochClient.cancelChan)
                epochClient.wg.Wait()

                close(epochClient.epochChan)
        }

        // The txNotifier is only initialized in the start method, therefore we
        // need to make sure we don't access a nil pointer here.
        if n.txNotifier != nil {
                n.txNotifier.TearDown()
        }

        return nil
}

// Started returns true if this instance has been started, and false otherwise.
func (n *NeutrinoNotifier) Started() bool {
        return atomic.LoadInt32(&n.active) != 0
}

func (n *NeutrinoNotifier) startNotifier() error {
        // Start our concurrent queues before starting the rescan, to ensure
        // the onFilteredBlockConnected and onRelevantTx callbacks won't be
        // blocked.
        n.txUpdates.Start()

        // First, we'll obtain the latest block height of the p2p node. We'll
        // start the auto-rescan from this point. Once a caller actually wishes
        // to register a chain view, the rescan state will be rewound
        // accordingly.
        startingPoint, err := n.p2pNode.BestBlock()
        if err != nil {
                n.txUpdates.Stop()
                return err
        }
        startingHeader, err := n.p2pNode.GetBlockHeader(
                &startingPoint.Hash,
        )
        if err != nil {
                n.txUpdates.Stop()
                return err
        }

        n.bestBlock.Hash = &startingPoint.Hash
        n.bestBlock.Height = startingPoint.Height
        n.bestBlock.BlockHeader = startingHeader

        n.txNotifier = chainntnfs.NewTxNotifier(
                uint32(n.bestBlock.Height), chainntnfs.ReorgSafetyLimit,
                n.confirmHintCache, n.spendHintCache,
        )

        // Next, we'll create our set of rescan options. Currently it's
        // required that a user MUST set an addr/outpoint/txid when creating a
        // rescan. To get around this, we'll add a "zero" outpoint, that won't
        // actually be matched.
        var zeroInput neutrino.InputWithScript
        rescanOptions := []neutrino.RescanOption{
                neutrino.StartBlock(startingPoint),
                neutrino.QuitChan(n.quit),
                neutrino.NotificationHandlers(
                        rpcclient.NotificationHandlers{
                                OnFilteredBlockConnected:    n.onFilteredBlockConnected,
                                OnFilteredBlockDisconnected: n.onFilteredBlockDisconnected,
                                OnRedeemingTx:               n.onRelevantTx,
                        },
                ),
                neutrino.WatchInputs(zeroInput),
        }

        // Finally, we'll create our rescan struct, start it, and launch all
        // the goroutines we need to operate this ChainNotifier instance.
        n.chainView = neutrino.NewRescan(
                &neutrino.RescanChainSource{
                        ChainService: n.p2pNode,
                },
                rescanOptions...,
        )
        n.rescanErr = n.chainView.Start()

        n.wg.Add(1)
        go n.notificationDispatcher()

        // Set the active flag now that we've completed the full startup.
        atomic.StoreInt32(&n.active, 1)

        return nil
}

// filteredBlock represents a new block which has been connected to the main
// chain. The slice of transactions will only be populated if the block
// includes a transaction that confirmed one of our watched txids, or spends
// one of the outputs currently being watched.
type filteredBlock struct {
        header *wire.BlockHeader
        hash   chainhash.Hash
        height uint32
        txns   []*btcutil.Tx

        // connect is true if this update is a new block and false if it is a
        // disconnected block.
        connect bool
}

// rescanFilterUpdate represents a request that will be sent to the
// notificationRegistry in order to prevent race conditions between the filter
// update and new block notifications.
type rescanFilterUpdate struct {
        updateOptions []neutrino.UpdateOption
        errChan       chan error
}

// onFilteredBlockConnected is a callback which is executed each time a new
// block is connected to the end of the main chain.
func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
        header *wire.BlockHeader, txns []*btcutil.Tx) {

        // Append this new chain update to the end of the queue of new chain
        // updates.
        select {
        case n.chainUpdates <- &filteredBlock{
                hash:    header.BlockHash(),
                height:  uint32(height),
                txns:    txns,
                header:  header,
                connect: true,
        }:
        case <-n.quit:
        }
}

// onFilteredBlockDisconnected is a callback which is executed each time a new
// block has been disconnected from the end of the main chain due to a re-org.
func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32,
        header *wire.BlockHeader) {

        // Append this new chain update to the end of the queue of new chain
        // disconnects.
        select {
        case n.chainUpdates <- &filteredBlock{
                hash:    header.BlockHash(),
                height:  uint32(height),
                connect: false,
        }:
        case <-n.quit:
        }
}

// relevantTx represents a relevant transaction to the notifier that fulfills
// any outstanding spend requests.
type relevantTx struct {
        tx      *btcutil.Tx
        details *btcjson.BlockDetails
}

// onRelevantTx is a callback that proxies relevant transaction notifications
// from the backend to the notifier's main event handler.
func (n *NeutrinoNotifier) onRelevantTx(tx *btcutil.Tx, details *btcjson.BlockDetails) {
        select {
        case n.txUpdates.ChanIn() <- &relevantTx{tx, details}:
        case <-n.quit:
        }
}

// connectFilteredBlock is called when we receive a filteredBlock from the
// backend. If the block is ahead of what we're expecting, we'll attempt to
// catch up and then process the block.
func (n *NeutrinoNotifier) connectFilteredBlock(update *filteredBlock) {
        n.bestBlockMtx.Lock()
        defer n.bestBlockMtx.Unlock()

        if update.height != uint32(n.bestBlock.Height+1) {
                chainntnfs.Log.Infof("Missed blocks, attempting to catch up")

                _, missedBlocks, err := chainntnfs.HandleMissedBlocks(
                        n.chainConn, n.txNotifier, n.bestBlock,
                        int32(update.height), false,
                )
                if err != nil {
                        chainntnfs.Log.Error(err)
                        return
                }

                for _, block := range missedBlocks {
                        filteredBlock, err := n.getFilteredBlock(block)
                        if err != nil {
                                chainntnfs.Log.Error(err)
                                return
                        }
                        err = n.handleBlockConnected(filteredBlock)
                        if err != nil {
                                chainntnfs.Log.Error(err)
                                return
                        }
                }
        }

        err := n.handleBlockConnected(update)
        if err != nil {
                chainntnfs.Log.Error(err)
        }
}

// disconnectFilteredBlock is called when our disconnected filtered block
// callback is fired. It attempts to rewind the chain to before the
// disconnection and updates our best block.
func (n *NeutrinoNotifier) disconnectFilteredBlock(update *filteredBlock) {
        n.bestBlockMtx.Lock()
        defer n.bestBlockMtx.Unlock()

        if update.height != uint32(n.bestBlock.Height) {
                chainntnfs.Log.Infof("Missed disconnected blocks, attempting" +
                        " to catch up")
        }
        newBestBlock, err := chainntnfs.RewindChain(n.chainConn, n.txNotifier,
                n.bestBlock, int32(update.height-1),
        )
        if err != nil {
                chainntnfs.Log.Errorf("Unable to rewind chain from height "+
                        "%d to height %d: %v", n.bestBlock.Height,
                        update.height-1, err,
                )
        }

        n.bestBlock = newBestBlock
}

// drainChainUpdates is called after updating the filter. It reads every
// buffered item off the chan and returns when no more are available. It is
// used to ensure that callers performing a historical scan properly update
// their EndHeight to scan blocks that did not have the filter applied at
// processing time. Without this, a race condition exists that could allow a
// spend or confirmation notification to be missed. It is unlikely this would
// occur in a real-world scenario, and instead would manifest itself in tests.
func (n *NeutrinoNotifier) drainChainUpdates() {
        for {
                select {
                case update := <-n.chainUpdates:
                        if update.connect {
                                n.connectFilteredBlock(update)
                                break
                        }
                        n.disconnectFilteredBlock(update)
                default:
                        return
                }
        }
}

// notificationDispatcher is the primary goroutine which handles client
// notification registrations, as well as notification dispatches.
func (n *NeutrinoNotifier) notificationDispatcher() {
        defer n.wg.Done()

        for {
                select {
                case cancelMsg := <-n.notificationCancels:
                        switch msg := cancelMsg.(type) {
                        case *epochCancel:
                                chainntnfs.Log.Infof("Cancelling epoch "+
                                        "notification, epoch_id=%v", msg.epochID)

                                // First, we'll lookup the original
                                // registration in order to stop the active
                                // queue goroutine.
                                reg := n.blockEpochClients[msg.epochID]
                                reg.epochQueue.Stop()

                                // Next, close the cancel channel for this
                                // specific client, and wait for the client to
                                // exit.
                                close(n.blockEpochClients[msg.epochID].cancelChan)
                                n.blockEpochClients[msg.epochID].wg.Wait()

                                // Once the client has exited, we can then
                                // safely close the channel used to send epoch
                                // notifications, in order to notify any
                                // listeners that the intent has been
                                // canceled.
                                close(n.blockEpochClients[msg.epochID].epochChan)
                                delete(n.blockEpochClients, msg.epochID)
                        }

                case registerMsg := <-n.notificationRegistry:
                        switch msg := registerMsg.(type) {
                        case *chainntnfs.HistoricalConfDispatch:
                                // We'll start a historical rescan of the
                                // chain asynchronously to avoid blocking on
                                // potentially long rescans.
                                n.wg.Add(1)

                                //nolint:ll
                                go func(msg *chainntnfs.HistoricalConfDispatch) {
                                        defer n.wg.Done()

                                        confDetails, err := n.historicalConfDetails(
                                                msg.ConfRequest,
                                                msg.StartHeight, msg.EndHeight,
                                        )
                                        if err != nil {
                                                chainntnfs.Log.Error(err)
                                                return
                                        }

                                        // If the historical dispatch finished
                                        // without error, we will invoke
                                        // UpdateConfDetails even if none were
                                        // found. This allows the notifier to
                                        // begin safely updating the height hint
                                        // cache at tip, since any pending
                                        // rescans have now completed.
                                        err = n.txNotifier.UpdateConfDetails(
                                                msg.ConfRequest, confDetails,
                                        )
                                        if err != nil {
                                                chainntnfs.Log.Error(err)
                                        }
                                }(msg)

                        case *blockEpochRegistration:
                                chainntnfs.Log.Infof("New block epoch subscription")

                                n.blockEpochClients[msg.epochID] = msg

                                // If the client did not provide their best
                                // known block, then we'll immediately dispatch
                                // a notification for the current tip.
                                if msg.bestBlock == nil {
                                        n.notifyBlockEpochClient(
                                                msg, n.bestBlock.Height,
                                                n.bestBlock.Hash,
                                                n.bestBlock.BlockHeader,
                                        )

                                        msg.errorChan <- nil
                                        continue
                                }

                                // Otherwise, we'll attempt to deliver the
                                // backlog of notifications from their best
                                // known block.
                                n.bestBlockMtx.Lock()
                                bestHeight := n.bestBlock.Height
                                n.bestBlockMtx.Unlock()

                                missedBlocks, err := chainntnfs.GetClientMissedBlocks(
                                        n.chainConn, msg.bestBlock, bestHeight,
                                        false,
                                )
                                if err != nil {
                                        msg.errorChan <- err
                                        continue
                                }

                                for _, block := range missedBlocks {
                                        n.notifyBlockEpochClient(
                                                msg, block.Height, block.Hash,
                                                block.BlockHeader,
                                        )
                                }

                                msg.errorChan <- nil

                        case *rescanFilterUpdate:
                                err := n.chainView.Update(msg.updateOptions...)
                                if err != nil {
                                        chainntnfs.Log.Errorf("Unable to "+
                                                "update rescan filter: %v", err)
                                }

                                // Drain the chainUpdates chan so the caller
                                // listening on errChan can be sure that
                                // updates after receiving the error will have
                                // the filter applied. This allows the caller
                                // to update their EndHeight if they're
                                // performing a historical scan.
                                n.drainChainUpdates()

                                // After draining, send the error to the
                                // caller.
                                msg.errChan <- err
                        }

                case item := <-n.chainUpdates:
                        update := item
                        if update.connect {
                                n.connectFilteredBlock(update)
                                continue
                        }

                        n.disconnectFilteredBlock(update)

                case txUpdate := <-n.txUpdates.ChanOut():
                        // A new relevant transaction notification has been
                        // received from the backend. We'll attempt to process
                        // it to determine if it fulfills any outstanding
                        // confirmation and/or spend requests and dispatch
                        // notifications for them.
                        update := txUpdate.(*relevantTx)
                        err := n.txNotifier.ProcessRelevantSpendTx(
                                update.tx, uint32(update.details.Height),
                        )
                        if err != nil {
                                chainntnfs.Log.Errorf("Unable to process "+
                                        "transaction %v: %v", update.tx.Hash(),
                                        err)
                        }

                case err := <-n.rescanErr:
                        chainntnfs.Log.Errorf("Error during rescan: %v", err)

                case <-n.quit:
                        return
                }
        }
}

// historicalConfDetails looks up whether a confirmation request (txid/output
// script) has already been included in a block in the active chain and, if so,
// returns details about said block.
func (n *NeutrinoNotifier) historicalConfDetails(confRequest chainntnfs.ConfRequest,
        startHeight, endHeight uint32) (*chainntnfs.TxConfirmation, error) {

        // Starting from the height hint, we'll walk forwards in the chain to
        // see if this transaction/output script has already been confirmed.
        for scanHeight := endHeight; scanHeight >= startHeight && scanHeight > 0; scanHeight-- {
                // Ensure we haven't been requested to shut down before
                // processing the next height.
                select {
                case <-n.quit:
                        return nil, chainntnfs.ErrChainNotifierShuttingDown
                default:
                }

                // First, we'll fetch the block header for this height so we
                // can compute the current block hash.
                blockHash, err := n.p2pNode.GetBlockHash(int64(scanHeight))
                if err != nil {
                        return nil, fmt.Errorf("unable to get header for "+
                                "height=%v: %w", scanHeight, err)
                }

                // With the hash computed, we can now fetch the basic filter
                // for this height. Since the range of required items is
                // known, we avoid round trips by requesting a batched
                // response and save bandwidth by limiting the max number of
                // items per batch. Since neutrino populates its underlying
                // filters cache with the batch response, the next call will
                // execute a network query only once per batch and not on
                // every iteration.
                regFilter, err := n.p2pNode.GetCFilter(
                        *blockHash, wire.GCSFilterRegular,
                        neutrino.NumRetries(5),
                        neutrino.OptimisticReverseBatch(),
                        neutrino.MaxBatchSize(int64(scanHeight-startHeight+1)),
                )
                if err != nil {
                        return nil, fmt.Errorf("unable to retrieve regular "+
                                "filter for height=%v: %w", scanHeight, err)
                }

                // In the case that the filter exists, we'll attempt to see if
                // any element in it matches our target public key script.
                key := builder.DeriveKey(blockHash)
                match, err := regFilter.Match(key, confRequest.PkScript.Script())
                if err != nil {
                        return nil, fmt.Errorf("unable to query filter: %w",
                                err)
                }

                // If there's no match, then we can continue forward to the
                // next block.
                if !match {
                        continue
                }

                // In the case that we do have a match, we'll fetch the block
                // from the network so we can find the positional data required
                // to send the proper response.
                block, err := n.GetBlock(*blockHash)
                if err != nil {
                        return nil, fmt.Errorf("unable to get block from "+
                                "network: %w", err)
                }

                // For every transaction in the block, check which one matches
                // our request. If we find one that does, we can dispatch its
                // confirmation details.
                for i, tx := range block.Transactions() {
                        if !confRequest.MatchesTx(tx.MsgTx()) {
                                continue
                        }

                        return &chainntnfs.TxConfirmation{
                                Tx:          tx.MsgTx().Copy(),
                                BlockHash:   blockHash,
                                BlockHeight: scanHeight,
                                TxIndex:     uint32(i),
                                Block:       block.MsgBlock(),
                        }, nil
                }
        }

        return nil, nil
}

// handleBlockConnected applies a chain update for a new block. Any watched
// transactions included in this block will be processed to either send
// notifications now or after numConfirmations confs.
//
// NOTE: This method must be called with the bestBlockMtx lock held.
func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
        // We'll extend the txNotifier's height with the information of this
        // new block, which will handle all of the notification logic for us.
        //
        // We actually need the _full_ block here as well in order to be able
        // to send the full block back up to the client. The neutrino client
        // itself will only dispatch a block if one of the items we're looking
        // for matches, so ultimately passing it the full block will still only
        // result in the items we care about being dispatched.
        rawBlock, err := n.GetBlock(newBlock.hash)
        if err != nil {
                return fmt.Errorf("unable to get full block: %w", err)
        }
        err = n.txNotifier.ConnectTip(rawBlock, newBlock.height)
        if err != nil {
                return fmt.Errorf("unable to connect tip: %w", err)
        }

        chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height,
                newBlock.hash)

        // Now that we've guaranteed the new block extends the txNotifier's
        // current tip, we'll proceed to dispatch notifications to all of our
        // registered clients who have had notifications fulfilled. Before
        // doing so, we'll make sure to update our in-memory state in order to
        // satisfy any client requests based upon the new block.
        n.bestBlock.Hash = &newBlock.hash
        n.bestBlock.Height = int32(newBlock.height)
        n.bestBlock.BlockHeader = newBlock.header

        err = n.txNotifier.NotifyHeight(newBlock.height)
        if err != nil {
                return fmt.Errorf("unable to notify height: %w", err)
        }

        n.notifyBlockEpochs(
                int32(newBlock.height), &newBlock.hash, newBlock.header,
        )

        return nil
}

// getFilteredBlock is a utility to retrieve the full filtered block from a
// block epoch.
func (n *NeutrinoNotifier) getFilteredBlock(epoch chainntnfs.BlockEpoch) (*filteredBlock, error) {
        rawBlock, err := n.GetBlock(*epoch.Hash)
        if err != nil {
                return nil, fmt.Errorf("unable to get block: %w", err)
        }

        txns := rawBlock.Transactions()

        block := &filteredBlock{
                hash:    *epoch.Hash,
                height:  uint32(epoch.Height),
                header:  &rawBlock.MsgBlock().Header,
                txns:    txns,
                connect: true,
        }
        return block, nil
}
723
// notifyBlockEpochs notifies all registered block epoch clients of the newly
724
// connected block to the main chain.
725
func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash,
726
        blockHeader *wire.BlockHeader) {
×
727

×
728
        for _, client := range n.blockEpochClients {
×
729
                n.notifyBlockEpochClient(client, newHeight, newSha, blockHeader)
×
730
        }
×
731
}
732

733
// notifyBlockEpochClient sends a registered block epoch client a notification
734
// about a specific block.
735
func (n *NeutrinoNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
736
        height int32, sha *chainhash.Hash, blockHeader *wire.BlockHeader) {
×
737

×
738
        epoch := &chainntnfs.BlockEpoch{
×
739
                Height:      height,
×
740
                Hash:        sha,
×
741
                BlockHeader: blockHeader,
×
742
        }
×
743

×
744
        select {
×
745
        case epochClient.epochQueue.ChanIn() <- epoch:
×
746
        case <-epochClient.cancelChan:
×
747
        case <-n.quit:
×
748
        }
749
}
750

// RegisterSpendNtfn registers an intent to be notified once the target
// outpoint/output script has been spent by a transaction on-chain. When
// intending to be notified of the spend of an output script, a nil outpoint
// must be used. The heightHint should represent the earliest height in the
// chain of the transaction that spent the outpoint/output script.
//
// Once a spend has been detected, the details of the spending event will be
// sent across the 'Spend' channel.
func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
        pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {

        // Register the spend notification with the TxNotifier. A non-nil
        // value for `dispatch` will be returned if we are required to perform
        // a manual scan for the spend. Otherwise the notifier will begin
        // watching at tip for the outpoint/output script to be spent.
        ntfn, err := n.txNotifier.RegisterSpend(outpoint, pkScript, heightHint)
        if err != nil {
                return nil, err
        }

        // To determine whether this outpoint has been spent on-chain, we'll
        // update our filter to watch for the transaction at tip and we'll also
        // dispatch a historical rescan to determine if it has been spent in the
        // past.
        //
        // We'll update our filter first to ensure we can immediately detect the
        // spend at tip.
        if outpoint == nil {
                outpoint = &chainntnfs.ZeroOutPoint
        }
        inputToWatch := neutrino.InputWithScript{
                OutPoint: *outpoint,
                PkScript: pkScript,
        }
        updateOptions := []neutrino.UpdateOption{
                neutrino.AddInputs(inputToWatch),
                neutrino.DisableDisconnectedNtfns(true),
        }

        // We'll use the txNotifier's tip as the starting point of our filter
        // update. In the case of an output script spend request, we'll check if
        // we should perform a historical rescan and start from there, as we
        // cannot do so with GetUtxo since it matches outpoints.
        rewindHeight := ntfn.Height
        if ntfn.HistoricalDispatch != nil && *outpoint == chainntnfs.ZeroOutPoint {
                rewindHeight = ntfn.HistoricalDispatch.StartHeight
        }
        updateOptions = append(updateOptions, neutrino.Rewind(rewindHeight))

        errChan := make(chan error, 1)
        select {
        case n.notificationRegistry <- &rescanFilterUpdate{
                updateOptions: updateOptions,
                errChan:       errChan,
        }:
        case <-n.quit:
                return nil, chainntnfs.ErrChainNotifierShuttingDown
        }

        select {
        case err = <-errChan:
        case <-n.quit:
                return nil, chainntnfs.ErrChainNotifierShuttingDown
        }
        if err != nil {
                return nil, fmt.Errorf("unable to update filter: %w", err)
        }

        // If the txNotifier didn't return any details to perform a historical
        // scan of the chain, or if we already performed one like in the case of
        // output script spend requests, then we can return early as there's
        // nothing left for us to do.
        if ntfn.HistoricalDispatch == nil || *outpoint == chainntnfs.ZeroOutPoint {
                return ntfn.Event, nil
        }

        // Grab the current best height as the height may have been updated
        // while we were draining the chainUpdates queue.
        n.bestBlockMtx.RLock()
        currentHeight := uint32(n.bestBlock.Height)
        n.bestBlockMtx.RUnlock()

        ntfn.HistoricalDispatch.EndHeight = currentHeight

        // With the filter updated, we'll dispatch our historical rescan to
        // ensure we detect the spend if it happened in the past.
        n.wg.Add(1)
        go func() {
                defer n.wg.Done()

                // We'll ensure that neutrino is caught up to the starting
                // height before we attempt to fetch the UTXO from the chain.
                // If we're behind, then we may miss a notification dispatch.
                for {
                        n.bestBlockMtx.RLock()
                        currentHeight := uint32(n.bestBlock.Height)
                        n.bestBlockMtx.RUnlock()

                        if currentHeight >= ntfn.HistoricalDispatch.StartHeight {
                                break
                        }

                        select {
                        case <-time.After(time.Millisecond * 200):
                        case <-n.quit:
                                return
                        }
                }

                spendReport, err := n.p2pNode.GetUtxo(
                        neutrino.WatchInputs(inputToWatch),
                        neutrino.StartBlock(&headerfs.BlockStamp{
                                Height: int32(ntfn.HistoricalDispatch.StartHeight),
                        }),
                        neutrino.EndBlock(&headerfs.BlockStamp{
                                Height: int32(ntfn.HistoricalDispatch.EndHeight),
                        }),
                        neutrino.ProgressHandler(func(processedHeight uint32) {
                                // We persist the rescan progress to achieve incremental
                                // behavior across restarts, otherwise long rescans may
                                // start from the beginning with every restart.
                                err := n.spendHintCache.CommitSpendHint(
                                        processedHeight,
                                        ntfn.HistoricalDispatch.SpendRequest)
                                if err != nil {
                                        chainntnfs.Log.Errorf("Failed to update rescan "+
                                                "progress: %v", err)
                                }
                        }),
                        neutrino.QuitChan(n.quit),
                )
                if err != nil && !strings.Contains(err.Error(), "not found") {
                        chainntnfs.Log.Errorf("Failed getting UTXO: %v", err)
                        return
                }

                // If a spend report was returned, and the transaction is present, then
                // this means that the output is already spent.
                var spendDetails *chainntnfs.SpendDetail
                if spendReport != nil && spendReport.SpendingTx != nil {
                        spendingTxHash := spendReport.SpendingTx.TxHash()
                        spendDetails = &chainntnfs.SpendDetail{
                                SpentOutPoint:     outpoint,
                                SpenderTxHash:     &spendingTxHash,
                                SpendingTx:        spendReport.SpendingTx,
                                SpenderInputIndex: spendReport.SpendingInputIndex,
                                SpendingHeight:    int32(spendReport.SpendingTxHeight),
                        }
                }

                // Finally, no matter whether the rescan found a spend in the past or
                // not, we'll mark our historical rescan as complete to ensure the
                // outpoint's spend hint gets updated upon connected/disconnected
                // blocks.
                err = n.txNotifier.UpdateSpendDetails(
                        ntfn.HistoricalDispatch.SpendRequest, spendDetails,
                )
                if err != nil {
                        chainntnfs.Log.Errorf("Failed to update spend details: %v", err)
                        return
                }
        }()

        return ntfn.Event, nil
}
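
// A caller-side sketch for spend notifications, assuming a started notifier;
// the outpoint, pkScript, heightHint, and quit names are illustrative. Per the
// doc comment above, spend details arrive on the event's Spend channel.
//
//        spendEvent, err := notifier.RegisterSpendNtfn(&outpoint, pkScript, heightHint)
//        if err != nil {
//                // Handle registration failure.
//        }
//        select {
//        case details := <-spendEvent.Spend:
//                // Inspect details.SpenderTxHash, details.SpendingHeight, etc.
//        case <-quit:
//        }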

// RegisterConfirmationsNtfn registers an intent to be notified once the target
// txid/output script has reached numConfs confirmations on-chain. When
// intending to be notified of the confirmation of an output script, a nil txid
// must be used. The heightHint should represent the earliest height at which
// the txid/output script could have been included in the chain.
//
// Progress on the number of confirmations left can be read from the 'Updates'
// channel. Once it has reached all of its confirmations, a notification will be
// sent across the 'Confirmed' channel.
func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
        pkScript []byte, numConfs, heightHint uint32,
        opts ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent, error) {

        // Register the conf notification with the TxNotifier. A non-nil value
        // for `dispatch` will be returned if we are required to perform a
        // manual scan for the confirmation. Otherwise the notifier will begin
        // watching at tip for the transaction to confirm.
        ntfn, err := n.txNotifier.RegisterConf(
                txid, pkScript, numConfs, heightHint, opts...,
        )
        if err != nil {
                return nil, err
        }

        // To determine whether this transaction has confirmed on-chain, we'll
        // update our filter to watch for the transaction at tip and we'll also
        // dispatch a historical rescan to determine if it has confirmed in the
        // past.
        //
        // We'll update our filter first to ensure we can immediately detect the
        // confirmation at tip. To do so, we'll map the script into an address
        // type so we can instruct neutrino to match if the transaction
        // containing the script is found in a block.
        params := n.p2pNode.ChainParams()
        _, addrs, _, err := txscript.ExtractPkScriptAddrs(pkScript, &params)
        if err != nil {
                return nil, fmt.Errorf("unable to extract script: %w", err)
        }

        // We'll send the filter update request to the notifier's main event
        // handler and wait for its response.
        errChan := make(chan error, 1)
        select {
        case n.notificationRegistry <- &rescanFilterUpdate{
                updateOptions: []neutrino.UpdateOption{
                        neutrino.AddAddrs(addrs...),
                        neutrino.Rewind(ntfn.Height),
                        neutrino.DisableDisconnectedNtfns(true),
                },
                errChan: errChan,
        }:
        case <-n.quit:
                return nil, chainntnfs.ErrChainNotifierShuttingDown
        }

        select {
        case err = <-errChan:
        case <-n.quit:
                return nil, chainntnfs.ErrChainNotifierShuttingDown
        }
        if err != nil {
                return nil, fmt.Errorf("unable to update filter: %w", err)
        }

        // If a historical rescan was not requested by the txNotifier, then we
        // can return to the caller.
        if ntfn.HistoricalDispatch == nil {
                return ntfn.Event, nil
        }

        // Grab the current best height as the height may have been updated
        // while we were draining the chainUpdates queue.
        n.bestBlockMtx.RLock()
        currentHeight := uint32(n.bestBlock.Height)
        n.bestBlockMtx.RUnlock()

        ntfn.HistoricalDispatch.EndHeight = currentHeight

        // Finally, with the filter updated, we can dispatch the historical
        // rescan to ensure we can detect if the event happened in the past.
        select {
        case n.notificationRegistry <- ntfn.HistoricalDispatch:
        case <-n.quit:
                return nil, chainntnfs.ErrChainNotifierShuttingDown
        }

        return ntfn.Event, nil
}
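
// A matching sketch for confirmation notifications, again with illustrative
// names; per the doc comment above, progress can be read from the 'Updates'
// channel and the final event from 'Confirmed'.
//
//        confEvent, err := notifier.RegisterConfirmationsNtfn(
//                txid, pkScript, 6, heightHint,
//        )
//        if err != nil {
//                // Handle registration failure.
//        }
//        select {
//        case conf := <-confEvent.Confirmed:
//                // Inspect conf.BlockHeight, conf.TxIndex, etc.
//        case <-quit:
//        }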

// GetBlock is used to retrieve the block with the given hash. Since the block
// cache used by neutrino will be the same as that used by LND (since it is
// passed to neutrino on initialisation), the neutrino GetBlock method can be
// called directly since it already uses the block cache. However, neutrino
// does not lock the block cache mutex for the given block hash and so that is
// done here.
func (n *NeutrinoNotifier) GetBlock(hash chainhash.Hash) (
        *btcutil.Block, error) {

        n.blockCache.HashMutex.Lock(lntypes.Hash(hash))
        defer n.blockCache.HashMutex.Unlock(lntypes.Hash(hash))

        return n.p2pNode.GetBlock(hash)
}

// blockEpochRegistration represents a client's intent to receive a
// notification with each newly connected block.
type blockEpochRegistration struct {
        epochID uint64

        epochChan chan *chainntnfs.BlockEpoch

        epochQueue *queue.ConcurrentQueue

        cancelChan chan struct{}

        bestBlock *chainntnfs.BlockEpoch

        errorChan chan error

        wg sync.WaitGroup
}

// epochCancel is a message sent to the NeutrinoNotifier when a client wishes
// to cancel an outstanding epoch notification that has yet to be dispatched.
type epochCancel struct {
        epochID uint64
}

// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
// caller to receive notifications of each new block connected to the main
// chain. Clients have the option of passing in their best known block, which
// the notifier uses to check if they are behind on blocks and catch them up.
// If they do not provide one, then a notification will be dispatched
// immediately for the current tip of the chain upon a successful registration.
func (n *NeutrinoNotifier) RegisterBlockEpochNtfn(
        bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {

        reg := &blockEpochRegistration{
                epochQueue: queue.NewConcurrentQueue(20),
                epochChan:  make(chan *chainntnfs.BlockEpoch, 20),
                cancelChan: make(chan struct{}),
                epochID:    atomic.AddUint64(&n.epochClientCounter, 1),
                bestBlock:  bestBlock,
                errorChan:  make(chan error, 1),
        }
        reg.epochQueue.Start()

        // Before we send the request to the main goroutine, we'll launch a new
        // goroutine to proxy items added to our queue to the client itself.
        // This ensures that all notifications are received *in order*.
        reg.wg.Add(1)
        go func() {
                defer reg.wg.Done()

                for {
                        select {
                        case ntfn := <-reg.epochQueue.ChanOut():
                                blockNtfn := ntfn.(*chainntnfs.BlockEpoch)
                                select {
                                case reg.epochChan <- blockNtfn:

                                case <-reg.cancelChan:
                                        return

                                case <-n.quit:
                                        return
                                }

                        case <-reg.cancelChan:
                                return

                        case <-n.quit:
                                return
                        }
                }
        }()

        select {
        case <-n.quit:
                // As we're exiting before the registration could be sent,
                // we'll stop the queue now ourselves.
                reg.epochQueue.Stop()

                return nil, errors.New("chainntnfs: system interrupt while " +
                        "attempting to register for block epoch notification.")
        case n.notificationRegistry <- reg:
                return &chainntnfs.BlockEpochEvent{
                        Epochs: reg.epochChan,
                        Cancel: func() {
                                cancel := &epochCancel{
                                        epochID: reg.epochID,
                                }

                                // Submit epoch cancellation to notification
                                // dispatcher.
                                select {
                                case n.notificationCancels <- cancel:
                                        // Cancellation is being handled, drain
                                        // the epoch channel until it is closed
                                        // before yielding to caller.
                                        for {
                                                select {
                                                case _, ok := <-reg.epochChan:
                                                        if !ok {
                                                                return
                                                        }
                                                case <-n.quit:
                                                        return
                                                }
                                        }
                                case <-n.quit:
                                }
                        },
                }, nil
        }
}
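
// A block epoch subscription sketch, assuming a started notifier; passing a
// nil best block requests an immediate notification for the current tip, as
// described above. The epoch channel is closed on cancel or shutdown, so the
// range below terminates.
//
//        epochEvent, err := notifier.RegisterBlockEpochNtfn(nil)
//        if err != nil {
//                // Handle registration failure.
//        }
//        defer epochEvent.Cancel()
//        for epoch := range epochEvent.Epochs {
//                // React to epoch.Height, epoch.Hash, epoch.BlockHeader.
//        }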

// NeutrinoChainConn is a wrapper around neutrino's chain backend in order
// to satisfy the chainntnfs.ChainConn interface.
type NeutrinoChainConn struct {
        p2pNode *neutrino.ChainService
}

// GetBlockHeader returns the block header for a hash.
func (n *NeutrinoChainConn) GetBlockHeader(blockHash *chainhash.Hash) (*wire.BlockHeader, error) {
        return n.p2pNode.GetBlockHeader(blockHash)
}

// GetBlockHeaderVerbose returns a verbose block header result for a hash. This
// result only contains the height with a nil hash.
func (n *NeutrinoChainConn) GetBlockHeaderVerbose(blockHash *chainhash.Hash) (
        *btcjson.GetBlockHeaderVerboseResult, error) {

        height, err := n.p2pNode.GetBlockHeight(blockHash)
        if err != nil {
                return nil, err
        }
        // Since only the height is used from the result, leave the hash nil.
        return &btcjson.GetBlockHeaderVerboseResult{Height: int32(height)}, nil
}

// GetBlockHash returns the hash from a block height.
func (n *NeutrinoChainConn) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) {
        return n.p2pNode.GetBlockHash(blockHeight)
}