lightningnetwork / lnd — build 15736109134
Coverage report, 18 Jun 2025 02:46PM UTC: 58.197% (-10.1%) from 68.248%

Pull Request #9752: routerrpc: reject payment to invoice that don't have payment secret or blinded paths
Merge d2634a68c into 31c74f20f

6 of 13 new or added lines in 2 files covered (46.15%). 28331 existing lines in 455 files are now uncovered. 97860 of 168153 relevant lines covered (58.2%), at 1.81 hits per line.

Source file: /chainntnfs/neutrinonotify/neutrino.go — 76.44% covered

package neutrinonotify

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/btcjson"
	"github.com/btcsuite/btcd/btcutil"
	"github.com/btcsuite/btcd/btcutil/gcs/builder"
	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/rpcclient"
	"github.com/btcsuite/btcd/txscript"
	"github.com/btcsuite/btcd/wire"
	"github.com/lightninglabs/neutrino"
	"github.com/lightninglabs/neutrino/headerfs"
	"github.com/lightningnetwork/lnd/blockcache"
	"github.com/lightningnetwork/lnd/chainntnfs"
	"github.com/lightningnetwork/lnd/lntypes"
	"github.com/lightningnetwork/lnd/queue"
)

const (
	// notifierType uniquely identifies this concrete implementation of the
	// ChainNotifier interface.
	notifierType = "neutrino"
)

// NeutrinoNotifier is a version of ChainNotifier that's backed by the neutrino
// Bitcoin light client. Unlike other implementations, this implementation
// speaks directly to the p2p network. As a result, this implementation of the
// ChainNotifier interface is much more lightweight than other implementations
// which rely on receiving notifications over an RPC interface backed by a
// running full node.
//
// TODO(roasbeef): heavily consolidate with NeutrinoNotifier code
//   - maybe combine into single package?
type NeutrinoNotifier struct {
	epochClientCounter uint64 // To be used atomically.

	start   sync.Once
	active  int32 // To be used atomically.
	stopped int32 // To be used atomically.

	bestBlockMtx sync.RWMutex
	bestBlock    chainntnfs.BlockEpoch

	p2pNode   *neutrino.ChainService
	chainView *neutrino.Rescan

	chainConn *NeutrinoChainConn

	notificationCancels  chan interface{}
	notificationRegistry chan interface{}

	txNotifier *chainntnfs.TxNotifier

	blockEpochClients map[uint64]*blockEpochRegistration

	rescanErr <-chan error

	chainUpdates chan *filteredBlock

	txUpdates *queue.ConcurrentQueue

	// spendHintCache is a cache used to query and update the latest height
	// hints for an outpoint. Each height hint represents the earliest
	// height at which the outpoint could have been spent within the chain.
	spendHintCache chainntnfs.SpendHintCache

	// confirmHintCache is a cache used to query the latest height hints for
	// a transaction. Each height hint represents the earliest height at
	// which the transaction could have confirmed within the chain.
	confirmHintCache chainntnfs.ConfirmHintCache

	// blockCache is an LRU block cache.
	blockCache *blockcache.BlockCache

	wg   sync.WaitGroup
	quit chan struct{}
}

// Ensure NeutrinoNotifier implements the ChainNotifier interface at compile
// time.
var _ chainntnfs.ChainNotifier = (*NeutrinoNotifier)(nil)

// New creates a new instance of the NeutrinoNotifier concrete implementation
// of the ChainNotifier interface.
//
// NOTE: The passed neutrino node should already be running and active before
// being passed into this function.
func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache,
	confirmHintCache chainntnfs.ConfirmHintCache,
	blockCache *blockcache.BlockCache) *NeutrinoNotifier {

	return &NeutrinoNotifier{
		notificationCancels:  make(chan interface{}),
		notificationRegistry: make(chan interface{}),

		blockEpochClients: make(map[uint64]*blockEpochRegistration),

		p2pNode:   node,
		chainConn: &NeutrinoChainConn{node},

		rescanErr: make(chan error),

		chainUpdates: make(chan *filteredBlock, 100),

		txUpdates: queue.NewConcurrentQueue(10),

		spendHintCache:   spendHintCache,
		confirmHintCache: confirmHintCache,

		blockCache: blockCache,

		quit: make(chan struct{}),
	}
}
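
// Usage sketch (added for illustration; not part of the upstream file): the
// intended lifecycle of the notifier, assuming a running
// *neutrino.ChainService and the hint/block caches are constructed elsewhere.
// Error handling is elided.
//
//	notifier := New(chainSvc, spendHintCache, confirmHintCache, blockCache)
//	if err := notifier.Start(); err != nil {
//		// handle startup failure
//	}
//	defer notifier.Stop()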

// Start contacts the running neutrino light client and kicks off an initial
// empty rescan.
func (n *NeutrinoNotifier) Start() error {
	var startErr error
	n.start.Do(func() {
		startErr = n.startNotifier()
	})
	return startErr
}

// Stop shuts down the NeutrinoNotifier.
func (n *NeutrinoNotifier) Stop() error {
	// Already shutting down?
	if atomic.AddInt32(&n.stopped, 1) != 1 {
		return nil
	}

	chainntnfs.Log.Info("neutrino notifier shutting down...")
	defer chainntnfs.Log.Debug("neutrino notifier shutdown complete")

	close(n.quit)
	n.wg.Wait()

	n.txUpdates.Stop()

	// Notify all pending clients of our shutdown by closing the related
	// notification channels.
	for _, epochClient := range n.blockEpochClients {
		close(epochClient.cancelChan)
		epochClient.wg.Wait()

		close(epochClient.epochChan)
	}

	// The txNotifier is only initialized in the start method therefore we
	// need to make sure we don't access a nil pointer here.
	if n.txNotifier != nil {
		n.txNotifier.TearDown()
	}

	return nil
}

// Started returns true if this instance has been started, and false otherwise.
func (n *NeutrinoNotifier) Started() bool {
	return atomic.LoadInt32(&n.active) != 0
}

170
func (n *NeutrinoNotifier) startNotifier() error {
1✔
171
        chainntnfs.Log.Infof("neutrino notifier starting...")
1✔
172

1✔
173
        // Start our concurrent queues before starting the rescan, to ensure
1✔
174
        // onFilteredBlockConnected and onRelavantTx callbacks won't be
1✔
175
        // blocked.
1✔
176
        n.txUpdates.Start()
1✔
177

1✔
178
        // First, we'll obtain the latest block height of the p2p node. We'll
1✔
179
        // start the auto-rescan from this point. Once a caller actually wishes
1✔
180
        // to register a chain view, the rescan state will be rewound
1✔
181
        // accordingly.
1✔
182
        startingPoint, err := n.p2pNode.BestBlock()
1✔
183
        if err != nil {
1✔
184
                n.txUpdates.Stop()
×
185
                return err
×
186
        }
×
187
        startingHeader, err := n.p2pNode.GetBlockHeader(
1✔
188
                &startingPoint.Hash,
1✔
189
        )
1✔
190
        if err != nil {
1✔
191
                n.txUpdates.Stop()
×
192
                return err
×
193
        }
×
194

195
        n.bestBlock.Hash = &startingPoint.Hash
1✔
196
        n.bestBlock.Height = startingPoint.Height
1✔
197
        n.bestBlock.BlockHeader = startingHeader
1✔
198

1✔
199
        n.txNotifier = chainntnfs.NewTxNotifier(
1✔
200
                uint32(n.bestBlock.Height), chainntnfs.ReorgSafetyLimit,
1✔
201
                n.confirmHintCache, n.spendHintCache,
1✔
202
        )
1✔
203

1✔
204
        // Next, we'll create our set of rescan options. Currently it's
1✔
205
        // required that a user MUST set an addr/outpoint/txid when creating a
1✔
206
        // rescan. To get around this, we'll add a "zero" outpoint, that won't
1✔
207
        // actually be matched.
1✔
208
        var zeroInput neutrino.InputWithScript
1✔
209
        rescanOptions := []neutrino.RescanOption{
1✔
210
                neutrino.StartBlock(startingPoint),
1✔
211
                neutrino.QuitChan(n.quit),
1✔
212
                neutrino.NotificationHandlers(
1✔
213
                        rpcclient.NotificationHandlers{
1✔
214
                                OnFilteredBlockConnected:    n.onFilteredBlockConnected,
1✔
215
                                OnFilteredBlockDisconnected: n.onFilteredBlockDisconnected,
1✔
216
                                OnRedeemingTx:               n.onRelevantTx,
1✔
217
                        },
1✔
218
                ),
1✔
219
                neutrino.WatchInputs(zeroInput),
1✔
220
        }
1✔
221

1✔
222
        // Finally, we'll create our rescan struct, start it, and launch all
1✔
223
        // the goroutines we need to operate this ChainNotifier instance.
1✔
224
        n.chainView = neutrino.NewRescan(
1✔
225
                &neutrino.RescanChainSource{
1✔
226
                        ChainService: n.p2pNode,
1✔
227
                },
1✔
228
                rescanOptions...,
1✔
229
        )
1✔
230
        n.rescanErr = n.chainView.Start()
1✔
231

1✔
232
        n.wg.Add(1)
1✔
233
        go n.notificationDispatcher()
1✔
234

1✔
235
        // Set the active flag now that we've completed the full
1✔
236
        // startup.
1✔
237
        atomic.StoreInt32(&n.active, 1)
1✔
238

1✔
239
        chainntnfs.Log.Debugf("neutrino notifier started")
1✔
240

1✔
241
        return nil
1✔
242
}
243

244
// filteredBlock represents a new block which has been connected to the main
245
// chain. The slice of transactions will only be populated if the block
246
// includes a transaction that confirmed one of our watched txids, or spends
247
// one of the outputs currently being watched.
248
type filteredBlock struct {
249
        header *wire.BlockHeader
250
        hash   chainhash.Hash
251
        height uint32
252
        txns   []*btcutil.Tx
253

254
        // connected is true if this update is a new block and false if it is a
255
        // disconnected block.
256
        connect bool
257
}
258

259
// rescanFilterUpdate represents a request that will be sent to the
260
// notificaionRegistry in order to prevent race conditions between the filter
261
// update and new block notifications.
262
type rescanFilterUpdate struct {
263
        updateOptions []neutrino.UpdateOption
264
        errChan       chan error
265
}

// onFilteredBlockConnected is a callback which is executed each time a new
// block is connected to the end of the main chain.
func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
	header *wire.BlockHeader, txns []*btcutil.Tx) {

	// Append this new chain update to the end of the queue of new chain
	// updates.
	select {
	case n.chainUpdates <- &filteredBlock{
		hash:    header.BlockHash(),
		height:  uint32(height),
		txns:    txns,
		header:  header,
		connect: true,
	}:
	case <-n.quit:
	}
}

// onFilteredBlockDisconnected is a callback which is executed each time a new
// block has been disconnected from the end of the main chain due to a re-org.
func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32,
	header *wire.BlockHeader) {

	// Append this new chain update to the end of the queue of new chain
	// disconnects.
	select {
	case n.chainUpdates <- &filteredBlock{
		hash:    header.BlockHash(),
		height:  uint32(height),
		connect: false,
	}:
	case <-n.quit:
	}
}

// relevantTx represents a relevant transaction to the notifier that fulfills
// any outstanding spend requests.
type relevantTx struct {
	tx      *btcutil.Tx
	details *btcjson.BlockDetails
}

// onRelevantTx is a callback that proxies relevant transaction notifications
// from the backend to the notifier's main event handler.
func (n *NeutrinoNotifier) onRelevantTx(tx *btcutil.Tx, details *btcjson.BlockDetails) {
	select {
	case n.txUpdates.ChanIn() <- &relevantTx{tx, details}:
	case <-n.quit:
	}
}

// connectFilteredBlock is called when we receive a filteredBlock from the
// backend. If the block is ahead of what we're expecting, we'll attempt to
// catch up and then process the block.
func (n *NeutrinoNotifier) connectFilteredBlock(update *filteredBlock) {
	n.bestBlockMtx.Lock()
	defer n.bestBlockMtx.Unlock()

	if update.height != uint32(n.bestBlock.Height+1) {
		chainntnfs.Log.Infof("Missed blocks, attempting to catch up")

		_, missedBlocks, err := chainntnfs.HandleMissedBlocks(
			n.chainConn, n.txNotifier, n.bestBlock,
			int32(update.height), false,
		)
		if err != nil {
			chainntnfs.Log.Error(err)
			return
		}

		for _, block := range missedBlocks {
			filteredBlock, err := n.getFilteredBlock(block)
			if err != nil {
				chainntnfs.Log.Error(err)
				return
			}
			err = n.handleBlockConnected(filteredBlock)
			if err != nil {
				chainntnfs.Log.Error(err)
				return
			}
		}
	}

	err := n.handleBlockConnected(update)
	if err != nil {
		chainntnfs.Log.Error(err)
	}
}

// disconnectFilteredBlock is called when our disconnected filtered block
// callback is fired. It attempts to rewind the chain to before the
// disconnection and updates our best block.
func (n *NeutrinoNotifier) disconnectFilteredBlock(update *filteredBlock) {
	n.bestBlockMtx.Lock()
	defer n.bestBlockMtx.Unlock()

	if update.height != uint32(n.bestBlock.Height) {
		chainntnfs.Log.Infof("Missed disconnected blocks, attempting" +
			" to catch up")
	}
	newBestBlock, err := chainntnfs.RewindChain(n.chainConn, n.txNotifier,
		n.bestBlock, int32(update.height-1),
	)
	if err != nil {
		chainntnfs.Log.Errorf("Unable to rewind chain from height %d "+
			"to height %d: %v", n.bestBlock.Height,
			update.height-1, err,
		)
	}

	n.bestBlock = newBestBlock
}

// drainChainUpdates is called after updating the filter. It reads every
// buffered item off the chan and returns when no more are available. It is
// used to ensure that callers performing a historical scan properly update
// their EndHeight to scan blocks that did not have the filter applied at
// processing time. Without this, a race condition exists that could allow a
// spend or confirmation notification to be missed. It is unlikely this would
// occur in a real-world scenario, and instead would manifest itself in tests.
func (n *NeutrinoNotifier) drainChainUpdates() {
	for {
		select {
		case update := <-n.chainUpdates:
			if update.connect {
				n.connectFilteredBlock(update)
				break
			}
			n.disconnectFilteredBlock(update)
		default:
			return
		}
	}
}

// notificationDispatcher is the primary goroutine which handles client
// notification registrations, as well as notification dispatches.
func (n *NeutrinoNotifier) notificationDispatcher() {
	defer n.wg.Done()

	for {
		select {
		case cancelMsg := <-n.notificationCancels:
			switch msg := cancelMsg.(type) {
			case *epochCancel:
				chainntnfs.Log.Infof("Cancelling epoch "+
					"notification, epoch_id=%v", msg.epochID)

				// First, we'll lookup the original
				// registration in order to stop the active
				// queue goroutine.
				reg := n.blockEpochClients[msg.epochID]
				reg.epochQueue.Stop()

				// Next, close the cancel channel for this
				// specific client, and wait for the client to
				// exit.
				close(n.blockEpochClients[msg.epochID].cancelChan)
				n.blockEpochClients[msg.epochID].wg.Wait()

				// Once the client has exited, we can then
				// safely close the channel used to send epoch
				// notifications, in order to notify any
				// listeners that the intent has been
				// canceled.
				close(n.blockEpochClients[msg.epochID].epochChan)
				delete(n.blockEpochClients, msg.epochID)
			}

		case registerMsg := <-n.notificationRegistry:
			switch msg := registerMsg.(type) {
			case *chainntnfs.HistoricalConfDispatch:
				// We'll start a historical rescan of the
				// chain asynchronously to prevent blocking on
				// potentially long rescans.
				n.wg.Add(1)

				//nolint:ll
				go func(msg *chainntnfs.HistoricalConfDispatch) {
					defer n.wg.Done()

					confDetails, err := n.historicalConfDetails(
						msg.ConfRequest,
						msg.StartHeight, msg.EndHeight,
					)
					if err != nil {
						chainntnfs.Log.Error(err)
						return
					}

					// If the historical dispatch finished
					// without error, we will invoke
					// UpdateConfDetails even if none were
					// found. This allows the notifier to
					// begin safely updating the height hint
					// cache at tip, since any pending
					// rescans have now completed.
					err = n.txNotifier.UpdateConfDetails(
						msg.ConfRequest, confDetails,
					)
					if err != nil {
						chainntnfs.Log.Error(err)
					}
				}(msg)

			case *blockEpochRegistration:
				chainntnfs.Log.Infof("New block epoch subscription")

				n.blockEpochClients[msg.epochID] = msg

				// If the client did not provide their best
				// known block, then we'll immediately dispatch
				// a notification for the current tip.
				if msg.bestBlock == nil {
					n.notifyBlockEpochClient(
						msg, n.bestBlock.Height,
						n.bestBlock.Hash,
						n.bestBlock.BlockHeader,
					)

					msg.errorChan <- nil
					continue
				}

				// Otherwise, we'll attempt to deliver the
				// backlog of notifications from their best
				// known block.
				n.bestBlockMtx.Lock()
				bestHeight := n.bestBlock.Height
				n.bestBlockMtx.Unlock()

				missedBlocks, err := chainntnfs.GetClientMissedBlocks(
					n.chainConn, msg.bestBlock, bestHeight,
					false,
				)
				if err != nil {
					msg.errorChan <- err
					continue
				}

				for _, block := range missedBlocks {
					n.notifyBlockEpochClient(
						msg, block.Height, block.Hash,
						block.BlockHeader,
					)
				}

				msg.errorChan <- nil

			case *rescanFilterUpdate:
				err := n.chainView.Update(msg.updateOptions...)
				if err != nil {
					chainntnfs.Log.Errorf("Unable to "+
						"update rescan filter: %v", err)
				}

				// Drain the chainUpdates chan so the caller
				// listening on errChan can be sure that
				// updates after receiving the error will have
				// the filter applied. This allows the caller
				// to update their EndHeight if they're
				// performing a historical scan.
				n.drainChainUpdates()

				// After draining, send the error to the
				// caller.
				msg.errChan <- err
			}

		case item := <-n.chainUpdates:
			update := item
			if update.connect {
				n.connectFilteredBlock(update)
				continue
			}

			n.disconnectFilteredBlock(update)

		case txUpdate := <-n.txUpdates.ChanOut():
			// A new relevant transaction notification has been
			// received from the backend. We'll attempt to process
			// it to determine if it fulfills any outstanding
			// confirmation and/or spend requests and dispatch
			// notifications for them.
			update := txUpdate.(*relevantTx)
			err := n.txNotifier.ProcessRelevantSpendTx(
				update.tx, uint32(update.details.Height),
			)
			if err != nil {
				chainntnfs.Log.Errorf("Unable to process "+
					"transaction %v: %v", update.tx.Hash(),
					err)
			}

		case err := <-n.rescanErr:
			chainntnfs.Log.Errorf("Error during rescan: %v", err)

		case <-n.quit:
			return
		}
	}
}

// historicalConfDetails looks up whether a confirmation request (txid/output
// script) has already been included in a block in the active chain and, if so,
// returns details about said block.
func (n *NeutrinoNotifier) historicalConfDetails(confRequest chainntnfs.ConfRequest,
	startHeight, endHeight uint32) (*chainntnfs.TxConfirmation, error) {

	// Starting from the height hint, we'll walk forwards in the chain to
	// see if this transaction/output script has already been confirmed.
	for scanHeight := endHeight; scanHeight >= startHeight && scanHeight > 0; scanHeight-- {
		// Ensure we haven't been requested to shut down before
		// processing the next height.
		select {
		case <-n.quit:
			return nil, chainntnfs.ErrChainNotifierShuttingDown
		default:
		}

		// First, we'll fetch the block header for this height so we
		// can compute the current block hash.
		blockHash, err := n.p2pNode.GetBlockHash(int64(scanHeight))
		if err != nil {
			return nil, fmt.Errorf("unable to get header for "+
				"height=%v: %w", scanHeight, err)
		}

		// With the hash computed, we can now fetch the basic filter
		// for this height. Since the range of required items is known
		// we avoid roundtrips by requesting a batched response and
		// save bandwidth by limiting the max number of items per
		// batch. Since neutrino populates its underlying filters cache
		// with the batch response, the next call will execute a
		// network query only once per batch and not on every
		// iteration.
		regFilter, err := n.p2pNode.GetCFilter(
			*blockHash, wire.GCSFilterRegular,
			neutrino.NumRetries(5),
			neutrino.OptimisticReverseBatch(),
			neutrino.MaxBatchSize(int64(scanHeight-startHeight+1)),
		)
		if err != nil {
			return nil, fmt.Errorf("unable to retrieve regular "+
				"filter for height=%v: %w", scanHeight, err)
		}

		// In the case that the filter exists, we'll attempt to see if
		// any element in it matches our target public key script.
		key := builder.DeriveKey(blockHash)
		match, err := regFilter.Match(key, confRequest.PkScript.Script())
		if err != nil {
			return nil, fmt.Errorf("unable to query filter: %w",
				err)
		}

		// If there's no match, then we can continue forward to the
		// next block.
		if !match {
			continue
		}

		// In the case that we do have a match, we'll fetch the block
		// from the network so we can find the positional data required
		// to send the proper response.
		block, err := n.GetBlock(*blockHash)
		if err != nil {
			return nil, fmt.Errorf("unable to get block from "+
				"network: %w", err)
		}

		// For every transaction in the block, check which one matches
		// our request. If we find one that does, we can dispatch its
		// confirmation details.
		for i, tx := range block.Transactions() {
			if !confRequest.MatchesTx(tx.MsgTx()) {
				continue
			}

			return &chainntnfs.TxConfirmation{
				Tx:          tx.MsgTx().Copy(),
				BlockHash:   blockHash,
				BlockHeight: scanHeight,
				TxIndex:     uint32(i),
				Block:       block.MsgBlock(),
			}, nil
		}
	}

	return nil, nil
}

// handleBlockConnected applies a chain update for a new block. Any watched
// transactions included in this block will be processed to either send
// notifications now or after numConfirmations confs.
//
// NOTE: This method must be called with the bestBlockMtx lock held.
func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
	// We'll extend the txNotifier's height with the information of this
	// new block, which will handle all of the notification logic for us.
	//
	// We actually need the _full_ block here as well in order to be able
	// to send the full block back up to the client. The neutrino client
	// itself will only dispatch a block if one of the items we're looking
	// for matches, so ultimately passing it the full block will still only
	// result in the items we care about being dispatched.
	rawBlock, err := n.GetBlock(newBlock.hash)
	if err != nil {
		return fmt.Errorf("unable to get full block: %w", err)
	}
	err = n.txNotifier.ConnectTip(rawBlock, newBlock.height)
	if err != nil {
		return fmt.Errorf("unable to connect tip: %w", err)
	}

	chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height,
		newBlock.hash)

	// Now that we've guaranteed the new block extends the txNotifier's
	// current tip, we'll proceed to dispatch notifications to all of our
	// registered clients who have had notifications fulfilled. Before
	// doing so, we'll make sure to update our in-memory state in order to
	// satisfy any client requests based upon the new block.
	n.bestBlock.Hash = &newBlock.hash
	n.bestBlock.Height = int32(newBlock.height)
	n.bestBlock.BlockHeader = newBlock.header

	err = n.txNotifier.NotifyHeight(newBlock.height)
	if err != nil {
		return fmt.Errorf("unable to notify height: %w", err)
	}

	n.notifyBlockEpochs(
		int32(newBlock.height), &newBlock.hash, newBlock.header,
	)

	return nil
}

// getFilteredBlock is a utility to retrieve the full filtered block from a
// block epoch.
func (n *NeutrinoNotifier) getFilteredBlock(epoch chainntnfs.BlockEpoch) (*filteredBlock, error) {
	rawBlock, err := n.GetBlock(*epoch.Hash)
	if err != nil {
		return nil, fmt.Errorf("unable to get block: %w", err)
	}

	txns := rawBlock.Transactions()

	block := &filteredBlock{
		hash:    *epoch.Hash,
		height:  uint32(epoch.Height),
		header:  &rawBlock.MsgBlock().Header,
		txns:    txns,
		connect: true,
	}
	return block, nil
}

// notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain.
func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash,
	blockHeader *wire.BlockHeader) {

	for _, client := range n.blockEpochClients {
		n.notifyBlockEpochClient(client, newHeight, newSha, blockHeader)
	}
}

// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (n *NeutrinoNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
	height int32, sha *chainhash.Hash, blockHeader *wire.BlockHeader) {

	epoch := &chainntnfs.BlockEpoch{
		Height:      height,
		Hash:        sha,
		BlockHeader: blockHeader,
	}

	select {
	case epochClient.epochQueue.ChanIn() <- epoch:
	case <-epochClient.cancelChan:
	case <-n.quit:
	}
}

// RegisterSpendNtfn registers an intent to be notified once the target
// outpoint/output script has been spent by a transaction on-chain. When
// intending to be notified of the spend of an output script, a nil outpoint
// must be used. The heightHint should represent the earliest height in the
// chain of the transaction that spent the outpoint/output script.
//
// Once a spend has been detected, the details of the spending event will be
// sent across the 'Spend' channel.
func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
	pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {

	// Register the conf notification with the TxNotifier. A non-nil value
	// for `dispatch` will be returned if we are required to perform a
	// manual scan for the confirmation. Otherwise the notifier will begin
	// watching at tip for the transaction to confirm.
	ntfn, err := n.txNotifier.RegisterSpend(outpoint, pkScript, heightHint)
	if err != nil {
		return nil, err
	}

	// To determine whether this outpoint has been spent on-chain, we'll
	// update our filter to watch for the transaction at tip and we'll also
	// dispatch a historical rescan to determine if it has been spent in the
	// past.
	//
	// We'll update our filter first to ensure we can immediately detect the
	// spend at tip.
	if outpoint == nil {
		outpoint = &chainntnfs.ZeroOutPoint
	}
	inputToWatch := neutrino.InputWithScript{
		OutPoint: *outpoint,
		PkScript: pkScript,
	}
	updateOptions := []neutrino.UpdateOption{
		neutrino.AddInputs(inputToWatch),
		neutrino.DisableDisconnectedNtfns(true),
	}

	// We'll use the txNotifier's tip as the starting point of our filter
	// update. In the case of an output script spend request, we'll check if
	// we should perform a historical rescan and start from there, as we
	// cannot do so with GetUtxo since it matches outpoints.
	rewindHeight := ntfn.Height
	if ntfn.HistoricalDispatch != nil && *outpoint == chainntnfs.ZeroOutPoint {
		rewindHeight = ntfn.HistoricalDispatch.StartHeight
	}
	updateOptions = append(updateOptions, neutrino.Rewind(rewindHeight))

	errChan := make(chan error, 1)
	select {
	case n.notificationRegistry <- &rescanFilterUpdate{
		updateOptions: updateOptions,
		errChan:       errChan,
	}:
	case <-n.quit:
		return nil, chainntnfs.ErrChainNotifierShuttingDown
	}

	select {
	case err = <-errChan:
	case <-n.quit:
		return nil, chainntnfs.ErrChainNotifierShuttingDown
	}
	if err != nil {
		return nil, fmt.Errorf("unable to update filter: %w", err)
	}

	// If the txNotifier didn't return any details to perform a historical
	// scan of the chain, or if we already performed one like in the case of
	// output script spend requests, then we can return early as there's
	// nothing left for us to do.
	if ntfn.HistoricalDispatch == nil || *outpoint == chainntnfs.ZeroOutPoint {
		return ntfn.Event, nil
	}

	// Grab the current best height as the height may have been updated
	// while we were draining the chainUpdates queue.
	n.bestBlockMtx.RLock()
	currentHeight := uint32(n.bestBlock.Height)
	n.bestBlockMtx.RUnlock()

	ntfn.HistoricalDispatch.EndHeight = currentHeight

	// With the filter updated, we'll dispatch our historical rescan to
	// ensure we detect the spend if it happened in the past.
	n.wg.Add(1)
	go func() {
		defer n.wg.Done()

		// We'll ensure that neutrino is caught up to the starting
		// height before we attempt to fetch the UTXO from the chain.
		// If we're behind, then we may miss a notification dispatch.
		for {
			n.bestBlockMtx.RLock()
			currentHeight := uint32(n.bestBlock.Height)
			n.bestBlockMtx.RUnlock()

			if currentHeight >= ntfn.HistoricalDispatch.StartHeight {
				break
			}

			select {
			case <-time.After(time.Millisecond * 200):
			case <-n.quit:
				return
			}
		}

		spendReport, err := n.p2pNode.GetUtxo(
			neutrino.WatchInputs(inputToWatch),
			neutrino.StartBlock(&headerfs.BlockStamp{
				Height: int32(ntfn.HistoricalDispatch.StartHeight),
			}),
			neutrino.EndBlock(&headerfs.BlockStamp{
				Height: int32(ntfn.HistoricalDispatch.EndHeight),
			}),
			neutrino.ProgressHandler(func(processedHeight uint32) {
				// We persist the rescan progress to achieve
				// incremental behavior across restarts,
				// otherwise long rescans may start from the
				// beginning with every restart.
				err := n.spendHintCache.CommitSpendHint(
					processedHeight,
					ntfn.HistoricalDispatch.SpendRequest)
				if err != nil {
					chainntnfs.Log.Errorf("Failed to update rescan "+
						"progress: %v", err)
				}
			}),
			neutrino.QuitChan(n.quit),
		)
		if err != nil && !strings.Contains(err.Error(), "not found") {
			chainntnfs.Log.Errorf("Failed getting UTXO: %v", err)
			return
		}

		// If a spend report was returned, and the transaction is
		// present, then this means that the output is already spent.
		var spendDetails *chainntnfs.SpendDetail
		if spendReport != nil && spendReport.SpendingTx != nil {
			spendingTxHash := spendReport.SpendingTx.TxHash()
			spendDetails = &chainntnfs.SpendDetail{
				SpentOutPoint:     outpoint,
				SpenderTxHash:     &spendingTxHash,
				SpendingTx:        spendReport.SpendingTx,
				SpenderInputIndex: spendReport.SpendingInputIndex,
				SpendingHeight:    int32(spendReport.SpendingTxHeight),
			}
		}

		// Finally, no matter whether the rescan found a spend in the
		// past or not, we'll mark our historical rescan as complete to
		// ensure the outpoint's spend hint gets updated upon
		// connected/disconnected blocks.
		err = n.txNotifier.UpdateSpendDetails(
			ntfn.HistoricalDispatch.SpendRequest, spendDetails,
		)
		if err != nil {
			chainntnfs.Log.Errorf("Failed to update spend details: %v", err)
			return
		}
	}()

	return ntfn.Event, nil
}
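
// Usage sketch (added for illustration; hedged): a caller typically selects
// on the returned SpendEvent. The `Spend` channel follows the doc comment
// above; `op`, `pkScript`, `heightHint` and `quit` are hypothetical
// placeholders.
//
//	spendEvent, err := notifier.RegisterSpendNtfn(&op, pkScript, heightHint)
//	if err != nil {
//		// handle registration failure
//	}
//	select {
//	case spend := <-spendEvent.Spend:
//		// spend describes the spending transaction and its height
//	case <-quit:
//	}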

// RegisterConfirmationsNtfn registers an intent to be notified once the target
// txid/output script has reached numConfs confirmations on-chain. When
// intending to be notified of the confirmation of an output script, a nil txid
// must be used. The heightHint should represent the earliest height at which
// the txid/output script could have been included in the chain.
//
// Progress on the number of confirmations left can be read from the 'Updates'
// channel. Once it has reached all of its confirmations, a notification will be
// sent across the 'Confirmed' channel.
func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
	pkScript []byte, numConfs, heightHint uint32,
	opts ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent, error) {

	// Register the conf notification with the TxNotifier. A non-nil value
	// for `dispatch` will be returned if we are required to perform a
	// manual scan for the confirmation. Otherwise the notifier will begin
	// watching at tip for the transaction to confirm.
	ntfn, err := n.txNotifier.RegisterConf(
		txid, pkScript, numConfs, heightHint, opts...,
	)
	if err != nil {
		return nil, err
	}

	// To determine whether this transaction has confirmed on-chain, we'll
	// update our filter to watch for the transaction at tip and we'll also
	// dispatch a historical rescan to determine if it has confirmed in the
	// past.
	//
	// We'll update our filter first to ensure we can immediately detect the
	// confirmation at tip. To do so, we'll map the script into an address
	// type so we can instruct neutrino to match if the transaction
	// containing the script is found in a block.
	params := n.p2pNode.ChainParams()
	_, addrs, _, err := txscript.ExtractPkScriptAddrs(pkScript, &params)
	if err != nil {
		return nil, fmt.Errorf("unable to extract script: %w", err)
	}

	// We'll send the filter update request to the notifier's main event
	// handler and wait for its response.
	errChan := make(chan error, 1)
	select {
	case n.notificationRegistry <- &rescanFilterUpdate{
		updateOptions: []neutrino.UpdateOption{
			neutrino.AddAddrs(addrs...),
			neutrino.Rewind(ntfn.Height),
			neutrino.DisableDisconnectedNtfns(true),
		},
		errChan: errChan,
	}:
	case <-n.quit:
		return nil, chainntnfs.ErrChainNotifierShuttingDown
	}

	select {
	case err = <-errChan:
	case <-n.quit:
		return nil, chainntnfs.ErrChainNotifierShuttingDown
	}
	if err != nil {
		return nil, fmt.Errorf("unable to update filter: %w", err)
	}

	// If a historical rescan was not requested by the txNotifier, then we
	// can return to the caller.
	if ntfn.HistoricalDispatch == nil {
		return ntfn.Event, nil
	}

	// Grab the current best height as the height may have been updated
	// while we were draining the chainUpdates queue.
	n.bestBlockMtx.RLock()
	currentHeight := uint32(n.bestBlock.Height)
	n.bestBlockMtx.RUnlock()

	ntfn.HistoricalDispatch.EndHeight = currentHeight

	// Finally, with the filter updated, we can dispatch the historical
	// rescan to ensure we can detect if the event happened in the past.
	select {
	case n.notificationRegistry <- ntfn.HistoricalDispatch:
	case <-n.quit:
		return nil, chainntnfs.ErrChainNotifierShuttingDown
	}

	return ntfn.Event, nil
}
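
// Usage sketch (added for illustration; hedged): reading interim progress and
// the final confirmation via the 'Updates' and 'Confirmed' channels named in
// the doc comment above. `txid`, `pkScript`, `heightHint` and `quit` are
// hypothetical placeholders.
//
//	confEvent, err := notifier.RegisterConfirmationsNtfn(
//		txid, pkScript, 6, heightHint,
//	)
//	if err != nil {
//		// handle registration failure
//	}
//	for {
//		select {
//		case confsLeft := <-confEvent.Updates:
//			// confsLeft confirmations still required
//		case conf := <-confEvent.Confirmed:
//			// the txid/script reached 6 confirmations; conf
//			// carries the confirming block details
//			return
//		case <-quit:
//			return
//		}
//	}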

// GetBlock is used to retrieve the block with the given hash. Since the block
// cache used by neutrino will be the same as that used by LND (since it is
// passed to neutrino on initialisation), the neutrino GetBlock method can be
// called directly since it already uses the block cache. However, neutrino
// does not lock the block cache mutex for the given block hash and so that is
// done here.
func (n *NeutrinoNotifier) GetBlock(hash chainhash.Hash) (
	*btcutil.Block, error) {

	n.blockCache.HashMutex.Lock(lntypes.Hash(hash))
	defer n.blockCache.HashMutex.Unlock(lntypes.Hash(hash))

	return n.p2pNode.GetBlock(hash)
}

// blockEpochRegistration represents a client's intent to receive a
// notification with each newly connected block.
type blockEpochRegistration struct {
	epochID uint64

	epochChan chan *chainntnfs.BlockEpoch

	epochQueue *queue.ConcurrentQueue

	cancelChan chan struct{}

	bestBlock *chainntnfs.BlockEpoch

	errorChan chan error

	wg sync.WaitGroup
}

// epochCancel is a message sent to the NeutrinoNotifier when a client wishes
// to cancel an outstanding epoch notification that has yet to be dispatched.
type epochCancel struct {
	epochID uint64
}

// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
// caller to receive notifications of each new block connected to the main
// chain. Clients have the option of passing in their best known block, which
// the notifier uses to check if they are behind on blocks and catch them up. If
// they do not provide one, then a notification will be dispatched immediately
// for the current tip of the chain upon a successful registration.
func (n *NeutrinoNotifier) RegisterBlockEpochNtfn(
	bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {

	reg := &blockEpochRegistration{
		epochQueue: queue.NewConcurrentQueue(20),
		epochChan:  make(chan *chainntnfs.BlockEpoch, 20),
		cancelChan: make(chan struct{}),
		epochID:    atomic.AddUint64(&n.epochClientCounter, 1),
		bestBlock:  bestBlock,
		errorChan:  make(chan error, 1),
	}
	reg.epochQueue.Start()

	// Before we send the request to the main goroutine, we'll launch a new
	// goroutine to proxy items added to our queue to the client itself.
	// This ensures that all notifications are received *in order*.
	reg.wg.Add(1)
	go func() {
		defer reg.wg.Done()

		for {
			select {
			case ntfn := <-reg.epochQueue.ChanOut():
				blockNtfn := ntfn.(*chainntnfs.BlockEpoch)
				select {
				case reg.epochChan <- blockNtfn:

				case <-reg.cancelChan:
					return

				case <-n.quit:
					return
				}

			case <-reg.cancelChan:
				return

			case <-n.quit:
				return
			}
		}
	}()

	select {
	case <-n.quit:
		// As we're exiting before the registration could be sent,
		// we'll stop the queue now ourselves.
		reg.epochQueue.Stop()

		return nil, errors.New("chainntnfs: system interrupt while " +
			"attempting to register for block epoch notification.")
	case n.notificationRegistry <- reg:
		return &chainntnfs.BlockEpochEvent{
			Epochs: reg.epochChan,
			Cancel: func() {
				cancel := &epochCancel{
					epochID: reg.epochID,
				}

				// Submit epoch cancellation to notification
				// dispatcher.
				select {
				case n.notificationCancels <- cancel:
					// Cancellation is being handled, drain
					// the epoch channel until it is closed
					// before yielding to caller.
					for {
						select {
						case _, ok := <-reg.epochChan:
							if !ok {
								return
							}
						case <-n.quit:
							return
						}
					}
				case <-n.quit:
				}
			},
		}, nil
	}
}
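
// Usage sketch (added for illustration): subscribing to block epochs with no
// best-known block, so the current tip is dispatched immediately. Only the
// Epochs channel and Cancel closure constructed above are assumed; the range
// terminates because epochChan is closed on cancellation or shutdown.
//
//	epochEvent, err := notifier.RegisterBlockEpochNtfn(nil)
//	if err != nil {
//		// handle registration failure
//	}
//	defer epochEvent.Cancel()
//	for epoch := range epochEvent.Epochs {
//		// epoch.Height and epoch.Hash describe the newly
//		// connected block
//	}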

// NeutrinoChainConn is a wrapper around neutrino's chain backend in order
// to satisfy the chainntnfs.ChainConn interface.
type NeutrinoChainConn struct {
	p2pNode *neutrino.ChainService
}

// GetBlockHeader returns the block header for a hash.
func (n *NeutrinoChainConn) GetBlockHeader(blockHash *chainhash.Hash) (*wire.BlockHeader, error) {
	return n.p2pNode.GetBlockHeader(blockHash)
}

// GetBlockHeaderVerbose returns a verbose block header result for a hash. This
// result only contains the height with a nil hash.
func (n *NeutrinoChainConn) GetBlockHeaderVerbose(blockHash *chainhash.Hash) (
	*btcjson.GetBlockHeaderVerboseResult, error) {

	height, err := n.p2pNode.GetBlockHeight(blockHash)
	if err != nil {
		return nil, err
	}
	// Since only the height is used from the result, leave the hash nil.
	return &btcjson.GetBlockHeaderVerboseResult{Height: int32(height)}, nil
}

// GetBlockHash returns the hash from a block height.
func (n *NeutrinoChainConn) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) {
	return n.p2pNode.GetBlockHash(blockHeight)
}