• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13586005509

28 Feb 2025 10:14AM UTC coverage: 68.629% (+9.9%) from 58.77%
13586005509

Pull #9521

github

web-flow
Merge 37d3a70a5 into 8532955b3
Pull Request #9521: unit: remove GOACC, use Go 1.20 native coverage functionality

129950 of 189351 relevant lines covered (68.63%)

23726.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.75
/routing/chainview/btcd.go
1
package chainview
2

3
import (
4
        "bytes"
5
        "encoding/hex"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btcd/chaincfg/chainhash"
13
        "github.com/btcsuite/btcd/rpcclient"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/lightningnetwork/lnd/blockcache"
16
        graphdb "github.com/lightningnetwork/lnd/graph/db"
17
)
18

19
// BtcdFilteredChainView is an implementation of the FilteredChainView
20
// interface which is backed by an active websockets connection to btcd.
21
type BtcdFilteredChainView struct {
22
        started int32 // To be used atomically.
23
        stopped int32 // To be used atomically.
24

25
        // bestHeight is the height of the latest block added to the
26
        // blockQueue from the onFilteredConnectedMethod. It is used to
27
        // determine up to what height we would need to rescan in case
28
        // of a filter update.
29
        bestHeightMtx sync.Mutex
30
        bestHeight    uint32
31

32
        btcdConn *rpcclient.Client
33

34
        // blockEventQueue is the ordered queue used to keep the order
35
        // of connected and disconnected blocks sent to the reader of the
36
        // chainView.
37
        blockQueue *blockEventQueue
38

39
        // blockCache is an LRU block cache.
40
        blockCache *blockcache.BlockCache
41

42
        // filterUpdates is a channel in which updates to the utxo filter
43
        // attached to this instance are sent over.
44
        filterUpdates chan filterUpdate
45

46
        // chainFilter is the set of utox's that we're currently watching
47
        // spends for within the chain.
48
        filterMtx   sync.RWMutex
49
        chainFilter map[wire.OutPoint]struct{}
50

51
        // filterBlockReqs is a channel in which requests to filter select
52
        // blocks will be sent over.
53
        filterBlockReqs chan *filterBlockReq
54

55
        quit chan struct{}
56
        wg   sync.WaitGroup
57
}
58

59
// A compile time check to ensure BtcdFilteredChainView implements the
60
// chainview.FilteredChainView.
61
var _ FilteredChainView = (*BtcdFilteredChainView)(nil)
62

63
// NewBtcdFilteredChainView creates a new instance of a FilteredChainView from
64
// RPC credentials for an active btcd instance.
65
func NewBtcdFilteredChainView(config rpcclient.ConnConfig,
66
        blockCache *blockcache.BlockCache) (*BtcdFilteredChainView, error) {
6✔
67

6✔
68
        chainView := &BtcdFilteredChainView{
6✔
69
                chainFilter:     make(map[wire.OutPoint]struct{}),
6✔
70
                filterUpdates:   make(chan filterUpdate),
6✔
71
                filterBlockReqs: make(chan *filterBlockReq),
6✔
72
                blockCache:      blockCache,
6✔
73
                quit:            make(chan struct{}),
6✔
74
        }
6✔
75

6✔
76
        ntfnCallbacks := &rpcclient.NotificationHandlers{
6✔
77
                OnFilteredBlockConnected:    chainView.onFilteredBlockConnected,
6✔
78
                OnFilteredBlockDisconnected: chainView.onFilteredBlockDisconnected,
6✔
79
        }
6✔
80

6✔
81
        // Disable connecting to btcd within the rpcclient.New method. We
6✔
82
        // defer establishing the connection to our .Start() method.
6✔
83
        config.DisableConnectOnNew = true
6✔
84
        config.DisableAutoReconnect = false
6✔
85
        chainConn, err := rpcclient.New(&config, ntfnCallbacks)
6✔
86
        if err != nil {
6✔
87
                return nil, err
×
88
        }
×
89
        chainView.btcdConn = chainConn
6✔
90

6✔
91
        chainView.blockQueue = newBlockEventQueue()
6✔
92

6✔
93
        return chainView, nil
6✔
94
}
95

96
// Start starts all goroutines necessary for normal operation.
97
//
98
// NOTE: This is part of the FilteredChainView interface.
99
func (b *BtcdFilteredChainView) Start() error {
6✔
100
        // Already started?
6✔
101
        if atomic.AddInt32(&b.started, 1) != 1 {
6✔
102
                return nil
×
103
        }
×
104

105
        log.Infof("FilteredChainView starting")
6✔
106

6✔
107
        // Connect to btcd, and register for notifications on connected, and
6✔
108
        // disconnected blocks.
6✔
109
        if err := b.btcdConn.Connect(20); err != nil {
6✔
110
                return err
×
111
        }
×
112
        if err := b.btcdConn.NotifyBlocks(); err != nil {
6✔
113
                return err
×
114
        }
×
115

116
        _, bestHeight, err := b.btcdConn.GetBestBlock()
6✔
117
        if err != nil {
6✔
118
                return err
×
119
        }
×
120

121
        b.bestHeightMtx.Lock()
6✔
122
        b.bestHeight = uint32(bestHeight)
6✔
123
        b.bestHeightMtx.Unlock()
6✔
124

6✔
125
        b.blockQueue.Start()
6✔
126

6✔
127
        b.wg.Add(1)
6✔
128
        go b.chainFilterer()
6✔
129

6✔
130
        return nil
6✔
131
}
132

133
// Stop stops all goroutines which we launched by the prior call to the Start
134
// method.
135
//
136
// NOTE: This is part of the FilteredChainView interface.
137
func (b *BtcdFilteredChainView) Stop() error {
6✔
138
        log.Debug("BtcdFilteredChainView stopping")
6✔
139
        defer log.Debug("BtcdFilteredChainView stopped")
6✔
140

6✔
141
        // Already shutting down?
6✔
142
        if atomic.AddInt32(&b.stopped, 1) != 1 {
6✔
143
                return nil
×
144
        }
×
145

146
        // Shutdown the rpc client, this gracefully disconnects from btcd, and
147
        // cleans up all related resources.
148
        b.btcdConn.Shutdown()
6✔
149
        b.btcdConn.WaitForShutdown()
6✔
150

6✔
151
        b.blockQueue.Stop()
6✔
152

6✔
153
        close(b.quit)
6✔
154
        b.wg.Wait()
6✔
155

6✔
156
        return nil
6✔
157
}
158

159
// onFilteredBlockConnected is called for each block that's connected to the
160
// end of the main chain. Based on our current chain filter, the block may or
161
// may not include any relevant transactions.
162
func (b *BtcdFilteredChainView) onFilteredBlockConnected(height int32,
163
        header *wire.BlockHeader, txns []*btcutil.Tx) {
463✔
164

463✔
165
        mtxs := make([]*wire.MsgTx, len(txns))
463✔
166
        b.filterMtx.Lock()
463✔
167
        for i, tx := range txns {
466✔
168
                mtx := tx.MsgTx()
3✔
169
                mtxs[i] = mtx
3✔
170

3✔
171
                for _, txIn := range mtx.TxIn {
6✔
172
                        // We can delete this outpoint from the chainFilter, as
3✔
173
                        // we just received a block where it was spent. In case
3✔
174
                        // of a reorg, this outpoint might get "un-spent", but
3✔
175
                        // that's okay since it would never be wise to consider
3✔
176
                        // the channel open again (since a spending transaction
3✔
177
                        // exists on the network).
3✔
178
                        delete(b.chainFilter, txIn.PreviousOutPoint)
3✔
179
                }
3✔
180

181
        }
182
        b.filterMtx.Unlock()
463✔
183

463✔
184
        // We record the height of the last connected block added to the
463✔
185
        // blockQueue such that we can scan up to this height in case of
463✔
186
        // a rescan. It must be protected by a mutex since a filter update
463✔
187
        // might be trying to read it concurrently.
463✔
188
        b.bestHeightMtx.Lock()
463✔
189
        b.bestHeight = uint32(height)
463✔
190
        b.bestHeightMtx.Unlock()
463✔
191

463✔
192
        block := &FilteredBlock{
463✔
193
                Hash:         header.BlockHash(),
463✔
194
                Height:       uint32(height),
463✔
195
                Transactions: mtxs,
463✔
196
        }
463✔
197

463✔
198
        b.blockQueue.Add(&blockEvent{
463✔
199
                eventType: connected,
463✔
200
                block:     block,
463✔
201
        })
463✔
202
}
203

204
// onFilteredBlockDisconnected is a callback which is executed once a block is
205
// disconnected from the end of the main chain.
206
func (b *BtcdFilteredChainView) onFilteredBlockDisconnected(height int32,
207
        header *wire.BlockHeader) {
416✔
208

416✔
209
        log.Debugf("got disconnected block at height %d: %v", height,
416✔
210
                header.BlockHash())
416✔
211

416✔
212
        filteredBlock := &FilteredBlock{
416✔
213
                Hash:   header.BlockHash(),
416✔
214
                Height: uint32(height),
416✔
215
        }
416✔
216

416✔
217
        b.blockQueue.Add(&blockEvent{
416✔
218
                eventType: disconnected,
416✔
219
                block:     filteredBlock,
416✔
220
        })
416✔
221
}
416✔
222

223
// filterBlockReq houses a request to manually filter a block specified by
224
// block hash.
225
type filterBlockReq struct {
226
        blockHash *chainhash.Hash
227
        resp      chan *FilteredBlock
228
        err       chan error
229
}
230

231
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
232
// result of applying the current registered UTXO sub-set on the block
233
// corresponding to that block hash. If any watched UTOX's are spent by the
234
// selected lock, then the internal chainFilter will also be updated.
235
//
236
// NOTE: This is part of the FilteredChainView interface.
237
func (b *BtcdFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
2✔
238
        req := &filterBlockReq{
2✔
239
                blockHash: blockHash,
2✔
240
                resp:      make(chan *FilteredBlock, 1),
2✔
241
                err:       make(chan error, 1),
2✔
242
        }
2✔
243

2✔
244
        select {
2✔
245
        case b.filterBlockReqs <- req:
2✔
246
        case <-b.quit:
×
247
                return nil, fmt.Errorf("FilteredChainView shutting down")
×
248
        }
249

250
        return <-req.resp, <-req.err
2✔
251
}
252

253
// chainFilterer is the primary goroutine which: listens for new blocks coming
254
// and dispatches the relevant FilteredBlock notifications, updates the filter
255
// due to requests by callers, and finally is able to preform targeted block
256
// filtration.
257
//
258
// TODO(roasbeef): change to use loadfilter RPC's
259
func (b *BtcdFilteredChainView) chainFilterer() {
6✔
260
        defer b.wg.Done()
6✔
261

6✔
262
        // filterBlock is a helper function that scans the given block, and
6✔
263
        // notes which transactions spend outputs which are currently being
6✔
264
        // watched. Additionally, the chain filter will also be updated by
6✔
265
        // removing any spent outputs.
6✔
266
        filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
8✔
267
                b.filterMtx.Lock()
2✔
268
                defer b.filterMtx.Unlock()
2✔
269

2✔
270
                var filteredTxns []*wire.MsgTx
2✔
271
                for _, tx := range blk.Transactions {
6✔
272
                        var txAlreadyFiltered bool
4✔
273
                        for _, txIn := range tx.TxIn {
8✔
274
                                prevOp := txIn.PreviousOutPoint
4✔
275
                                if _, ok := b.chainFilter[prevOp]; !ok {
6✔
276
                                        continue
2✔
277
                                }
278

279
                                delete(b.chainFilter, prevOp)
3✔
280

3✔
281
                                // Only add this txn to our list of filtered
3✔
282
                                // txns if it is the first previous outpoint to
3✔
283
                                // cause a match.
3✔
284
                                if txAlreadyFiltered {
3✔
285
                                        continue
×
286
                                }
287

288
                                filteredTxns = append(filteredTxns, tx.Copy())
3✔
289
                                txAlreadyFiltered = true
3✔
290

291
                        }
292
                }
293

294
                return filteredTxns
2✔
295
        }
296

297
        decodeJSONBlock := func(block *btcjson.RescannedBlock,
6✔
298
                height uint32) (*FilteredBlock, error) {
7✔
299
                hash, err := chainhash.NewHashFromStr(block.Hash)
1✔
300
                if err != nil {
1✔
301
                        return nil, err
×
302

×
303
                }
×
304
                txs := make([]*wire.MsgTx, 0, len(block.Transactions))
1✔
305
                for _, str := range block.Transactions {
2✔
306
                        b, err := hex.DecodeString(str)
1✔
307
                        if err != nil {
1✔
308
                                return nil, err
×
309
                        }
×
310
                        tx := &wire.MsgTx{}
1✔
311
                        err = tx.Deserialize(bytes.NewReader(b))
1✔
312
                        if err != nil {
1✔
313
                                return nil, err
×
314
                        }
×
315
                        txs = append(txs, tx)
1✔
316
                }
317
                return &FilteredBlock{
1✔
318
                        Hash:         *hash,
1✔
319
                        Height:       height,
1✔
320
                        Transactions: txs,
1✔
321
                }, nil
1✔
322
        }
323

324
        for {
16✔
325
                select {
10✔
326
                // The caller has just sent an update to the current chain
327
                // filter, so we'll apply the update, possibly rewinding our
328
                // state partially.
329
                case update := <-b.filterUpdates:
4✔
330

4✔
331
                        // First, we'll add all the new UTXO's to the set of
4✔
332
                        // watched UTXO's, eliminating any duplicates in the
4✔
333
                        // process.
4✔
334
                        log.Tracef("Updating chain filter with new UTXO's: %v",
4✔
335
                                update.newUtxos)
4✔
336

4✔
337
                        b.filterMtx.Lock()
4✔
338
                        for _, newOp := range update.newUtxos {
10✔
339
                                b.chainFilter[newOp] = struct{}{}
6✔
340
                        }
6✔
341
                        b.filterMtx.Unlock()
4✔
342

4✔
343
                        // Apply the new TX filter to btcd, which will cause
4✔
344
                        // all following notifications from and calls to it
4✔
345
                        // return blocks filtered with the new filter.
4✔
346
                        b.btcdConn.LoadTxFilter(false, []btcutil.Address{},
4✔
347
                                update.newUtxos)
4✔
348

4✔
349
                        // All blocks gotten after we loaded the filter will
4✔
350
                        // have the filter applied, but we will need to rescan
4✔
351
                        // the blocks up to the height of the block we last
4✔
352
                        // added to the blockQueue.
4✔
353
                        b.bestHeightMtx.Lock()
4✔
354
                        bestHeight := b.bestHeight
4✔
355
                        b.bestHeightMtx.Unlock()
4✔
356

4✔
357
                        // If the update height matches our best known height,
4✔
358
                        // then we don't need to do any rewinding.
4✔
359
                        if update.updateHeight == bestHeight {
7✔
360
                                continue
3✔
361
                        }
362

363
                        // Otherwise, we'll rewind the state to ensure the
364
                        // caller doesn't miss any relevant notifications.
365
                        // Starting from the height _after_ the update height,
366
                        // we'll walk forwards, rescanning one block at a time
367
                        // with btcd applying the newly loaded filter to each
368
                        // block.
369
                        for i := update.updateHeight + 1; i < bestHeight+1; i++ {
2✔
370
                                blockHash, err := b.btcdConn.GetBlockHash(int64(i))
1✔
371
                                if err != nil {
1✔
372
                                        log.Warnf("Unable to get block hash "+
×
373
                                                "for block at height %d: %v",
×
374
                                                i, err)
×
375
                                        continue
×
376
                                }
377

378
                                // To avoid dealing with the case where a reorg
379
                                // is happening while we rescan, we scan one
380
                                // block at a time, skipping blocks that might
381
                                // have gone missing.
382
                                rescanned, err := b.btcdConn.RescanBlocks(
1✔
383
                                        []chainhash.Hash{*blockHash})
1✔
384
                                if err != nil {
1✔
385
                                        log.Warnf("Unable to rescan block "+
×
386
                                                "with hash %v at height %d: %v",
×
387
                                                blockHash, i, err)
×
388
                                        continue
×
389
                                }
390

391
                                // If no block was returned from the rescan, it
392
                                // means no matching transactions were found.
393
                                if len(rescanned) != 1 {
1✔
394
                                        log.Tracef("rescan of block %v at "+
×
395
                                                "height=%d yielded no "+
×
396
                                                "transactions", blockHash, i)
×
397
                                        continue
×
398
                                }
399
                                decoded, err := decodeJSONBlock(
1✔
400
                                        &rescanned[0], uint32(i))
1✔
401
                                if err != nil {
1✔
402
                                        log.Errorf("Unable to decode block: %v",
×
403
                                                err)
×
404
                                        continue
×
405
                                }
406
                                b.blockQueue.Add(&blockEvent{
1✔
407
                                        eventType: connected,
1✔
408
                                        block:     decoded,
1✔
409
                                })
1✔
410
                        }
411

412
                // We've received a new request to manually filter a block.
413
                case req := <-b.filterBlockReqs:
2✔
414
                        // First we'll fetch the block itself as well as some
2✔
415
                        // additional information including its height.
2✔
416
                        block, err := b.GetBlock(req.blockHash)
2✔
417
                        if err != nil {
2✔
418
                                req.err <- err
×
419
                                req.resp <- nil
×
420
                                continue
×
421
                        }
422
                        header, err := b.btcdConn.GetBlockHeaderVerbose(req.blockHash)
2✔
423
                        if err != nil {
2✔
424
                                req.err <- err
×
425
                                req.resp <- nil
×
426
                                continue
×
427
                        }
428

429
                        // Once we have this info, we can directly filter the
430
                        // block and dispatch the proper notification.
431
                        req.resp <- &FilteredBlock{
2✔
432
                                Hash:         *req.blockHash,
2✔
433
                                Height:       uint32(header.Height),
2✔
434
                                Transactions: filterBlock(block),
2✔
435
                        }
2✔
436
                        req.err <- err
2✔
437

438
                case <-b.quit:
6✔
439
                        return
6✔
440
                }
441
        }
442
}
443

444
// filterUpdate is a message sent to the chainFilterer to update the current
445
// chainFilter state.
446
type filterUpdate struct {
447
        newUtxos     []wire.OutPoint
448
        updateHeight uint32
449
}
450

451
// UpdateFilter updates the UTXO filter which is to be consulted when creating
452
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
453
// meaning repeated calls to this method should _expand_ the size of the UTXO
454
// sub-set currently being watched.  If the set updateHeight is _lower_ than
455
// the best known height of the implementation, then the state should be
456
// rewound to ensure all relevant notifications are dispatched.
457
//
458
// NOTE: This is part of the FilteredChainView interface.
459
func (b *BtcdFilteredChainView) UpdateFilter(ops []graphdb.EdgePoint,
460
        updateHeight uint32) error {
4✔
461

4✔
462
        newUtxos := make([]wire.OutPoint, len(ops))
4✔
463
        for i, op := range ops {
10✔
464
                newUtxos[i] = op.OutPoint
6✔
465
        }
6✔
466

467
        select {
4✔
468

469
        case b.filterUpdates <- filterUpdate{
470
                newUtxos:     newUtxos,
471
                updateHeight: updateHeight,
472
        }:
4✔
473
                return nil
4✔
474

475
        case <-b.quit:
×
476
                return fmt.Errorf("chain filter shutting down")
×
477
        }
478
}
479

480
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
481
// Each time a block is connected to the end of a main chain, and appropriate
482
// FilteredBlock which contains the transactions which mutate our watched UTXO
483
// set is to be returned.
484
//
485
// NOTE: This is part of the FilteredChainView interface.
486
func (b *BtcdFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
5✔
487
        return b.blockQueue.newBlocks
5✔
488
}
5✔
489

490
// DisconnectedBlocks returns a receive only channel which will be sent upon
491
// with the empty filtered blocks of blocks which are disconnected from the
492
// main chain in the case of a re-org.
493
//
494
// NOTE: This is part of the FilteredChainView interface.
495
func (b *BtcdFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
2✔
496
        return b.blockQueue.staleBlocks
2✔
497
}
2✔
498

499
// GetBlock is used to retrieve the block with the given hash. This function
500
// wraps the blockCache's GetBlock function.
501
func (b *BtcdFilteredChainView) GetBlock(hash *chainhash.Hash) (
502
        *wire.MsgBlock, error) {
2✔
503

2✔
504
        return b.blockCache.GetBlock(hash, b.btcdConn.GetBlock)
2✔
505
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc