• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12349698563

16 Dec 2024 09:29AM UTC coverage: 58.55% (-0.09%) from 58.636%
12349698563

Pull #9357

github

GeorgeTsagk
contractcourt: include custom records on replayed htlc

When notifying the invoice registry for an exit hop htlc we also want to
include its custom records. The channelLink, the other caller of this
method, already populates this field. So we make sure the contest
resolver does so too.
Pull Request #9357: contractcourt: include custom records on replayed htlc

2 of 2 new or added lines in 1 file covered. (100.0%)

262 existing lines in 24 files now uncovered.

134243 of 229278 relevant lines covered (58.55%)

19277.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.93
/chainntnfs/bitcoindnotify/bitcoind.go
1
package bitcoindnotify
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcjson"
10
        "github.com/btcsuite/btcd/btcutil"
11
        "github.com/btcsuite/btcd/chaincfg"
12
        "github.com/btcsuite/btcd/chaincfg/chainhash"
13
        "github.com/btcsuite/btcd/txscript"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/btcsuite/btcwallet/chain"
16
        "github.com/lightningnetwork/lnd/blockcache"
17
        "github.com/lightningnetwork/lnd/chainntnfs"
18
        "github.com/lightningnetwork/lnd/fn/v2"
19
        "github.com/lightningnetwork/lnd/queue"
20
)
21

22
const (
23
        // notifierTypeZMQ uniquely identifies a concrete implementation of the
24
        // ChainNotifier interface that makes use of the bitcoind ZMQ interface.
25
        notifierTypeZMQ = "bitcoind"
26

27
        // notifierTypeRPCPolling uniquely identifies a concrete implementation
28
        // of the ChainNotifier interface that makes use of the bitcoind RPC
29
        // interface.
30
        notifierTypeRPCPolling = "bitcoind-rpc-polling"
31
)
32

33
// TODO(roasbeef): generalize struct below:
34
//  * move chans to config
35
//  * extract common code
36
//  * allow outside callers to handle send conditions
37

38
// BitcoindNotifier implements the ChainNotifier interface using a bitcoind
39
// chain client. Multiple concurrent clients are supported. All notifications
40
// are achieved via non-blocking sends on client channels.
41
type BitcoindNotifier struct {
42
        epochClientCounter uint64 // To be used atomically.
43

44
        // start guards startNotifier so that it runs at most once.
        start   sync.Once
45
        // active is set to 1 once startNotifier has completed successfully.
        active  int32 // To be used atomically.
46
        // stopped is incremented by every Stop call; only the first call
        // performs the shutdown.
        stopped int32 // To be used atomically.
47

48
        // chainConn is the bitcoind client created in New; it is used for
        // chain queries and as the source of chain notifications.
        chainConn   *chain.BitcoindClient
49
        chainParams *chaincfg.Params
50

51
        // notificationCancels and notificationRegistry carry cancellation and
        // registration messages to the notificationDispatcher goroutine.
        notificationCancels  chan interface{}
52
        notificationRegistry chan interface{}
53

54
        // txNotifier is created in startNotifier and is nil until then.
        txNotifier *chainntnfs.TxNotifier
55

56
        // blockEpochClients holds the registered block epoch clients, keyed
        // by their epoch ID.
        blockEpochClients map[uint64]*blockEpochRegistration
57

58
        // bestBlock is the chain tip last recorded by the notifier.
        bestBlock chainntnfs.BlockEpoch
59

60
        // blockCache is a LRU block cache.
61
        blockCache *blockcache.BlockCache
62

63
        // spendHintCache is a cache used to query and update the latest height
64
        // hints for an outpoint. Each height hint represents the earliest
65
        // height at which the outpoint could have been spent within the chain.
66
        spendHintCache chainntnfs.SpendHintCache
67

68
        // confirmHintCache is a cache used to query the latest height hints for
69
        // a transaction. Each height hint represents the earliest height at
70
        // which the transaction could have confirmed within the chain.
71
        confirmHintCache chainntnfs.ConfirmHintCache
72

73
        // memNotifier notifies clients of events related to the mempool.
74
        memNotifier *chainntnfs.MempoolNotifier
75

76
        // wg tracks the goroutines launched by the notifier, and quit is
        // closed by Stop to signal them to exit.
        wg   sync.WaitGroup
77
        quit chan struct{}
78
}
79

80
// Ensure BitcoindNotifier implements the ChainNotifier interface at compile
81
// time.
82
var _ chainntnfs.ChainNotifier = (*BitcoindNotifier)(nil)
83

84
// Ensure BitcoindNotifier implements the MempoolWatcher interface at compile
85
// time.
86
var _ chainntnfs.MempoolWatcher = (*BitcoindNotifier)(nil)
87

88
// New returns a new BitcoindNotifier instance. This function assumes the
89
// bitcoind node detailed in the passed configuration is already running, and
90
// willing to accept RPC requests and new zmq clients.
91
func New(chainConn *chain.BitcoindConn, chainParams *chaincfg.Params,
92
        spendHintCache chainntnfs.SpendHintCache,
93
        confirmHintCache chainntnfs.ConfirmHintCache,
94
        blockCache *blockcache.BlockCache) *BitcoindNotifier {
14✔
95

14✔
96
        notifier := &BitcoindNotifier{
14✔
97
                chainParams: chainParams,
14✔
98

14✔
99
                notificationCancels:  make(chan interface{}),
14✔
100
                notificationRegistry: make(chan interface{}),
14✔
101

14✔
102
                blockEpochClients: make(map[uint64]*blockEpochRegistration),
14✔
103

14✔
104
                spendHintCache:   spendHintCache,
14✔
105
                confirmHintCache: confirmHintCache,
14✔
106

14✔
107
                blockCache:  blockCache,
14✔
108
                memNotifier: chainntnfs.NewMempoolNotifier(),
14✔
109

14✔
110
                quit: make(chan struct{}),
14✔
111
        }
14✔
112

14✔
113
        notifier.chainConn = chainConn.NewBitcoindClient()
14✔
114

14✔
115
        return notifier
14✔
116
}
14✔
117

118
// Start connects to the running bitcoind node over websockets, registers for
119
// block notifications, and finally launches all related helper goroutines.
120
func (b *BitcoindNotifier) Start() error {
8✔
121
        var startErr error
8✔
122
        b.start.Do(func() {
16✔
123
                startErr = b.startNotifier()
8✔
124
        })
8✔
125

126
        return startErr
8✔
127
}
128

129
// Stop shutsdown the BitcoindNotifier.
130
func (b *BitcoindNotifier) Stop() error {
14✔
131
        // Already shutting down?
14✔
132
        if atomic.AddInt32(&b.stopped, 1) != 1 {
14✔
133
                return nil
×
134
        }
×
135

136
        chainntnfs.Log.Info("bitcoind notifier shutting down...")
14✔
137
        defer chainntnfs.Log.Debug("bitcoind notifier shutdown complete")
14✔
138

14✔
139
        // Shutdown the rpc client, this gracefully disconnects from bitcoind,
14✔
140
        // and cleans up all related resources.
14✔
141
        b.chainConn.Stop()
14✔
142
        b.chainConn.WaitForShutdown()
14✔
143

14✔
144
        close(b.quit)
14✔
145
        b.wg.Wait()
14✔
146

14✔
147
        // Notify all pending clients of our shutdown by closing the related
14✔
148
        // notification channels.
14✔
149
        for _, epochClient := range b.blockEpochClients {
62✔
150
                close(epochClient.cancelChan)
48✔
151
                epochClient.wg.Wait()
48✔
152

48✔
153
                close(epochClient.epochChan)
48✔
154
        }
48✔
155

156
        // The txNotifier is only initialized in the start method therefore we
157
        // need to make sure we don't access a nil pointer here.
158
        if b.txNotifier != nil {
28✔
159
                b.txNotifier.TearDown()
14✔
160
        }
14✔
161

162
        // Stop the mempool notifier.
163
        b.memNotifier.TearDown()
14✔
164

14✔
165
        return nil
14✔
166
}
167

168
// Started returns true if this instance has been started, and false otherwise.
169
func (b *BitcoindNotifier) Started() bool {
2✔
170
        return atomic.LoadInt32(&b.active) != 0
2✔
171
}
2✔
172

173
func (b *BitcoindNotifier) startNotifier() error {
8✔
174
        // Connect to bitcoind, and register for notifications on connected,
8✔
175
        // and disconnected blocks.
8✔
176
        if err := b.chainConn.Start(); err != nil {
8✔
177
                return err
×
178
        }
×
179
        if err := b.chainConn.NotifyBlocks(); err != nil {
8✔
180
                return err
×
181
        }
×
182

183
        currentHash, currentHeight, err := b.chainConn.GetBestBlock()
8✔
184
        if err != nil {
8✔
185
                return err
×
186
        }
×
187
        blockHeader, err := b.chainConn.GetBlockHeader(currentHash)
8✔
188
        if err != nil {
8✔
189
                return err
×
190
        }
×
191

192
        b.txNotifier = chainntnfs.NewTxNotifier(
8✔
193
                uint32(currentHeight), chainntnfs.ReorgSafetyLimit,
8✔
194
                b.confirmHintCache, b.spendHintCache,
8✔
195
        )
8✔
196

8✔
197
        b.bestBlock = chainntnfs.BlockEpoch{
8✔
198
                Height:      currentHeight,
8✔
199
                Hash:        currentHash,
8✔
200
                BlockHeader: blockHeader,
8✔
201
        }
8✔
202

8✔
203
        b.wg.Add(1)
8✔
204
        go b.notificationDispatcher()
8✔
205

8✔
206
        // Set the active flag now that we've completed the full
8✔
207
        // startup.
8✔
208
        atomic.StoreInt32(&b.active, 1)
8✔
209

8✔
210
        return nil
8✔
211
}
212

213
// notificationDispatcher is the primary goroutine which handles client
214
// notification registrations, as well as notification dispatches.
215
func (b *BitcoindNotifier) notificationDispatcher() {
14✔
216
        defer b.wg.Done()
14✔
217

14✔
218
out:
14✔
219
        for {
857✔
220
                select {
843✔
221
                case cancelMsg := <-b.notificationCancels:
4✔
222
                        switch msg := cancelMsg.(type) {
4✔
223
                        case *epochCancel:
4✔
224
                                chainntnfs.Log.Infof("Cancelling epoch "+
4✔
225
                                        "notification, epoch_id=%v", msg.epochID)
4✔
226

4✔
227
                                // First, we'll lookup the original
4✔
228
                                // registration in order to stop the active
4✔
229
                                // queue goroutine.
4✔
230
                                reg := b.blockEpochClients[msg.epochID]
4✔
231
                                reg.epochQueue.Stop()
4✔
232

4✔
233
                                // Next, close the cancel channel for this
4✔
234
                                // specific client, and wait for the client to
4✔
235
                                // exit.
4✔
236
                                close(b.blockEpochClients[msg.epochID].cancelChan)
4✔
237
                                b.blockEpochClients[msg.epochID].wg.Wait()
4✔
238

4✔
239
                                // Once the client has exited, we can then
4✔
240
                                // safely close the channel used to send epoch
4✔
241
                                // notifications, in order to notify any
4✔
242
                                // listeners that the intent has been
4✔
243
                                // canceled.
4✔
244
                                close(b.blockEpochClients[msg.epochID].epochChan)
4✔
245
                                delete(b.blockEpochClients, msg.epochID)
4✔
246

247
                        }
248
                case registerMsg := <-b.notificationRegistry:
120✔
249
                        switch msg := registerMsg.(type) {
120✔
250
                        case *chainntnfs.HistoricalConfDispatch:
68✔
251
                                // Look up whether the transaction is already
68✔
252
                                // included in the active chain. We'll do this
68✔
253
                                // in a goroutine to prevent blocking
68✔
254
                                // potentially long rescans.
68✔
255
                                //
68✔
256
                                // TODO(wilmer): add retry logic if rescan fails?
68✔
257
                                b.wg.Add(1)
68✔
258

68✔
259
                                //nolint:ll
68✔
260
                                go func(msg *chainntnfs.HistoricalConfDispatch) {
136✔
261
                                        defer b.wg.Done()
68✔
262

68✔
263
                                        confDetails, _, err := b.historicalConfDetails(
68✔
264
                                                msg.ConfRequest,
68✔
265
                                                msg.StartHeight, msg.EndHeight,
68✔
266
                                        )
68✔
267
                                        if err != nil {
68✔
268
                                                chainntnfs.Log.Errorf("Rescan to "+
×
269
                                                        "determine the conf "+
×
270
                                                        "details of %v within "+
×
271
                                                        "range %d-%d failed: %v",
×
272
                                                        msg.ConfRequest,
×
273
                                                        msg.StartHeight,
×
274
                                                        msg.EndHeight, err)
×
275
                                                return
×
276
                                        }
×
277

278
                                        // If the historical dispatch finished
279
                                        // without error, we will invoke
280
                                        // UpdateConfDetails even if none were
281
                                        // found. This allows the notifier to
282
                                        // begin safely updating the height hint
283
                                        // cache at tip, since any pending
284
                                        // rescans have now completed.
285
                                        err = b.txNotifier.UpdateConfDetails(
68✔
286
                                                msg.ConfRequest, confDetails,
68✔
287
                                        )
68✔
288
                                        if err != nil {
68✔
289
                                                chainntnfs.Log.Errorf("Unable "+
×
290
                                                        "to update conf "+
×
291
                                                        "details of %v: %v",
×
292
                                                        msg.ConfRequest, err)
×
293
                                        }
×
294
                                }(msg)
295

296
                        case *chainntnfs.HistoricalSpendDispatch:
6✔
297
                                // In order to ensure we don't block the caller
6✔
298
                                // on what may be a long rescan, we'll launch a
6✔
299
                                // goroutine to do so in the background.
6✔
300
                                //
6✔
301
                                // TODO(wilmer): add retry logic if rescan fails?
6✔
302
                                b.wg.Add(1)
6✔
303

6✔
304
                                //nolint:ll
6✔
305
                                go func(msg *chainntnfs.HistoricalSpendDispatch) {
12✔
306
                                        defer b.wg.Done()
6✔
307

6✔
308
                                        spendDetails, err := b.historicalSpendDetails(
6✔
309
                                                msg.SpendRequest,
6✔
310
                                                msg.StartHeight, msg.EndHeight,
6✔
311
                                        )
6✔
312
                                        if err != nil {
6✔
313
                                                chainntnfs.Log.Errorf("Rescan to "+
×
314
                                                        "determine the spend "+
×
315
                                                        "details of %v within "+
×
316
                                                        "range %d-%d failed: %v",
×
317
                                                        msg.SpendRequest,
×
318
                                                        msg.StartHeight,
×
319
                                                        msg.EndHeight, err)
×
320
                                                return
×
321
                                        }
×
322

323
                                        chainntnfs.Log.Infof("Historical "+
6✔
324
                                                "spend dispatch finished "+
6✔
325
                                                "for request %v (start=%v "+
6✔
326
                                                "end=%v) with details: %v",
6✔
327
                                                msg.SpendRequest,
6✔
328
                                                msg.StartHeight, msg.EndHeight,
6✔
329
                                                spendDetails)
6✔
330

6✔
331
                                        // If the historical dispatch finished
6✔
332
                                        // without error, we will invoke
6✔
333
                                        // UpdateSpendDetails even if none were
6✔
334
                                        // found. This allows the notifier to
6✔
335
                                        // begin safely updating the height hint
6✔
336
                                        // cache at tip, since any pending
6✔
337
                                        // rescans have now completed.
6✔
338
                                        err = b.txNotifier.UpdateSpendDetails(
6✔
339
                                                msg.SpendRequest, spendDetails,
6✔
340
                                        )
6✔
341
                                        if err != nil {
6✔
342
                                                chainntnfs.Log.Errorf("Unable "+
×
343
                                                        "to update spend "+
×
344
                                                        "details of %v: %v",
×
345
                                                        msg.SpendRequest, err)
×
346
                                        }
×
347
                                }(msg)
348

349
                        case *blockEpochRegistration:
50✔
350
                                chainntnfs.Log.Infof("New block epoch subscription")
50✔
351

50✔
352
                                b.blockEpochClients[msg.epochID] = msg
50✔
353

50✔
354
                                // If the client did not provide their best
50✔
355
                                // known block, then we'll immediately dispatch
50✔
356
                                // a notification for the current tip.
50✔
357
                                if msg.bestBlock == nil {
90✔
358
                                        b.notifyBlockEpochClient(
40✔
359
                                                msg, b.bestBlock.Height,
40✔
360
                                                b.bestBlock.Hash,
40✔
361
                                                b.bestBlock.BlockHeader,
40✔
362
                                        )
40✔
363

40✔
364
                                        msg.errorChan <- nil
40✔
365
                                        continue
40✔
366
                                }
367

368
                                // Otherwise, we'll attempt to deliver the
369
                                // backlog of notifications from their best
370
                                // known block.
371
                                missedBlocks, err := chainntnfs.GetClientMissedBlocks(
12✔
372
                                        b.chainConn, msg.bestBlock,
12✔
373
                                        b.bestBlock.Height, true,
12✔
374
                                )
12✔
375
                                if err != nil {
12✔
UNCOV
376
                                        msg.errorChan <- err
×
UNCOV
377
                                        continue
×
378
                                }
379

380
                                for _, block := range missedBlocks {
114✔
381
                                        b.notifyBlockEpochClient(
102✔
382
                                                msg, block.Height, block.Hash,
102✔
383
                                                block.BlockHeader,
102✔
384
                                        )
102✔
385
                                }
102✔
386

387
                                msg.errorChan <- nil
12✔
388
                        }
389

390
                case ntfn := <-b.chainConn.Notifications():
711✔
391
                        switch item := ntfn.(type) {
711✔
392
                        case chain.BlockConnected:
318✔
393
                                blockHeader, err :=
318✔
394
                                        b.chainConn.GetBlockHeader(&item.Hash)
318✔
395
                                if err != nil {
318✔
396
                                        chainntnfs.Log.Errorf("Unable to fetch "+
×
397
                                                "block header: %v", err)
×
398
                                        continue
×
399
                                }
400

401
                                if blockHeader.PrevBlock != *b.bestBlock.Hash {
322✔
402
                                        // Handle the case where the notifier
4✔
403
                                        // missed some blocks from its chain
4✔
404
                                        // backend.
4✔
405
                                        chainntnfs.Log.Infof("Missed blocks, " +
4✔
406
                                                "attempting to catch up")
4✔
407
                                        newBestBlock, missedBlocks, err :=
4✔
408
                                                chainntnfs.HandleMissedBlocks(
4✔
409
                                                        b.chainConn,
4✔
410
                                                        b.txNotifier,
4✔
411
                                                        b.bestBlock, item.Height,
4✔
412
                                                        true,
4✔
413
                                                )
4✔
414

4✔
415
                                        if err != nil {
4✔
UNCOV
416
                                                // Set the bestBlock here in case
×
UNCOV
417
                                                // a catch up partially completed.
×
UNCOV
418
                                                b.bestBlock = newBestBlock
×
UNCOV
419
                                                chainntnfs.Log.Error(err)
×
UNCOV
420
                                                continue
×
421
                                        }
422

423
                                        for _, block := range missedBlocks {
46✔
424
                                                err := b.handleBlockConnected(block)
42✔
425
                                                if err != nil {
42✔
426
                                                        chainntnfs.Log.Error(err)
×
427
                                                        continue out
×
428
                                                }
429
                                        }
430
                                }
431

432
                                newBlock := chainntnfs.BlockEpoch{
318✔
433
                                        Height:      item.Height,
318✔
434
                                        Hash:        &item.Hash,
318✔
435
                                        BlockHeader: blockHeader,
318✔
436
                                }
318✔
437
                                if err := b.handleBlockConnected(newBlock); err != nil {
318✔
438
                                        chainntnfs.Log.Error(err)
×
439
                                }
×
440

441
                                continue
318✔
442

443
                        case chain.BlockDisconnected:
10✔
444
                                if item.Height != b.bestBlock.Height {
11✔
445
                                        chainntnfs.Log.Infof("Missed disconnected" +
1✔
446
                                                "blocks, attempting to catch up")
1✔
447
                                }
1✔
448

449
                                newBestBlock, err := chainntnfs.RewindChain(
10✔
450
                                        b.chainConn, b.txNotifier,
10✔
451
                                        b.bestBlock, item.Height-1,
10✔
452
                                )
10✔
453
                                if err != nil {
10✔
454
                                        chainntnfs.Log.Errorf("Unable to rewind chain "+
×
455
                                                "from height %d to height %d: %v",
×
456
                                                b.bestBlock.Height, item.Height-1, err)
×
457
                                }
×
458

459
                                // Set the bestBlock here in case a chain
460
                                // rewind partially completed.
461
                                b.bestBlock = newBestBlock
10✔
462

463
                        case chain.RelevantTx:
63✔
464
                                tx := btcutil.NewTx(&item.TxRecord.MsgTx)
63✔
465

63✔
466
                                // Init values.
63✔
467
                                isMempool := false
63✔
468
                                height := uint32(0)
63✔
469

63✔
470
                                // Unwrap values.
63✔
471
                                if item.Block == nil {
100✔
472
                                        isMempool = true
37✔
473
                                } else {
65✔
474
                                        height = uint32(item.Block.Height)
28✔
475
                                }
28✔
476

477
                                // Handle the transaction.
478
                                b.handleRelevantTx(tx, isMempool, height)
63✔
479
                        }
480

481
                case <-b.quit:
14✔
482
                        break out
14✔
483
                }
484
        }
485
}
486

487
// handleRelevantTx handles a new transaction that has been seen either in a
488
// block or in the mempool. If in mempool, it will ask the mempool notifier to
489
// handle it. If in a block, it will ask the txNotifier to handle it, and
490
// cancel any relevant subscriptions made in the mempool.
491
func (b *BitcoindNotifier) handleRelevantTx(tx *btcutil.Tx,
492
        mempool bool, height uint32) {
63✔
493

63✔
494
        // If this is a mempool spend, we'll ask the mempool notifier to hanlde
63✔
495
        // it.
63✔
496
        if mempool {
100✔
497
                err := b.memNotifier.ProcessRelevantSpendTx(tx)
37✔
498
                if err != nil {
37✔
499
                        chainntnfs.Log.Errorf("Unable to process transaction "+
×
500
                                "%v: %v", tx.Hash(), err)
×
501
                }
×
502

503
                return
37✔
504
        }
505

506
        // Otherwise this is a confirmed spend, and we'll ask the tx notifier
507
        // to handle it.
508
        err := b.txNotifier.ProcessRelevantSpendTx(tx, height)
28✔
509
        if err != nil {
28✔
510
                chainntnfs.Log.Errorf("Unable to process transaction %v: %v",
×
511
                        tx.Hash(), err)
×
512

×
513
                return
×
514
        }
×
515

516
        // Once the tx is processed, we will ask the memNotifier to unsubscribe
517
        // the input.
518
        //
519
        // NOTE(yy): we could build it into txNotifier.ProcessRelevantSpendTx,
520
        // but choose to implement it here so we can easily decouple the two
521
        // notifiers in the future.
522
        b.memNotifier.UnsubsribeConfirmedSpentTx(tx)
28✔
523
}
524

525
// historicalConfDetails looks up whether a confirmation request (txid/output
526
// script) has already been included in a block in the active chain and, if so,
527
// returns details about said block.
528
func (b *BitcoindNotifier) historicalConfDetails(confRequest chainntnfs.ConfRequest,
529
        startHeight, endHeight uint32) (*chainntnfs.TxConfirmation,
530
        chainntnfs.TxConfStatus, error) {
78✔
531

78✔
532
        // If a txid was not provided, then we should dispatch upon seeing the
78✔
533
        // script on-chain, so we'll short-circuit straight to scanning manually
78✔
534
        // as there doesn't exist a script index to query.
78✔
535
        if confRequest.TxID == chainntnfs.ZeroHash {
108✔
536
                return b.confDetailsManually(
30✔
537
                        confRequest, startHeight, endHeight,
30✔
538
                )
30✔
539
        }
30✔
540

541
        // Otherwise, we'll dispatch upon seeing a transaction on-chain with the
542
        // given hash.
543
        //
544
        // We'll first attempt to retrieve the transaction using the node's
545
        // txindex.
546
        txNotFoundErr := "No such mempool or blockchain transaction"
48✔
547
        txConf, txStatus, err := chainntnfs.ConfDetailsFromTxIndex(
48✔
548
                b.chainConn, confRequest, txNotFoundErr,
48✔
549
        )
48✔
550

48✔
551
        // We'll then check the status of the transaction lookup returned to
48✔
552
        // determine whether we should proceed with any fallback methods.
48✔
553
        switch {
48✔
554

555
        // We failed querying the index for the transaction, fall back to
556
        // scanning manually.
557
        case err != nil:
8✔
558
                chainntnfs.Log.Debugf("Failed getting conf details from "+
8✔
559
                        "index (%v), scanning manually", err)
8✔
560
                return b.confDetailsManually(confRequest, startHeight, endHeight)
8✔
561

562
        // The transaction was found within the node's mempool.
563
        case txStatus == chainntnfs.TxFoundMempool:
17✔
564

565
        // The transaction was found within the node's txindex.
566
        case txStatus == chainntnfs.TxFoundIndex:
12✔
567

568
        // The transaction was not found within the node's mempool or txindex.
569
        case txStatus == chainntnfs.TxNotFoundIndex:
15✔
570

571
        // Unexpected txStatus returned.
572
        default:
×
573
                return nil, txStatus,
×
574
                        fmt.Errorf("Got unexpected txConfStatus: %v", txStatus)
×
575
        }
576

577
        return txConf, txStatus, nil
40✔
578
}
579

580
// confDetailsManually looks up whether a transaction/output script has already
581
// been included in a block in the active chain by scanning the chain's blocks
582
// within the given range. If the transaction/output script is found, its
583
// confirmation details are returned. Otherwise, nil is returned.
584
func (b *BitcoindNotifier) confDetailsManually(confRequest chainntnfs.ConfRequest,
585
        heightHint, currentHeight uint32) (*chainntnfs.TxConfirmation,
586
        chainntnfs.TxConfStatus, error) {
38✔
587

38✔
588
        // Begin scanning blocks at every height to determine where the
38✔
589
        // transaction was included.
38✔
590
        for height := currentHeight; height >= heightHint && height > 0; height-- {
100✔
591
                // Ensure we haven't been requested to shut down before
62✔
592
                // processing the next height.
62✔
593
                select {
62✔
594
                case <-b.quit:
×
595
                        return nil, chainntnfs.TxNotFoundManually,
×
596
                                chainntnfs.ErrChainNotifierShuttingDown
×
597
                default:
62✔
598
                }
599

600
                blockHash, err := b.chainConn.GetBlockHash(int64(height))
62✔
601
                if err != nil {
62✔
602
                        return nil, chainntnfs.TxNotFoundManually,
×
603
                                fmt.Errorf("unable to get hash from block "+
×
604
                                        "with height %d", height)
×
605
                }
×
606

607
                block, err := b.GetBlock(blockHash)
62✔
608
                if err != nil {
62✔
609
                        return nil, chainntnfs.TxNotFoundManually,
×
610
                                fmt.Errorf("unable to get block with hash "+
×
611
                                        "%v: %v", blockHash, err)
×
612
                }
×
613

614
                // For every transaction in the block, check which one matches
615
                // our request. If we find one that does, we can dispatch its
616
                // confirmation details.
617
                for txIndex, tx := range block.Transactions {
159✔
618
                        if !confRequest.MatchesTx(tx) {
188✔
619
                                continue
91✔
620
                        }
621

622
                        return &chainntnfs.TxConfirmation{
6✔
623
                                Tx:          tx.Copy(),
6✔
624
                                BlockHash:   blockHash,
6✔
625
                                BlockHeight: height,
6✔
626
                                TxIndex:     uint32(txIndex),
6✔
627
                                Block:       block,
6✔
628
                        }, chainntnfs.TxFoundManually, nil
6✔
629
                }
630
        }
631

632
        // If we reach here, then we were not able to find the transaction
633
        // within a block, so we avoid returning an error.
634
        return nil, chainntnfs.TxNotFoundManually, nil
32✔
635
}
636

637
// handleBlockConnected applies a chain update for a new block. Any watched
638
// transactions included in this block will be processed to either send
639
// notifications now or after numConfirmations confs.
640
func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) error {
360✔
641
        // First, we'll fetch the raw block as we'll need to gather all the
360✔
642
        // transactions to determine whether any are relevant to our registered
360✔
643
        // clients.
360✔
644
        rawBlock, err := b.GetBlock(block.Hash)
360✔
645
        if err != nil {
360✔
646
                return fmt.Errorf("unable to get block: %w", err)
×
647
        }
×
648
        utilBlock := btcutil.NewBlock(rawBlock)
360✔
649

360✔
650
        // We'll then extend the txNotifier's height with the information of
360✔
651
        // this new block, which will handle all of the notification logic for
360✔
652
        // us.
360✔
653
        err = b.txNotifier.ConnectTip(utilBlock, uint32(block.Height))
360✔
654
        if err != nil {
360✔
655
                return fmt.Errorf("unable to connect tip: %w", err)
×
656
        }
×
657

658
        chainntnfs.Log.Infof("New block: height=%v, sha=%v", block.Height,
360✔
659
                block.Hash)
360✔
660

360✔
661
        // Now that we've guaranteed the new block extends the txNotifier's
360✔
662
        // current tip, we'll proceed to dispatch notifications to all of our
360✔
663
        // registered clients who have had notifications fulfilled. Before
360✔
664
        // doing so, we'll make sure to update our in memory state in order to
360✔
665
        // satisfy any client requests based upon the new block.
360✔
666
        b.bestBlock = block
360✔
667

360✔
668
        err = b.txNotifier.NotifyHeight(uint32(block.Height))
360✔
669
        if err != nil {
360✔
670
                return fmt.Errorf("unable to notify height: %w", err)
×
671
        }
×
672

673
        b.notifyBlockEpochs(block.Height, block.Hash, block.BlockHeader)
360✔
674

360✔
675
        return nil
360✔
676
}
677

678
// notifyBlockEpochs notifies all registered block epoch clients of the newly
679
// connected block to the main chain.
680
func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash,
681
        blockHeader *wire.BlockHeader) {
360✔
682

360✔
683
        for _, client := range b.blockEpochClients {
904✔
684
                b.notifyBlockEpochClient(client, newHeight, newSha, blockHeader)
544✔
685
        }
544✔
686
}
687

688
// notifyBlockEpochClient sends a registered block epoch client a notification
689
// about a specific block.
690
func (b *BitcoindNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
691
        height int32, sha *chainhash.Hash, header *wire.BlockHeader) {
682✔
692

682✔
693
        epoch := &chainntnfs.BlockEpoch{
682✔
694
                Height:      height,
682✔
695
                Hash:        sha,
682✔
696
                BlockHeader: header,
682✔
697
        }
682✔
698

682✔
699
        select {
682✔
700
        case epochClient.epochQueue.ChanIn() <- epoch:
682✔
701
        case <-epochClient.cancelChan:
×
702
        case <-b.quit:
×
703
        }
704
}
705

706
// RegisterSpendNtfn registers an intent to be notified once the target
707
// outpoint/output script has been spent by a transaction on-chain. When
708
// intending to be notified of the spend of an output script, a nil outpoint
709
// must be used. The heightHint should represent the earliest height in the
710
// chain of the transaction that spent the outpoint/output script.
711
//
712
// Once a spend has been detected, the details of the spending event will be
713
// sent across the 'Spend' channel.
714
func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
715
        pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
54✔
716

54✔
717
        // Register the spend notification with the TxNotifier. A non-nil
54✔
718
        // HistoricalDispatch will be returned if we are required to perform a
54✔
719
        // manual scan for the spend. Otherwise the notifier will begin
54✔
720
        // watching at tip for the outpoint/output script to be spent.
54✔
721
        ntfn, err := b.txNotifier.RegisterSpend(outpoint, pkScript, heightHint)
54✔
722
        if err != nil {
56✔
723
                return nil, err
2✔
724
        }
2✔
725

726
        // We'll then request the backend to notify us when it has detected the
727
        // outpoint/output script as spent.
728
        //
729
        // TODO(wilmer): use LoadFilter API instead.
730
        if outpoint == nil || *outpoint == chainntnfs.ZeroOutPoint {
80✔
731
                _, addrs, _, err := txscript.ExtractPkScriptAddrs(
26✔
732
                        pkScript, b.chainParams,
26✔
733
                )
26✔
734
                if err != nil {
26✔
735
                        return nil, fmt.Errorf("unable to parse script: %w",
×
736
                                err)
×
737
                }
×
738
                if err := b.chainConn.NotifyReceived(addrs); err != nil {
26✔
739
                        return nil, err
×
740
                }
×
741
        } else {
28✔
742
                ops := []*wire.OutPoint{outpoint}
28✔
743
                if err := b.chainConn.NotifySpent(ops); err != nil {
28✔
744
                        return nil, err
×
745
                }
×
746
        }
747

748
        // If the txNotifier didn't return any details to perform a historical
749
        // scan of the chain, then we can return early as there's nothing left
750
        // for us to do.
751
        if ntfn.HistoricalDispatch == nil {
104✔
752
                return ntfn.Event, nil
50✔
753
        }
50✔
754

755
        // Otherwise, we'll need to dispatch a historical rescan to determine if
756
        // the outpoint was already spent at a previous height.
757
        //
758
        // We'll short-circuit the path when dispatching the spend of a script,
759
        // rather than an outpoint, as there aren't any additional checks we can
760
        // make for scripts.
761
        if ntfn.HistoricalDispatch.OutPoint == chainntnfs.ZeroOutPoint {
8✔
762
                select {
2✔
763
                case b.notificationRegistry <- ntfn.HistoricalDispatch:
2✔
764
                case <-b.quit:
×
765
                        return nil, chainntnfs.ErrChainNotifierShuttingDown
×
766
                }
767

768
                return ntfn.Event, nil
2✔
769
        }
770

771
        // When dispatching spends of outpoints, there are a number of checks we
772
        // can make to start our rescan from a better height or completely avoid
773
        // it.
774
        //
775
        // We'll start by checking the backend's UTXO set to determine whether
776
        // the outpoint has been spent. If it hasn't, we can return to the
777
        // caller as well.
778
        txOut, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true)
4✔
779
        if err != nil {
4✔
780
                return nil, err
×
781
        }
×
782
        if txOut != nil {
6✔
783
                // We'll let the txNotifier know the outpoint is still unspent
2✔
784
                // in order to begin updating its spend hint.
2✔
785
                err := b.txNotifier.UpdateSpendDetails(
2✔
786
                        ntfn.HistoricalDispatch.SpendRequest, nil,
2✔
787
                )
2✔
788
                if err != nil {
2✔
789
                        return nil, err
×
790
                }
×
791

792
                return ntfn.Event, nil
2✔
793
        }
794

795
        // Since the outpoint was spent, as it no longer exists within the UTXO
796
        // set, we'll determine when it happened by scanning the chain.
797
        //
798
        // As a minimal optimization, we'll query the backend's transaction
799
        // index (if enabled) to determine if we have a better rescan starting
800
        // height. We can do this as the GetRawTransaction call will return the
801
        // hash of the block it was included in within the chain.
802
        tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash)
4✔
803
        if err != nil {
6✔
804
                // Avoid returning an error if the transaction was not found to
2✔
805
                // proceed with fallback methods.
2✔
806
                jsonErr, ok := err.(*btcjson.RPCError)
2✔
807
                if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo {
2✔
808
                        return nil, fmt.Errorf("unable to query for txid "+
×
809
                                "%v: %w", outpoint.Hash, err)
×
810
                }
×
811
        }
812

813
        // If the transaction index was enabled, we'll use the block's hash to
814
        // retrieve its height and check whether it provides a better starting
815
        // point for our rescan.
816
        if tx != nil {
8✔
817
                // If the transaction containing the outpoint hasn't confirmed
4✔
818
                // on-chain, then there's no need to perform a rescan.
4✔
819
                if tx.BlockHash == "" {
6✔
820
                        return ntfn.Event, nil
2✔
821
                }
2✔
822

823
                blockHash, err := chainhash.NewHashFromStr(tx.BlockHash)
4✔
824
                if err != nil {
4✔
825
                        return nil, err
×
826
                }
×
827
                blockHeight, err := b.chainConn.GetBlockHeight(blockHash)
4✔
828
                if err != nil {
4✔
829
                        return nil, err
×
830
                }
×
831

832
                if uint32(blockHeight) > ntfn.HistoricalDispatch.StartHeight {
6✔
833
                        ntfn.HistoricalDispatch.StartHeight = uint32(blockHeight)
2✔
834
                }
2✔
835
        }
836

837
        // Now that we've determined the starting point of our rescan, we can
838
        // dispatch it and return.
839
        select {
4✔
840
        case b.notificationRegistry <- ntfn.HistoricalDispatch:
4✔
841
        case <-b.quit:
×
842
                return nil, chainntnfs.ErrChainNotifierShuttingDown
×
843
        }
844

845
        return ntfn.Event, nil
4✔
846
}
847

848
// historicalSpendDetails attempts to manually scan the chain within the given
849
// height range for a transaction that spends the given outpoint/output script.
850
// If one is found, the spend details are assembled and returned to the caller.
851
// If the spend is not found, a nil spend detail will be returned.
852
func (b *BitcoindNotifier) historicalSpendDetails(
853
        spendRequest chainntnfs.SpendRequest, startHeight, endHeight uint32) (
854
        *chainntnfs.SpendDetail, error) {
6✔
855

6✔
856
        // Begin scanning blocks at every height to determine if the outpoint
6✔
857
        // was spent.
6✔
858
        for height := endHeight; height >= startHeight && height > 0; height-- {
12✔
859
                // Ensure we haven't been requested to shut down before
6✔
860
                // processing the next height.
6✔
861
                select {
6✔
862
                case <-b.quit:
×
863
                        return nil, chainntnfs.ErrChainNotifierShuttingDown
×
864
                default:
6✔
865
                }
866

867
                // First, we'll fetch the block for the current height.
868
                blockHash, err := b.chainConn.GetBlockHash(int64(height))
6✔
869
                if err != nil {
6✔
870
                        return nil, fmt.Errorf("unable to retrieve hash for "+
×
871
                                "block with height %d: %v", height, err)
×
872
                }
×
873
                block, err := b.GetBlock(blockHash)
6✔
874
                if err != nil {
6✔
875
                        return nil, fmt.Errorf("unable to retrieve block "+
×
876
                                "with hash %v: %v", blockHash, err)
×
877
                }
×
878

879
                // Then, we'll manually go over every input in every transaction
880
                // in it and determine whether it spends the request in
881
                // question. If we find one, we'll dispatch the spend details.
882
                for _, tx := range block.Transactions {
16✔
883
                        matches, inputIdx, err := spendRequest.MatchesTx(tx)
10✔
884
                        if err != nil {
10✔
885
                                return nil, err
×
886
                        }
×
887
                        if !matches {
20✔
888
                                continue
10✔
889
                        }
890

891
                        txCopy := tx.Copy()
2✔
892
                        txHash := txCopy.TxHash()
2✔
893
                        spendOutPoint := &txCopy.TxIn[inputIdx].PreviousOutPoint
2✔
894
                        return &chainntnfs.SpendDetail{
2✔
895
                                SpentOutPoint:     spendOutPoint,
2✔
896
                                SpenderTxHash:     &txHash,
2✔
897
                                SpendingTx:        txCopy,
2✔
898
                                SpenderInputIndex: inputIdx,
2✔
899
                                SpendingHeight:    int32(height),
2✔
900
                        }, nil
2✔
901
                }
902
        }
903

904
        return nil, nil
6✔
905
}
906

907
// RegisterConfirmationsNtfn registers an intent to be notified once the target
908
// txid/output script has reached numConfs confirmations on-chain. When
909
// intending to be notified of the confirmation of an output script, a nil txid
910
// must be used. The heightHint should represent the earliest height at which
911
// the txid/output script could have been included in the chain.
912
//
913
// Progress on the number of confirmations left can be read from the 'Updates'
914
// channel. Once it has reached all of its confirmations, a notification will be
915
// sent across the 'Confirmed' channel.
916
func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
917
        pkScript []byte, numConfs, heightHint uint32,
918
        opts ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent, error) {
98✔
919

98✔
920
        // Register the conf notification with the TxNotifier. A non-nil value
98✔
921
        // for `dispatch` will be returned if we are required to perform a
98✔
922
        // manual scan for the confirmation. Otherwise the notifier will begin
98✔
923
        // watching at tip for the transaction to confirm.
98✔
924
        ntfn, err := b.txNotifier.RegisterConf(
98✔
925
                txid, pkScript, numConfs, heightHint, opts...,
98✔
926
        )
98✔
927
        if err != nil {
98✔
928
                return nil, err
×
929
        }
×
930

931
        if ntfn.HistoricalDispatch == nil {
130✔
932
                return ntfn.Event, nil
32✔
933
        }
32✔
934

935
        select {
68✔
936
        case b.notificationRegistry <- ntfn.HistoricalDispatch:
68✔
937
                return ntfn.Event, nil
68✔
938
        case <-b.quit:
×
939
                return nil, chainntnfs.ErrChainNotifierShuttingDown
×
940
        }
941
}
942

943
// blockEpochRegistration represents a client's intent to receive a
944
// notification with each newly connected block.
945
type blockEpochRegistration struct {
946
        epochID uint64
947

948
        epochChan chan *chainntnfs.BlockEpoch
949

950
        epochQueue *queue.ConcurrentQueue
951

952
        bestBlock *chainntnfs.BlockEpoch
953

954
        errorChan chan error
955

956
        cancelChan chan struct{}
957

958
        wg sync.WaitGroup
959
}
960

961
// epochCancel is a message sent to the BitcoindNotifier when a client wishes
962
// to cancel an outstanding epoch notification that has yet to be dispatched.
963
type epochCancel struct {
964
        epochID uint64
965
}
966

967
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
968
// caller to receive notifications of each new block connected to the main
969
// chain. Clients have the option of passing in their best known block, which
970
// the notifier uses to check if they are behind on blocks and catch them up. If
971
// they do not provide one, then a notification will be dispatched immediately
972
// for the current tip of the chain upon a successful registration.
973
func (b *BitcoindNotifier) RegisterBlockEpochNtfn(
974
        bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
50✔
975

50✔
976
        reg := &blockEpochRegistration{
50✔
977
                epochQueue: queue.NewConcurrentQueue(20),
50✔
978
                epochChan:  make(chan *chainntnfs.BlockEpoch, 20),
50✔
979
                cancelChan: make(chan struct{}),
50✔
980
                epochID:    atomic.AddUint64(&b.epochClientCounter, 1),
50✔
981
                bestBlock:  bestBlock,
50✔
982
                errorChan:  make(chan error, 1),
50✔
983
        }
50✔
984
        reg.epochQueue.Start()
50✔
985

50✔
986
        // Before we send the request to the main goroutine, we'll launch a new
50✔
987
        // goroutine to proxy items added to our queue to the client itself.
50✔
988
        // This ensures that all notifications are received *in order*.
50✔
989
        reg.wg.Add(1)
50✔
990
        go func() {
100✔
991
                defer reg.wg.Done()
50✔
992

50✔
993
                for {
688✔
994
                        select {
638✔
995
                        case ntfn := <-reg.epochQueue.ChanOut():
594✔
996
                                blockNtfn := ntfn.(*chainntnfs.BlockEpoch)
594✔
997
                                select {
594✔
998
                                case reg.epochChan <- blockNtfn:
590✔
999

1000
                                case <-reg.cancelChan:
2✔
1001
                                        return
2✔
1002

1003
                                case <-b.quit:
4✔
1004
                                        return
4✔
1005
                                }
1006

1007
                        case <-reg.cancelChan:
4✔
1008
                                return
4✔
1009

1010
                        case <-b.quit:
44✔
1011
                                return
44✔
1012
                        }
1013
                }
1014
        }()
1015

1016
        select {
50✔
1017
        case <-b.quit:
×
1018
                // As we're exiting before the registration could be sent,
×
1019
                // we'll stop the queue now ourselves.
×
1020
                reg.epochQueue.Stop()
×
1021

×
1022
                return nil, errors.New("chainntnfs: system interrupt while " +
×
1023
                        "attempting to register for block epoch notification.")
×
1024
        case b.notificationRegistry <- reg:
50✔
1025
                return &chainntnfs.BlockEpochEvent{
50✔
1026
                        Epochs: reg.epochChan,
50✔
1027
                        Cancel: func() {
54✔
1028
                                cancel := &epochCancel{
4✔
1029
                                        epochID: reg.epochID,
4✔
1030
                                }
4✔
1031

4✔
1032
                                // Submit epoch cancellation to notification dispatcher.
4✔
1033
                                select {
4✔
1034
                                case b.notificationCancels <- cancel:
4✔
1035
                                        // Cancellation is being handled, drain the epoch channel until it is
4✔
1036
                                        // closed before yielding to caller.
4✔
1037
                                        for {
10✔
1038
                                                select {
6✔
1039
                                                case _, ok := <-reg.epochChan:
6✔
1040
                                                        if !ok {
10✔
1041
                                                                return
4✔
1042
                                                        }
4✔
1043
                                                case <-b.quit:
×
1044
                                                        return
×
1045
                                                }
1046
                                        }
1047
                                case <-b.quit:
2✔
1048
                                }
1049
                        },
1050
                }, nil
1051
        }
1052
}
1053

1054
// GetBlock is used to retrieve the block with the given hash. This function
1055
// wraps the blockCache's GetBlock function.
1056
func (b *BitcoindNotifier) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock,
1057
        error) {
426✔
1058

426✔
1059
        return b.blockCache.GetBlock(hash, b.chainConn.GetBlock)
426✔
1060
}
426✔
1061

1062
// SubscribeMempoolSpent allows the caller to register a subscription to watch
1063
// for a spend of an outpoint in the mempool. The event will be dispatched once
1064
// the outpoint is spent in the mempool.
1065
//
1066
// NOTE: part of the MempoolWatcher interface.
1067
func (b *BitcoindNotifier) SubscribeMempoolSpent(
1068
        outpoint wire.OutPoint) (*chainntnfs.MempoolSpendEvent, error) {
2✔
1069

2✔
1070
        event := b.memNotifier.SubscribeInput(outpoint)
2✔
1071

2✔
1072
        ops := []*wire.OutPoint{&outpoint}
2✔
1073

2✔
1074
        return event, b.chainConn.NotifySpent(ops)
2✔
1075
}
2✔
1076

1077
// CancelMempoolSpendEvent allows the caller to cancel a subscription to watch
1078
// for a spend of an outpoint in the mempool.
1079
//
1080
// NOTE: part of the MempoolWatcher interface.
1081
func (b *BitcoindNotifier) CancelMempoolSpendEvent(
1082
        sub *chainntnfs.MempoolSpendEvent) {
2✔
1083

2✔
1084
        b.memNotifier.UnsubscribeEvent(sub)
2✔
1085
}
2✔
1086

1087
// LookupInputMempoolSpend takes an outpoint and queries the mempool to find
1088
// its spending tx. Returns the tx if found, otherwise fn.None.
1089
//
1090
// NOTE: part of the MempoolWatcher interface.
1091
func (b *BitcoindNotifier) LookupInputMempoolSpend(
1092
        op wire.OutPoint) fn.Option[wire.MsgTx] {
2✔
1093

2✔
1094
        // Find the spending txid.
2✔
1095
        txid, found := b.chainConn.LookupInputMempoolSpend(op)
2✔
1096
        if !found {
4✔
1097
                return fn.None[wire.MsgTx]()
2✔
1098
        }
2✔
1099

1100
        // Query the spending tx using the id.
1101
        tx, err := b.chainConn.GetRawTransaction(&txid)
2✔
1102
        if err != nil {
3✔
1103
                // TODO(yy): enable logging errors in this package.
1✔
1104
                return fn.None[wire.MsgTx]()
1✔
1105
        }
1✔
1106

1107
        return fn.Some(*tx.MsgTx().Copy())
2✔
1108
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc