• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13157733617

05 Feb 2025 12:49PM UTC coverage: 57.712% (-1.1%) from 58.82%
13157733617

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19472 existing lines in 252 files now uncovered.

103634 of 179570 relevant lines covered (57.71%)

24840.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.43
/chainntnfs/btcdnotify/btcd.go
1
package btcdnotify
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btcd/chaincfg"
13
        "github.com/btcsuite/btcd/chaincfg/chainhash"
14
        "github.com/btcsuite/btcd/rpcclient"
15
        "github.com/btcsuite/btcd/txscript"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/btcsuite/btcwallet/chain"
18
        "github.com/lightningnetwork/lnd/blockcache"
19
        "github.com/lightningnetwork/lnd/chainntnfs"
20
        "github.com/lightningnetwork/lnd/fn/v2"
21
        "github.com/lightningnetwork/lnd/queue"
22
)
23

24
const (
25
        // notifierType uniquely identifies this concrete implementation of the
26
        // ChainNotifier interface.
27
        notifierType = "btcd"
28
)
29

30
// chainUpdate encapsulates an update to the current main chain. This struct is
31
// used as an element within an unbounded queue in order to avoid blocking the
32
// main rpc dispatch rule.
33
type chainUpdate struct {
34
        blockHash   *chainhash.Hash
35
        blockHeight int32
36

37
        // connect is true if this update is a new block and false if it is a
38
        // disconnected block.
39
        connect bool
40
}
41

42
// txUpdate encapsulates a transaction related notification sent from btcd to
43
// the registered RPC client. This struct is used as an element within an
44
// unbounded queue in order to avoid blocking the main rpc dispatch rule.
45
type txUpdate struct {
46
        tx      *btcutil.Tx
47
        details *btcjson.BlockDetails
48
}
49

50
// TODO(roasbeef): generalize struct below:
51
//  * move chans to config, allow outside callers to handle send conditions
52

53
// BtcdNotifier implements the ChainNotifier interface using btcd's websockets
54
// notifications. Multiple concurrent clients are supported. All notifications
55
// are achieved via non-blocking sends on client channels.
56
type BtcdNotifier struct {
57
        epochClientCounter uint64 // To be used atomically.
58

59
        start   sync.Once
60
        active  int32 // To be used atomically.
61
        stopped int32 // To be used atomically.
62

63
        chainConn   *chain.RPCClient
64
        chainParams *chaincfg.Params
65

66
        notificationCancels  chan interface{}
67
        notificationRegistry chan interface{}
68

69
        txNotifier *chainntnfs.TxNotifier
70

71
        blockEpochClients map[uint64]*blockEpochRegistration
72

73
        bestBlock chainntnfs.BlockEpoch
74

75
        // blockCache is a LRU block cache.
76
        blockCache *blockcache.BlockCache
77

78
        chainUpdates *queue.ConcurrentQueue
79
        txUpdates    *queue.ConcurrentQueue
80

81
        // spendHintCache is a cache used to query and update the latest height
82
        // hints for an outpoint. Each height hint represents the earliest
83
        // height at which the outpoint could have been spent within the chain.
84
        spendHintCache chainntnfs.SpendHintCache
85

86
        // confirmHintCache is a cache used to query the latest height hints for
87
        // a transaction. Each height hint represents the earliest height at
88
        // which the transaction could have confirmed within the chain.
89
        confirmHintCache chainntnfs.ConfirmHintCache
90

91
        // memNotifier notifies clients of events related to the mempool.
92
        memNotifier *chainntnfs.MempoolNotifier
93

94
        wg   sync.WaitGroup
95
        quit chan struct{}
96
}
97

98
// Ensure BtcdNotifier implements the ChainNotifier interface at compile time.
99
var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil)
100

101
// Ensure BtcdNotifier implements the MempoolWatcher interface at compile time.
102
var _ chainntnfs.MempoolWatcher = (*BtcdNotifier)(nil)
103

104
// New returns a new BtcdNotifier instance. This function assumes the btcd node
105
// detailed in the passed configuration is already running, and willing to
106
// accept new websockets clients.
107
func New(config *rpcclient.ConnConfig, chainParams *chaincfg.Params,
108
        spendHintCache chainntnfs.SpendHintCache,
109
        confirmHintCache chainntnfs.ConfirmHintCache,
110
        blockCache *blockcache.BlockCache) (*BtcdNotifier, error) {
10✔
111

10✔
112
        notifier := &BtcdNotifier{
10✔
113
                chainParams: chainParams,
10✔
114

10✔
115
                notificationCancels:  make(chan interface{}),
10✔
116
                notificationRegistry: make(chan interface{}),
10✔
117

10✔
118
                blockEpochClients: make(map[uint64]*blockEpochRegistration),
10✔
119

10✔
120
                chainUpdates: queue.NewConcurrentQueue(10),
10✔
121
                txUpdates:    queue.NewConcurrentQueue(10),
10✔
122

10✔
123
                spendHintCache:   spendHintCache,
10✔
124
                confirmHintCache: confirmHintCache,
10✔
125

10✔
126
                blockCache:  blockCache,
10✔
127
                memNotifier: chainntnfs.NewMempoolNotifier(),
10✔
128

10✔
129
                quit: make(chan struct{}),
10✔
130
        }
10✔
131

10✔
132
        ntfnCallbacks := &rpcclient.NotificationHandlers{
10✔
133
                OnBlockConnected:    notifier.onBlockConnected,
10✔
134
                OnBlockDisconnected: notifier.onBlockDisconnected,
10✔
135
                OnRedeemingTx:       notifier.onRedeemingTx,
10✔
136
        }
10✔
137

10✔
138
        rpcCfg := &chain.RPCClientConfig{
10✔
139
                ReconnectAttempts:    20,
10✔
140
                Conn:                 config,
10✔
141
                Chain:                chainParams,
10✔
142
                NotificationHandlers: ntfnCallbacks,
10✔
143
        }
10✔
144

10✔
145
        chainRPC, err := chain.NewRPCClientWithConfig(rpcCfg)
10✔
146
        if err != nil {
10✔
147
                return nil, err
×
148
        }
×
149

150
        notifier.chainConn = chainRPC
10✔
151

10✔
152
        return notifier, nil
10✔
153
}
154

155
// Start connects to the running btcd node over websockets, registers for block
156
// notifications, and finally launches all related helper goroutines.
157
func (b *BtcdNotifier) Start() error {
7✔
158
        var startErr error
7✔
159
        b.start.Do(func() {
14✔
160
                startErr = b.startNotifier()
7✔
161
        })
7✔
162

163
        return startErr
7✔
164
}
165

166
// Started returns true if this instance has been started, and false otherwise.
UNCOV
167
func (b *BtcdNotifier) Started() bool {
×
UNCOV
168
        return atomic.LoadInt32(&b.active) != 0
×
UNCOV
169
}
×
170

171
// Stop shuts down the BtcdNotifier.
172
func (b *BtcdNotifier) Stop() error {
6✔
173
        // Already shutting down?
6✔
174
        if atomic.AddInt32(&b.stopped, 1) != 1 {
6✔
175
                return nil
×
176
        }
×
177

178
        chainntnfs.Log.Info("btcd notifier shutting down...")
6✔
179
        defer chainntnfs.Log.Debug("btcd notifier shutdown complete")
6✔
180

6✔
181
        // Shutdown the rpc client, this gracefully disconnects from btcd, and
6✔
182
        // cleans up all related resources.
6✔
183
        b.chainConn.Stop()
6✔
184

6✔
185
        close(b.quit)
6✔
186
        b.wg.Wait()
6✔
187

6✔
188
        b.chainUpdates.Stop()
6✔
189
        b.txUpdates.Stop()
6✔
190

6✔
191
        // Notify all pending clients of our shutdown by closing the related
6✔
192
        // notification channels.
6✔
193
        for _, epochClient := range b.blockEpochClients {
29✔
194
                close(epochClient.cancelChan)
23✔
195
                epochClient.wg.Wait()
23✔
196

23✔
197
                close(epochClient.epochChan)
23✔
198
        }
23✔
199
        b.txNotifier.TearDown()
6✔
200

6✔
201
        // Stop the mempool notifier.
6✔
202
        b.memNotifier.TearDown()
6✔
203

6✔
204
        return nil
6✔
205
}
206

207
func (b *BtcdNotifier) startNotifier() error {
7✔
208
        // Start our concurrent queues before starting the chain connection, to
7✔
209
        // ensure onBlockConnected and onRedeemingTx callbacks won't be
7✔
210
        // blocked.
7✔
211
        b.chainUpdates.Start()
7✔
212
        b.txUpdates.Start()
7✔
213

7✔
214
        // Connect to btcd, and register for notifications on connected, and
7✔
215
        // disconnected blocks.
7✔
216
        if err := b.chainConn.Connect(20); err != nil {
7✔
217
                b.txUpdates.Stop()
×
218
                b.chainUpdates.Stop()
×
219
                return err
×
220
        }
×
221

222
        currentHash, currentHeight, err := b.chainConn.GetBestBlock()
7✔
223
        if err != nil {
7✔
224
                b.txUpdates.Stop()
×
225
                b.chainUpdates.Stop()
×
226
                return err
×
227
        }
×
228

229
        bestBlock, err := b.chainConn.GetBlock(currentHash)
7✔
230
        if err != nil {
7✔
231
                b.txUpdates.Stop()
×
232
                b.chainUpdates.Stop()
×
233
                return err
×
234
        }
×
235

236
        b.txNotifier = chainntnfs.NewTxNotifier(
7✔
237
                uint32(currentHeight), chainntnfs.ReorgSafetyLimit,
7✔
238
                b.confirmHintCache, b.spendHintCache,
7✔
239
        )
7✔
240

7✔
241
        b.bestBlock = chainntnfs.BlockEpoch{
7✔
242
                Height:      currentHeight,
7✔
243
                Hash:        currentHash,
7✔
244
                BlockHeader: &bestBlock.Header,
7✔
245
        }
7✔
246

7✔
247
        if err := b.chainConn.NotifyBlocks(); err != nil {
7✔
248
                b.txUpdates.Stop()
×
249
                b.chainUpdates.Stop()
×
250
                return err
×
251
        }
×
252

253
        b.wg.Add(1)
7✔
254
        go b.notificationDispatcher()
7✔
255

7✔
256
        // Set the active flag now that we've completed the full
7✔
257
        // startup.
7✔
258
        atomic.StoreInt32(&b.active, 1)
7✔
259

7✔
260
        return nil
7✔
261
}
262

263
// onBlockConnected implements the OnBlockConnected callback for rpcclient.
264
// The connected block is wrapped in a chainUpdate and appended to the
265
// chainUpdates queue unless the notifier is shutting down.
266
func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) {
896✔
267
        // Append this new chain update to the end of the queue of new chain
896✔
268
        // updates.
896✔
269
        select {
896✔
270
        case b.chainUpdates.ChanIn() <- &chainUpdate{
271
                blockHash:   hash,
272
                blockHeight: height,
273
                connect:     true,
274
        }:
896✔
275
        case <-b.quit:
×
276
                return
×
277
        }
278
}
279

280
// filteredBlock represents a new block which has been connected to the main
281
// chain. The slice of transactions will only be populated if the block
282
// includes a transaction that confirmed one of our watched txids, or spends
283
// one of the outputs currently being watched.
284
//
285
// TODO(halseth): this is currently used for complete blocks. Change to use
286
// onFilteredBlockConnected and onFilteredBlockDisconnected, making it easier
287
// to unify with the Neutrino implementation.
288
type filteredBlock struct {
289
        hash   chainhash.Hash
290
        height uint32
291
        block  *btcutil.Block
292

293
        // connect is true if this update is a new block and false if it is a
294
        // disconnected block.
295
        connect bool
296
}
297

298
// onBlockDisconnected implements the OnBlockDisconnected callback for rpcclient.
299
func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) {
44✔
300
        // Append this new chain update to the end of the queue of new chain
44✔
301
        // updates.
44✔
302
        select {
44✔
303
        case b.chainUpdates.ChanIn() <- &chainUpdate{
304
                blockHash:   hash,
305
                blockHeight: height,
306
                connect:     false,
307
        }:
44✔
308
        case <-b.quit:
×
309
                return
×
310
        }
311
}
312

313
// onRedeemingTx implements the OnRedeemingTx callback for rpcclient.
314
func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) {
9✔
315
        // Append this new transaction update to the end of the queue of new
9✔
316
        // transaction updates.
9✔
317
        select {
9✔
318
        case b.txUpdates.ChanIn() <- &txUpdate{tx, details}:
9✔
319
        case <-b.quit:
×
320
                return
×
321
        }
322
}
323

324
// notificationDispatcher is the primary goroutine which handles client
325
// notification registrations, as well as notification dispatches.
326
func (b *BtcdNotifier) notificationDispatcher() {
10✔
327
        defer b.wg.Done()
10✔
328

10✔
329
out:
10✔
330
        for {
1,007✔
331
                select {
997✔
332
                case cancelMsg := <-b.notificationCancels:
1✔
333
                        switch msg := cancelMsg.(type) {
1✔
334
                        case *epochCancel:
1✔
335
                                chainntnfs.Log.Infof("Cancelling epoch "+
1✔
336
                                        "notification, epoch_id=%v", msg.epochID)
1✔
337

1✔
338
                                // First, we'll lookup the original
1✔
339
                                // registration in order to stop the active
1✔
340
                                // queue goroutine.
1✔
341
                                reg := b.blockEpochClients[msg.epochID]
1✔
342
                                reg.epochQueue.Stop()
1✔
343

1✔
344
                                // Next, close the cancel channel for this
1✔
345
                                // specific client, and wait for the client to
1✔
346
                                // exit.
1✔
347
                                close(b.blockEpochClients[msg.epochID].cancelChan)
1✔
348
                                b.blockEpochClients[msg.epochID].wg.Wait()
1✔
349

1✔
350
                                // Once the client has exited, we can then
1✔
351
                                // safely close the channel used to send epoch
1✔
352
                                // notifications, in order to notify any
1✔
353
                                // listeners that the intent has been
1✔
354
                                // canceled.
1✔
355
                                close(b.blockEpochClients[msg.epochID].epochChan)
1✔
356
                                delete(b.blockEpochClients, msg.epochID)
1✔
357
                        }
358
                case registerMsg := <-b.notificationRegistry:
57✔
359
                        switch msg := registerMsg.(type) {
57✔
360
                        case *chainntnfs.HistoricalConfDispatch:
33✔
361
                                // Look up whether the transaction/output script
33✔
362
                                // has already confirmed in the active chain.
33✔
363
                                // We'll do this in a goroutine to prevent
33✔
364
                                // blocking potentially long rescans.
33✔
365
                                //
33✔
366
                                // TODO(wilmer): add retry logic if rescan fails?
33✔
367
                                b.wg.Add(1)
33✔
368

33✔
369
                                //nolint:ll
33✔
370
                                go func(msg *chainntnfs.HistoricalConfDispatch) {
66✔
371
                                        defer b.wg.Done()
33✔
372

33✔
373
                                        confDetails, _, err := b.historicalConfDetails(
33✔
374
                                                msg.ConfRequest,
33✔
375
                                                msg.StartHeight, msg.EndHeight,
33✔
376
                                        )
33✔
377
                                        if err != nil {
33✔
378
                                                chainntnfs.Log.Error(err)
×
379
                                                return
×
380
                                        }
×
381

382
                                        // If the historical dispatch finished
383
                                        // without error, we will invoke
384
                                        // UpdateConfDetails even if none were
385
                                        // found. This allows the notifier to
386
                                        // begin safely updating the height hint
387
                                        // cache at tip, since any pending
388
                                        // rescans have now completed.
389
                                        err = b.txNotifier.UpdateConfDetails(
33✔
390
                                                msg.ConfRequest, confDetails,
33✔
391
                                        )
33✔
392
                                        if err != nil {
33✔
393
                                                chainntnfs.Log.Error(err)
×
394
                                        }
×
395
                                }(msg)
396

397
                        case *blockEpochRegistration:
24✔
398
                                chainntnfs.Log.Infof("New block epoch subscription")
24✔
399

24✔
400
                                b.blockEpochClients[msg.epochID] = msg
24✔
401

24✔
402
                                // If the client did not provide their best
24✔
403
                                // known block, then we'll immediately dispatch
24✔
404
                                // a notification for the current tip.
24✔
405
                                if msg.bestBlock == nil {
43✔
406
                                        b.notifyBlockEpochClient(
19✔
407
                                                msg, b.bestBlock.Height,
19✔
408
                                                b.bestBlock.Hash,
19✔
409
                                                b.bestBlock.BlockHeader,
19✔
410
                                        )
19✔
411

19✔
412
                                        msg.errorChan <- nil
19✔
413
                                        continue
19✔
414
                                }
415

416
                                // Otherwise, we'll attempt to deliver the
417
                                // backlog of notifications from their best
418
                                // known block.
419
                                missedBlocks, err := chainntnfs.GetClientMissedBlocks(
5✔
420
                                        b.chainConn, msg.bestBlock,
5✔
421
                                        b.bestBlock.Height, true,
5✔
422
                                )
5✔
423
                                if err != nil {
5✔
424
                                        msg.errorChan <- err
×
425
                                        continue
×
426
                                }
427

428
                                for _, block := range missedBlocks {
55✔
429
                                        b.notifyBlockEpochClient(
50✔
430
                                                msg, block.Height, block.Hash,
50✔
431
                                                block.BlockHeader,
50✔
432
                                        )
50✔
433
                                }
50✔
434

435
                                msg.errorChan <- nil
5✔
436
                        }
437

438
                case item := <-b.chainUpdates.ChanOut():
920✔
439
                        update := item.(*chainUpdate)
920✔
440
                        if update.connect {
1,796✔
441
                                blockHeader, err := b.chainConn.GetBlockHeader(
876✔
442
                                        update.blockHash,
876✔
443
                                )
876✔
444
                                if err != nil {
877✔
445
                                        chainntnfs.Log.Errorf("Unable to fetch "+
1✔
446
                                                "block header: %v", err)
1✔
447
                                        continue
1✔
448
                                }
449

450
                                if blockHeader.PrevBlock != *b.bestBlock.Hash {
877✔
451
                                        // Handle the case where the notifier
2✔
452
                                        // missed some blocks from its chain
2✔
453
                                        // backend
2✔
454
                                        chainntnfs.Log.Infof("Missed blocks, " +
2✔
455
                                                "attempting to catch up")
2✔
456
                                        newBestBlock, missedBlocks, err :=
2✔
457
                                                chainntnfs.HandleMissedBlocks(
2✔
458
                                                        b.chainConn,
2✔
459
                                                        b.txNotifier,
2✔
460
                                                        b.bestBlock,
2✔
461
                                                        update.blockHeight,
2✔
462
                                                        true,
2✔
463
                                                )
2✔
464
                                        if err != nil {
2✔
UNCOV
465
                                                // Set the bestBlock here in case
×
UNCOV
466
                                                // a catch up partially completed.
×
UNCOV
467
                                                b.bestBlock = newBestBlock
×
UNCOV
468
                                                chainntnfs.Log.Error(err)
×
UNCOV
469
                                                continue
×
470
                                        }
471

472
                                        for _, block := range missedBlocks {
23✔
473
                                                err := b.handleBlockConnected(block)
21✔
474
                                                if err != nil {
21✔
475
                                                        chainntnfs.Log.Error(err)
×
476
                                                        continue out
×
477
                                                }
478
                                        }
479
                                }
480

481
                                newBlock := chainntnfs.BlockEpoch{
875✔
482
                                        Height:      update.blockHeight,
875✔
483
                                        Hash:        update.blockHash,
875✔
484
                                        BlockHeader: blockHeader,
875✔
485
                                }
875✔
486
                                if err := b.handleBlockConnected(newBlock); err != nil {
875✔
487
                                        chainntnfs.Log.Error(err)
×
488
                                }
×
489
                                continue
875✔
490
                        }
491

492
                        if update.blockHeight != b.bestBlock.Height {
45✔
493
                                chainntnfs.Log.Infof("Missed disconnected" +
1✔
494
                                        "blocks, attempting to catch up")
1✔
495
                        }
1✔
496

497
                        newBestBlock, err := chainntnfs.RewindChain(
44✔
498
                                b.chainConn, b.txNotifier, b.bestBlock,
44✔
499
                                update.blockHeight-1,
44✔
500
                        )
44✔
501
                        if err != nil {
45✔
502
                                chainntnfs.Log.Errorf("Unable to rewind chain "+
1✔
503
                                        "from height %d to height %d: %v",
1✔
504
                                        b.bestBlock.Height, update.blockHeight-1, err)
1✔
505
                        }
1✔
506

507
                        // Set the bestBlock here in case a chain rewind
508
                        // partially completed.
509
                        b.bestBlock = newBestBlock
44✔
510

511
                case item := <-b.txUpdates.ChanOut():
9✔
512
                        newSpend := item.(*txUpdate)
9✔
513
                        tx := newSpend.tx
9✔
514

9✔
515
                        // Init values.
9✔
516
                        isMempool := false
9✔
517
                        height := uint32(0)
9✔
518

9✔
519
                        // Unwrap values.
9✔
520
                        if newSpend.details == nil {
13✔
521
                                isMempool = true
4✔
522
                        } else {
9✔
523
                                height = uint32(newSpend.details.Height)
5✔
524
                        }
5✔
525

526
                        // Handle the transaction.
527
                        b.handleRelevantTx(tx, isMempool, height)
9✔
528

529
                case <-b.quit:
6✔
530
                        break out
6✔
531
                }
532
        }
533
}
534

535
// handleRelevantTx handles a new transaction that has been seen either in a
536
// block or in the mempool. If in mempool, it will ask the mempool notifier to
537
// handle it. If in a block, it will ask the txNotifier to handle it, and
538
// cancel any relevant subscriptions made in the mempool.
539
func (b *BtcdNotifier) handleRelevantTx(tx *btcutil.Tx,
540
        mempool bool, height uint32) {
9✔
541

9✔
542
        // If this is a mempool spend, we'll ask the mempool notifier to handle
9✔
543
        // it.
9✔
544
        if mempool {
13✔
545
                err := b.memNotifier.ProcessRelevantSpendTx(tx)
4✔
546
                if err != nil {
4✔
547
                        chainntnfs.Log.Errorf("Unable to process transaction "+
×
548
                                "%v: %v", tx.Hash(), err)
×
549
                }
×
550

551
                return
4✔
552
        }
553

554
        // Otherwise this is a confirmed spend, and we'll ask the tx notifier
555
        // to handle it.
556
        err := b.txNotifier.ProcessRelevantSpendTx(tx, height)
5✔
557
        if err != nil {
5✔
558
                chainntnfs.Log.Errorf("Unable to process transaction %v: %v",
×
559
                        tx.Hash(), err)
×
560

×
561
                return
×
562
        }
×
563

564
        // Once the tx is processed, we will ask the memNotifier to unsubscribe
565
        // the input.
566
        //
567
        // NOTE(yy): we could build it into txNotifier.ProcessRelevantSpendTx,
568
        // but choose to implement it here so we can easily decouple the two
569
        // notifiers in the future.
570
        b.memNotifier.UnsubsribeConfirmedSpentTx(tx)
5✔
571
}
572

573
// historicalConfDetails looks up whether a confirmation request (txid/output
574
// script) has already been included in a block in the active chain and, if so,
575
// returns details about said block.
576
func (b *BtcdNotifier) historicalConfDetails(confRequest chainntnfs.ConfRequest,
577
        startHeight, endHeight uint32) (*chainntnfs.TxConfirmation,
578
        chainntnfs.TxConfStatus, error) {
39✔
579

39✔
580
        // If a txid was not provided, then we should dispatch upon seeing the
39✔
581
        // script on-chain, so we'll short-circuit straight to scanning manually
39✔
582
        // as there doesn't exist a script index to query.
39✔
583
        if confRequest.TxID == chainntnfs.ZeroHash {
55✔
584
                return b.confDetailsManually(
16✔
585
                        confRequest, startHeight, endHeight,
16✔
586
                )
16✔
587
        }
16✔
588

589
        // Otherwise, we'll dispatch upon seeing a transaction on-chain with the
590
        // given hash.
591
        //
592
        // We'll first attempt to retrieve the transaction using the node's
593
        // txindex.
594
        txNotFoundErr := "No information available about transaction"
23✔
595
        txConf, txStatus, err := chainntnfs.ConfDetailsFromTxIndex(
23✔
596
                b.chainConn, confRequest, txNotFoundErr,
23✔
597
        )
23✔
598

23✔
599
        // We'll then check the status of the transaction lookup returned to
23✔
600
        // determine whether we should proceed with any fallback methods.
23✔
601
        switch {
23✔
602

603
        // We failed querying the index for the transaction, fall back to
604
        // scanning manually.
605
        case err != nil:
8✔
606
                chainntnfs.Log.Debugf("Unable to determine confirmation of %v "+
8✔
607
                        "through the backend's txindex (%v), scanning manually",
8✔
608
                        confRequest.TxID, err)
8✔
609

8✔
610
                return b.confDetailsManually(
8✔
611
                        confRequest, startHeight, endHeight,
8✔
612
                )
8✔
613

614
        // The transaction was found within the node's mempool.
615
        case txStatus == chainntnfs.TxFoundMempool:
13✔
616

617
        // The transaction was found within the node's txindex.
618
        case txStatus == chainntnfs.TxFoundIndex:
1✔
619

620
        // The transaction was not found within the node's mempool or txindex.
621
        case txStatus == chainntnfs.TxNotFoundIndex:
1✔
622

623
        // Unexpected txStatus returned.
624
        default:
×
625
                return nil, txStatus,
×
626
                        fmt.Errorf("Got unexpected txConfStatus: %v", txStatus)
×
627
        }
628

629
        return txConf, txStatus, nil
15✔
630
}
631

632
// confDetailsManually looks up whether a transaction/output script has already
633
// been included in a block in the active chain by scanning the chain's blocks
634
// within the given range. If the transaction/output script is found, its
635
// confirmation details are returned. Otherwise, nil is returned.
636
func (b *BtcdNotifier) confDetailsManually(confRequest chainntnfs.ConfRequest,
637
        startHeight, endHeight uint32) (*chainntnfs.TxConfirmation,
638
        chainntnfs.TxConfStatus, error) {
24✔
639

24✔
640
        // Begin scanning blocks at every height to determine where the
24✔
641
        // transaction was included in.
24✔
642
        for height := endHeight; height >= startHeight && height > 0; height-- {
61✔
643
                // Ensure we haven't been requested to shut down before
37✔
644
                // processing the next height.
37✔
645
                select {
37✔
646
                case <-b.quit:
×
647
                        return nil, chainntnfs.TxNotFoundManually,
×
648
                                chainntnfs.ErrChainNotifierShuttingDown
×
649
                default:
37✔
650
                }
651

652
                blockHash, err := b.chainConn.GetBlockHash(int64(height))
37✔
653
                if err != nil {
37✔
654
                        return nil, chainntnfs.TxNotFoundManually,
×
655
                                fmt.Errorf("unable to get hash from block "+
×
656
                                        "with height %d", height)
×
657
                }
×
658

659
                // TODO: fetch the neutrino filters instead.
660
                block, err := b.GetBlock(blockHash)
37✔
661
                if err != nil {
37✔
662
                        return nil, chainntnfs.TxNotFoundManually,
×
663
                                fmt.Errorf("unable to get block with hash "+
×
664
                                        "%v: %v", blockHash, err)
×
665
                }
×
666

667
                // For every transaction in the block, check which one matches
668
                // our request. If we find one that does, we can dispatch its
669
                // confirmation details.
670
                for txIndex, tx := range block.Transactions {
97✔
671
                        if !confRequest.MatchesTx(tx) {
115✔
672
                                continue
55✔
673
                        }
674

675
                        return &chainntnfs.TxConfirmation{
5✔
676
                                Tx:          tx.Copy(),
5✔
677
                                BlockHash:   blockHash,
5✔
678
                                BlockHeight: height,
5✔
679
                                TxIndex:     uint32(txIndex),
5✔
680
                                Block:       block,
5✔
681
                        }, chainntnfs.TxFoundManually, nil
5✔
682
                }
683
        }
684

685
        // If we reach here, then we were not able to find the transaction
686
        // within a block, so we avoid returning an error.
687
        return nil, chainntnfs.TxNotFoundManually, nil
19✔
688
}
689

690
// handleBlockConnected applies a chain update for a new block. Any watched
691
// transactions included this block will processed to either send notifications
692
// now or after numConfirmations confs.
693
// TODO(halseth): this is reusing the neutrino notifier implementation, unify
694
// them.
695
func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
896✔
696
        // First, we'll fetch the raw block as we'll need to gather all the
896✔
697
        // transactions to determine whether any are relevant to our registered
896✔
698
        // clients.
896✔
699
        rawBlock, err := b.GetBlock(epoch.Hash)
896✔
700
        if err != nil {
896✔
701
                return fmt.Errorf("unable to get block: %w", err)
×
702
        }
×
703
        newBlock := &filteredBlock{
896✔
704
                hash:    *epoch.Hash,
896✔
705
                height:  uint32(epoch.Height),
896✔
706
                block:   btcutil.NewBlock(rawBlock),
896✔
707
                connect: true,
896✔
708
        }
896✔
709

896✔
710
        // We'll then extend the txNotifier's height with the information of
896✔
711
        // this new block, which will handle all of the notification logic for
896✔
712
        // us.
896✔
713
        err = b.txNotifier.ConnectTip(newBlock.block, newBlock.height)
896✔
714
        if err != nil {
896✔
715
                return fmt.Errorf("unable to connect tip: %w", err)
×
716
        }
×
717

718
        chainntnfs.Log.Infof("New block: height=%v, sha=%v", epoch.Height,
896✔
719
                epoch.Hash)
896✔
720

896✔
721
        // Now that we've guaranteed the new block extends the txNotifier's
896✔
722
        // current tip, we'll proceed to dispatch notifications to all of our
896✔
723
        // registered clients whom have had notifications fulfilled. Before
896✔
724
        // doing so, we'll make sure update our in memory state in order to
896✔
725
        // satisfy any client requests based upon the new block.
896✔
726
        b.bestBlock = epoch
896✔
727

896✔
728
        err = b.txNotifier.NotifyHeight(uint32(epoch.Height))
896✔
729
        if err != nil {
896✔
730
                return fmt.Errorf("unable to notify height: %w", err)
×
731
        }
×
732

733
        b.notifyBlockEpochs(
896✔
734
                epoch.Height, epoch.Hash, epoch.BlockHeader,
896✔
735
        )
896✔
736

896✔
737
        return nil
896✔
738
}
739

740
// notifyBlockEpochs notifies all registered block epoch clients of the newly
741
// connected block to the main chain.
742
func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32,
743
        newSha *chainhash.Hash, blockHeader *wire.BlockHeader) {
896✔
744

896✔
745
        for _, client := range b.blockEpochClients {
1,167✔
746
                b.notifyBlockEpochClient(
271✔
747
                        client, newHeight, newSha, blockHeader,
271✔
748
                )
271✔
749
        }
271✔
750
}
751

752
// notifyBlockEpochClient sends a registered block epoch client a notification
753
// about a specific block.
754
func (b *BtcdNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
755
        height int32, sha *chainhash.Hash, blockHeader *wire.BlockHeader) {
340✔
756

340✔
757
        epoch := &chainntnfs.BlockEpoch{
340✔
758
                Height:      height,
340✔
759
                Hash:        sha,
340✔
760
                BlockHeader: blockHeader,
340✔
761
        }
340✔
762

340✔
763
        select {
340✔
764
        case epochClient.epochQueue.ChanIn() <- epoch:
340✔
765
        case <-epochClient.cancelChan:
×
766
        case <-b.quit:
×
767
        }
768
}
769

770
// RegisterSpendNtfn registers an intent to be notified once the target
771
// outpoint/output script has been spent by a transaction on-chain. When
772
// intending to be notified of the spend of an output script, a nil outpoint
773
// must be used. The heightHint should represent the earliest height in the
774
// chain of the transaction that spent the outpoint/output script.
775
//
776
// Once a spend of has been detected, the details of the spending event will be
777
// sent across the 'Spend' channel.
778
func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
779
        pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
26✔
780

26✔
781
        // Register the conf notification with the TxNotifier. A non-nil value
26✔
782
        // for `dispatch` will be returned if we are required to perform a
26✔
783
        // manual scan for the confirmation. Otherwise the notifier will begin
26✔
784
        // watching at tip for the transaction to confirm.
26✔
785
        ntfn, err := b.txNotifier.RegisterSpend(outpoint, pkScript, heightHint)
26✔
786
        if err != nil {
26✔
UNCOV
787
                return nil, err
×
UNCOV
788
        }
×
789

790
        // We'll then request the backend to notify us when it has detected the
791
        // outpoint/output script as spent.
792
        //
793
        // TODO(wilmer): use LoadFilter API instead.
794
        if outpoint == nil || *outpoint == chainntnfs.ZeroOutPoint {
39✔
795
                _, addrs, _, err := txscript.ExtractPkScriptAddrs(
13✔
796
                        pkScript, b.chainParams,
13✔
797
                )
13✔
798
                if err != nil {
13✔
799
                        return nil, fmt.Errorf("unable to parse script: %w",
×
800
                                err)
×
801
                }
×
802
                if err := b.chainConn.NotifyReceived(addrs); err != nil {
13✔
803
                        return nil, err
×
804
                }
×
805
        } else {
13✔
806
                ops := []*wire.OutPoint{outpoint}
13✔
807
                if err := b.chainConn.NotifySpent(ops); err != nil {
13✔
808
                        return nil, err
×
809
                }
×
810
        }
811

812
        // If the txNotifier didn't return any details to perform a historical
813
        // scan of the chain, then we can return early as there's nothing left
814
        // for us to do.
815
        if ntfn.HistoricalDispatch == nil {
50✔
816
                return ntfn.Event, nil
24✔
817
        }
24✔
818

819
        // Otherwise, we'll need to dispatch a historical rescan to determine if
820
        // the outpoint was already spent at a previous height.
821
        //
822
        // We'll short-circuit the path when dispatching the spend of a script,
823
        // rather than an outpoint, as there aren't any additional checks we can
824
        // make for scripts.
825
        if outpoint == nil || *outpoint == chainntnfs.ZeroOutPoint {
3✔
826
                startHash, err := b.chainConn.GetBlockHash(
1✔
827
                        int64(ntfn.HistoricalDispatch.StartHeight),
1✔
828
                )
1✔
829
                if err != nil {
1✔
830
                        return nil, err
×
831
                }
×
832

833
                // TODO(wilmer): add retry logic if rescan fails?
834
                _, addrs, _, err := txscript.ExtractPkScriptAddrs(
1✔
835
                        pkScript, b.chainParams,
1✔
836
                )
1✔
837
                if err != nil {
1✔
838
                        return nil, fmt.Errorf("unable to parse address: %w",
×
839
                                err)
×
840
                }
×
841

842
                asyncResult := b.chainConn.RescanAsync(startHash, addrs, nil)
1✔
843
                go func() {
2✔
844
                        if rescanErr := asyncResult.Receive(); rescanErr != nil {
1✔
845
                                chainntnfs.Log.Errorf("Rescan to determine "+
×
846
                                        "the spend details of %v failed: %v",
×
847
                                        ntfn.HistoricalDispatch.SpendRequest,
×
848
                                        rescanErr)
×
849
                        }
×
850
                }()
851

852
                return ntfn.Event, nil
1✔
853
        }
854

855
        // When dispatching spends of outpoints, there are a number of checks we
856
        // can make to start our rescan from a better height or completely avoid
857
        // it.
858
        //
859
        // We'll start by checking the backend's UTXO set to determine whether
860
        // the outpoint has been spent. If it hasn't, we can return to the
861
        // caller as well.
862
        txOut, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true)
1✔
863
        if err != nil {
1✔
864
                return nil, err
×
865
        }
×
866
        if txOut != nil {
1✔
UNCOV
867
                // We'll let the txNotifier know the outpoint is still unspent
×
UNCOV
868
                // in order to begin updating its spend hint.
×
UNCOV
869
                err := b.txNotifier.UpdateSpendDetails(
×
UNCOV
870
                        ntfn.HistoricalDispatch.SpendRequest, nil,
×
UNCOV
871
                )
×
UNCOV
872
                if err != nil {
×
873
                        return nil, err
×
874
                }
×
875

UNCOV
876
                return ntfn.Event, nil
×
877
        }
878

879
        // Since the outpoint was spent, as it no longer exists within the UTXO
880
        // set, we'll determine when it happened by scanning the chain. We'll
881
        // begin by fetching the block hash of our starting height.
882
        startHash, err := b.chainConn.GetBlockHash(
1✔
883
                int64(ntfn.HistoricalDispatch.StartHeight),
1✔
884
        )
1✔
885
        if err != nil {
1✔
886
                return nil, fmt.Errorf("unable to get block hash for height "+
×
887
                        "%d: %v", ntfn.HistoricalDispatch.StartHeight, err)
×
888
        }
×
889

890
        // As a minimal optimization, we'll query the backend's transaction
891
        // index (if enabled) to determine if we have a better rescan starting
892
        // height. We can do this as the GetRawTransaction call will return the
893
        // hash of the block it was included in within the chain.
894
        tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash)
1✔
895
        if err != nil {
2✔
896
                // Avoid returning an error if the transaction was not found to
1✔
897
                // proceed with fallback methods.
1✔
898
                jsonErr, ok := err.(*btcjson.RPCError)
1✔
899
                if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo {
1✔
900
                        return nil, fmt.Errorf("unable to query for txid %v: "+
×
901
                                "%w", outpoint.Hash, err)
×
902
                }
×
903
        }
904

905
        // If the transaction index was enabled, we'll use the block's hash to
906
        // retrieve its height and check whether it provides a better starting
907
        // point for our rescan.
908
        if tx != nil {
1✔
UNCOV
909
                // If the transaction containing the outpoint hasn't confirmed
×
UNCOV
910
                // on-chain, then there's no need to perform a rescan.
×
UNCOV
911
                if tx.BlockHash == "" {
×
UNCOV
912
                        return ntfn.Event, nil
×
UNCOV
913
                }
×
914

UNCOV
915
                blockHash, err := chainhash.NewHashFromStr(tx.BlockHash)
×
UNCOV
916
                if err != nil {
×
917
                        return nil, err
×
918
                }
×
UNCOV
919
                blockHeader, err := b.chainConn.GetBlockHeaderVerbose(blockHash)
×
UNCOV
920
                if err != nil {
×
921
                        return nil, fmt.Errorf("unable to get header for "+
×
922
                                "block %v: %v", blockHash, err)
×
923
                }
×
924

UNCOV
925
                if uint32(blockHeader.Height) > ntfn.HistoricalDispatch.StartHeight {
×
UNCOV
926
                        startHash, err = b.chainConn.GetBlockHash(
×
UNCOV
927
                                int64(blockHeader.Height),
×
UNCOV
928
                        )
×
UNCOV
929
                        if err != nil {
×
930
                                return nil, fmt.Errorf("unable to get block "+
×
931
                                        "hash for height %d: %v",
×
932
                                        blockHeader.Height, err)
×
933
                        }
×
934
                }
935
        }
936

937
        // Now that we've determined the best starting point for our rescan,
938
        // we can go ahead and dispatch it.
939
        //
940
        // In order to ensure that we don't block the caller on what may be a
941
        // long rescan, we'll launch a new goroutine to handle the async result
942
        // of the rescan. We purposefully prevent from adding this goroutine to
943
        // the WaitGroup as we cannot wait for a quit signal due to the
944
        // asyncResult channel not being exposed.
945
        //
946
        // TODO(wilmer): add retry logic if rescan fails?
947
        asyncResult := b.chainConn.RescanAsync(
1✔
948
                startHash, nil, []*wire.OutPoint{outpoint},
1✔
949
        )
1✔
950
        go func() {
2✔
951
                if rescanErr := asyncResult.Receive(); rescanErr != nil {
1✔
952
                        chainntnfs.Log.Errorf("Rescan to determine the spend "+
×
953
                                "details of %v failed: %v", outpoint, rescanErr)
×
954
                }
×
955
        }()
956

957
        return ntfn.Event, nil
1✔
958
}
959

960
// RegisterConfirmationsNtfn registers an intent to be notified once the target
961
// txid/output script has reached numConfs confirmations on-chain. When
962
// intending to be notified of the confirmation of an output script, a nil txid
963
// must be used. The heightHint should represent the earliest height at which
964
// the txid/output script could have been included in the chain.
965
//
966
// Progress on the number of confirmations left can be read from the 'Updates'
967
// channel. Once it has reached all of its confirmations, a notification will be
968
// sent across the 'Confirmed' channel.
969
func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
970
        pkScript []byte, numConfs, heightHint uint32,
971
        opts ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent, error) {
48✔
972

48✔
973
        // Register the conf notification with the TxNotifier. A non-nil value
48✔
974
        // for `dispatch` will be returned if we are required to perform a
48✔
975
        // manual scan for the confirmation. Otherwise the notifier will begin
48✔
976
        // watching at tip for the transaction to confirm.
48✔
977
        ntfn, err := b.txNotifier.RegisterConf(
48✔
978
                txid, pkScript, numConfs, heightHint, opts...,
48✔
979
        )
48✔
980
        if err != nil {
48✔
981
                return nil, err
×
982
        }
×
983

984
        if ntfn.HistoricalDispatch == nil {
63✔
985
                return ntfn.Event, nil
15✔
986
        }
15✔
987

988
        select {
33✔
989
        case b.notificationRegistry <- ntfn.HistoricalDispatch:
33✔
990
                return ntfn.Event, nil
33✔
991
        case <-b.quit:
×
992
                return nil, chainntnfs.ErrChainNotifierShuttingDown
×
993
        }
994
}
995

996
// blockEpochRegistration represents a client's intent to receive a
997
// notification with each newly connected block.
998
type blockEpochRegistration struct {
999
        epochID uint64
1000

1001
        epochChan chan *chainntnfs.BlockEpoch
1002

1003
        epochQueue *queue.ConcurrentQueue
1004

1005
        bestBlock *chainntnfs.BlockEpoch
1006

1007
        errorChan chan error
1008

1009
        cancelChan chan struct{}
1010

1011
        wg sync.WaitGroup
1012
}
1013

1014
// epochCancel is the message a client submits to the BtcdNotifier in order to
// cancel an outstanding epoch notification that has yet to be dispatched.
type epochCancel struct {
	// epochID is the ID of the block epoch registration to cancel.
	epochID uint64
}
1019

1020
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
1021
// caller to receive notifications, of each new block connected to the main
1022
// chain. Clients have the option of passing in their best known block, which
1023
// the notifier uses to check if they are behind on blocks and catch them up. If
1024
// they do not provide one, then a notification will be dispatched immediately
1025
// for the current tip of the chain upon a successful registration.
1026
func (b *BtcdNotifier) RegisterBlockEpochNtfn(
1027
        bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
24✔
1028

24✔
1029
        reg := &blockEpochRegistration{
24✔
1030
                epochQueue: queue.NewConcurrentQueue(20),
24✔
1031
                epochChan:  make(chan *chainntnfs.BlockEpoch, 20),
24✔
1032
                cancelChan: make(chan struct{}),
24✔
1033
                epochID:    atomic.AddUint64(&b.epochClientCounter, 1),
24✔
1034
                bestBlock:  bestBlock,
24✔
1035
                errorChan:  make(chan error, 1),
24✔
1036
        }
24✔
1037

24✔
1038
        reg.epochQueue.Start()
24✔
1039

24✔
1040
        // Before we send the request to the main goroutine, we'll launch a new
24✔
1041
        // goroutine to proxy items added to our queue to the client itself.
24✔
1042
        // This ensures that all notifications are received *in order*.
24✔
1043
        reg.wg.Add(1)
24✔
1044
        go func() {
48✔
1045
                defer reg.wg.Done()
24✔
1046

24✔
1047
                for {
342✔
1048
                        select {
318✔
1049
                        case ntfn := <-reg.epochQueue.ChanOut():
296✔
1050
                                blockNtfn := ntfn.(*chainntnfs.BlockEpoch)
296✔
1051
                                select {
296✔
1052
                                case reg.epochChan <- blockNtfn:
294✔
1053

UNCOV
1054
                                case <-reg.cancelChan:
×
UNCOV
1055
                                        return
×
1056

1057
                                case <-b.quit:
2✔
1058
                                        return
2✔
1059
                                }
1060

1061
                        case <-reg.cancelChan:
1✔
1062
                                return
1✔
1063

1064
                        case <-b.quit:
21✔
1065
                                return
21✔
1066
                        }
1067
                }
1068
        }()
1069

1070
        select {
24✔
1071
        case <-b.quit:
×
1072
                // As we're exiting before the registration could be sent,
×
1073
                // we'll stop the queue now ourselves.
×
1074
                reg.epochQueue.Stop()
×
1075

×
1076
                return nil, errors.New("chainntnfs: system interrupt while " +
×
1077
                        "attempting to register for block epoch notification.")
×
1078
        case b.notificationRegistry <- reg:
24✔
1079
                return &chainntnfs.BlockEpochEvent{
24✔
1080
                        Epochs: reg.epochChan,
24✔
1081
                        Cancel: func() {
25✔
1082
                                cancel := &epochCancel{
1✔
1083
                                        epochID: reg.epochID,
1✔
1084
                                }
1✔
1085

1✔
1086
                                // Submit epoch cancellation to notification dispatcher.
1✔
1087
                                select {
1✔
1088
                                case b.notificationCancels <- cancel:
1✔
1089
                                        // Cancellation is being handled, drain
1✔
1090
                                        // the epoch channel until it is closed
1✔
1091
                                        // before yielding to caller.
1✔
1092
                                        for {
3✔
1093
                                                select {
2✔
1094
                                                case _, ok := <-reg.epochChan:
2✔
1095
                                                        if !ok {
3✔
1096
                                                                return
1✔
1097
                                                        }
1✔
1098
                                                case <-b.quit:
×
1099
                                                        return
×
1100
                                                }
1101
                                        }
UNCOV
1102
                                case <-b.quit:
×
1103
                                }
1104
                        },
1105
                }, nil
1106
        }
1107
}
1108

1109
// GetBlock is used to retrieve the block with the given hash. This function
1110
// wraps the blockCache's GetBlock function.
1111
func (b *BtcdNotifier) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock,
1112
        error) {
933✔
1113

933✔
1114
        return b.blockCache.GetBlock(hash, b.chainConn.GetBlock)
933✔
1115
}
933✔
1116

1117
// SubscribeMempoolSpent allows the caller to register a subscription to watch
1118
// for a spend of an outpoint in the mempool.The event will be dispatched once
1119
// the outpoint is spent in the mempool.
1120
//
1121
// NOTE: part of the MempoolWatcher interface.
1122
func (b *BtcdNotifier) SubscribeMempoolSpent(
UNCOV
1123
        outpoint wire.OutPoint) (*chainntnfs.MempoolSpendEvent, error) {
×
UNCOV
1124

×
UNCOV
1125
        event := b.memNotifier.SubscribeInput(outpoint)
×
UNCOV
1126

×
UNCOV
1127
        ops := []*wire.OutPoint{&outpoint}
×
UNCOV
1128

×
UNCOV
1129
        return event, b.chainConn.NotifySpent(ops)
×
UNCOV
1130
}
×
1131

1132
// CancelMempoolSpendEvent allows the caller to cancel a subscription to watch
1133
// for a spend of an outpoint in the mempool.
1134
//
1135
// NOTE: part of the MempoolWatcher interface.
1136
func (b *BtcdNotifier) CancelMempoolSpendEvent(
UNCOV
1137
        sub *chainntnfs.MempoolSpendEvent) {
×
UNCOV
1138

×
UNCOV
1139
        b.memNotifier.UnsubscribeEvent(sub)
×
UNCOV
1140
}
×
1141

1142
// LookupInputMempoolSpend takes an outpoint and queries the mempool to find
1143
// its spending tx. Returns the tx if found, otherwise fn.None.
1144
//
1145
// NOTE: part of the MempoolWatcher interface.
1146
func (b *BtcdNotifier) LookupInputMempoolSpend(
UNCOV
1147
        op wire.OutPoint) fn.Option[wire.MsgTx] {
×
UNCOV
1148

×
UNCOV
1149
        // Find the spending txid.
×
UNCOV
1150
        txid, found := b.chainConn.LookupInputMempoolSpend(op)
×
UNCOV
1151
        if !found {
×
UNCOV
1152
                return fn.None[wire.MsgTx]()
×
UNCOV
1153
        }
×
1154

1155
        // Query the spending tx using the id.
UNCOV
1156
        tx, err := b.chainConn.GetRawTransaction(&txid)
×
UNCOV
1157
        if err != nil {
×
1158
                // TODO(yy): enable logging errors in this package.
×
1159
                return fn.None[wire.MsgTx]()
×
1160
        }
×
1161

UNCOV
1162
        return fn.Some(*tx.MsgTx().Copy())
×
1163
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc