• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11170835610

03 Oct 2024 10:41PM UTC coverage: 49.188% (-9.6%) from 58.738%
11170835610

push

github

web-flow
Merge pull request #9154 from ziggie1984/master

multi: bump btcd version.

3 of 6 new or added lines in 6 files covered. (50.0%)

26110 existing lines in 428 files now uncovered.

97359 of 197934 relevant lines covered (49.19%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.22
/routing/chainview/bitcoind.go
1
package chainview
2

3
import (
4
        "bytes"
5
        "encoding/hex"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/chain"
14
        "github.com/btcsuite/btcwallet/wtxmgr"
15
        "github.com/lightningnetwork/lnd/blockcache"
16
        "github.com/lightningnetwork/lnd/channeldb"
17
)
18

19
// BitcoindFilteredChainView is an implementation of the FilteredChainView
20
// interface which is backed by bitcoind.
21
type BitcoindFilteredChainView struct {
22
        started int32 // To be used atomically; incremented by Start.
23
        stopped int32 // To be used atomically; incremented by Stop.
24

25
        // bestHeight is the height of the latest block added to the
26
        // blockQueue from the onFilteredBlockConnected method. It is used to
27
        // determine up to what height we would need to rescan in case
28
        // of a filter update. It is guarded by bestHeightMtx.
29
        bestHeightMtx sync.Mutex
30
        bestHeight    uint32
31

32
        // TODO: Factor out common logic between bitcoind and btcd into a
33
        // NodeFilteredView interface.
34
        chainClient *chain.BitcoindClient
35

36
        // blockQueue is the ordered queue used to keep the order
37
        // of connected and disconnected blocks sent to the reader of the
38
        // chainView.
39
        blockQueue *blockEventQueue
40

41
        // blockCache is an LRU block cache.
42
        blockCache *blockcache.BlockCache
43

44
        // filterUpdates is a channel in which updates to the utxo filter
45
        // attached to this instance are sent over.
46
        filterUpdates chan filterUpdate
47

48
        // chainFilter is the set of UTXOs that we're currently watching
49
        // spends for within the chain. It is guarded by filterMtx.
50
        filterMtx   sync.RWMutex
51
        chainFilter map[wire.OutPoint]struct{}
52

53
        // filterBlockReqs is a channel in which requests to filter select
54
        // blocks will be sent over.
55
        filterBlockReqs chan *filterBlockReq
56

57
        quit chan struct{} // Closed by Stop to signal shutdown.
58
        wg   sync.WaitGroup // Tracks the chainFilterer goroutine.
59
}
60

61
// A compile-time check to ensure BitcoindFilteredChainView implements the
62
// chainview.FilteredChainView interface.
63
var _ FilteredChainView = (*BitcoindFilteredChainView)(nil)
64

65
// NewBitcoindFilteredChainView creates a new instance of a FilteredChainView
66
// from RPC credentials and a ZMQ socket address for a bitcoind instance.
67
func NewBitcoindFilteredChainView(
68
        chainConn *chain.BitcoindConn,
69
        blockCache *blockcache.BlockCache) *BitcoindFilteredChainView {
1✔
70

1✔
71
        chainView := &BitcoindFilteredChainView{
1✔
72
                chainFilter:     make(map[wire.OutPoint]struct{}),
1✔
73
                filterUpdates:   make(chan filterUpdate),
1✔
74
                filterBlockReqs: make(chan *filterBlockReq),
1✔
75
                blockCache:      blockCache,
1✔
76
                quit:            make(chan struct{}),
1✔
77
        }
1✔
78

1✔
79
        chainView.chainClient = chainConn.NewBitcoindClient()
1✔
80
        chainView.blockQueue = newBlockEventQueue()
1✔
81

1✔
82
        return chainView
1✔
83
}
1✔
84

85
// Start starts all goroutines necessary for normal operation.
86
//
87
// NOTE: This is part of the FilteredChainView interface.
88
func (b *BitcoindFilteredChainView) Start() error {
1✔
89
        // Already started?
1✔
90
        if atomic.AddInt32(&b.started, 1) != 1 {
1✔
91
                return nil
×
92
        }
×
93

94
        log.Infof("FilteredChainView starting")
1✔
95

1✔
96
        err := b.chainClient.Start()
1✔
97
        if err != nil {
1✔
98
                return err
×
99
        }
×
100

101
        err = b.chainClient.NotifyBlocks()
1✔
102
        if err != nil {
1✔
103
                return err
×
104
        }
×
105

106
        _, bestHeight, err := b.chainClient.GetBestBlock()
1✔
107
        if err != nil {
1✔
108
                return err
×
109
        }
×
110

111
        b.bestHeightMtx.Lock()
1✔
112
        b.bestHeight = uint32(bestHeight)
1✔
113
        b.bestHeightMtx.Unlock()
1✔
114

1✔
115
        b.blockQueue.Start()
1✔
116

1✔
117
        b.wg.Add(1)
1✔
118
        go b.chainFilterer()
1✔
119

1✔
120
        return nil
1✔
121
}
122

123
// Stop stops all goroutines which we launched by the prior call to the Start
124
// method.
125
//
126
// NOTE: This is part of the FilteredChainView interface.
UNCOV
127
func (b *BitcoindFilteredChainView) Stop() error {
×
UNCOV
128
        log.Debug("BitcoindFilteredChainView stopping")
×
UNCOV
129
        defer log.Debug("BitcoindFilteredChainView stopped")
×
UNCOV
130

×
UNCOV
131
        // Already shutting down?
×
UNCOV
132
        if atomic.AddInt32(&b.stopped, 1) != 1 {
×
133
                return nil
×
134
        }
×
135

136
        // Shutdown the rpc client, this gracefully disconnects from bitcoind's
137
        // zmq socket, and cleans up all related resources.
UNCOV
138
        b.chainClient.Stop()
×
UNCOV
139

×
UNCOV
140
        b.blockQueue.Stop()
×
UNCOV
141

×
UNCOV
142
        close(b.quit)
×
UNCOV
143
        b.wg.Wait()
×
UNCOV
144

×
UNCOV
145
        return nil
×
146
}
147

148
// onFilteredBlockConnected is called for each block that's connected to the
149
// end of the main chain. Based on our current chain filter, the block may or
150
// may not include any relevant transactions.
151
func (b *BitcoindFilteredChainView) onFilteredBlockConnected(height int32,
152
        hash chainhash.Hash, txns []*wtxmgr.TxRecord) {
1✔
153

1✔
154
        mtxs := make([]*wire.MsgTx, len(txns))
1✔
155
        b.filterMtx.Lock()
1✔
156
        for i, tx := range txns {
2✔
157
                mtxs[i] = &tx.MsgTx
1✔
158

1✔
159
                for _, txIn := range mtxs[i].TxIn {
2✔
160
                        // We can delete this outpoint from the chainFilter, as
1✔
161
                        // we just received a block where it was spent. In case
1✔
162
                        // of a reorg, this outpoint might get "un-spent", but
1✔
163
                        // that's okay since it would never be wise to consider
1✔
164
                        // the channel open again (since a spending transaction
1✔
165
                        // exists on the network).
1✔
166
                        delete(b.chainFilter, txIn.PreviousOutPoint)
1✔
167
                }
1✔
168

169
        }
170
        b.filterMtx.Unlock()
1✔
171

1✔
172
        // We record the height of the last connected block added to the
1✔
173
        // blockQueue such that we can scan up to this height in case of
1✔
174
        // a rescan. It must be protected by a mutex since a filter update
1✔
175
        // might be trying to read it concurrently.
1✔
176
        b.bestHeightMtx.Lock()
1✔
177
        b.bestHeight = uint32(height)
1✔
178
        b.bestHeightMtx.Unlock()
1✔
179

1✔
180
        block := &FilteredBlock{
1✔
181
                Hash:         hash,
1✔
182
                Height:       uint32(height),
1✔
183
                Transactions: mtxs,
1✔
184
        }
1✔
185

1✔
186
        b.blockQueue.Add(&blockEvent{
1✔
187
                eventType: connected,
1✔
188
                block:     block,
1✔
189
        })
1✔
190
}
191

192
// onFilteredBlockDisconnected is a callback which is executed once a block is
193
// disconnected from the end of the main chain.
194
func (b *BitcoindFilteredChainView) onFilteredBlockDisconnected(height int32,
195
        hash chainhash.Hash) {
1✔
196

1✔
197
        log.Debugf("got disconnected block at height %d: %v", height,
1✔
198
                hash)
1✔
199

1✔
200
        filteredBlock := &FilteredBlock{
1✔
201
                Hash:   hash,
1✔
202
                Height: uint32(height),
1✔
203
        }
1✔
204

1✔
205
        b.blockQueue.Add(&blockEvent{
1✔
206
                eventType: disconnected,
1✔
207
                block:     filteredBlock,
1✔
208
        })
1✔
209
}
1✔
210

211
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
212
// result of applying the current registered UTXO sub-set on the block
213
// corresponding to that block hash. If any watched UTOX's are spent by the
214
// selected lock, then the internal chainFilter will also be updated.
215
//
216
// NOTE: This is part of the FilteredChainView interface.
217
func (b *BitcoindFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
1✔
218
        req := &filterBlockReq{
1✔
219
                blockHash: blockHash,
1✔
220
                resp:      make(chan *FilteredBlock, 1),
1✔
221
                err:       make(chan error, 1),
1✔
222
        }
1✔
223

1✔
224
        select {
1✔
225
        case b.filterBlockReqs <- req:
1✔
226
        case <-b.quit:
×
227
                return nil, fmt.Errorf("FilteredChainView shutting down")
×
228
        }
229

230
        return <-req.resp, <-req.err
1✔
231
}
232

233
// chainFilterer is the primary goroutine which: listens for new blocks coming
234
// and dispatches the relevant FilteredBlock notifications, updates the filter
235
// due to requests by callers, and finally is able to perform targeted block
236
// filtration.
237
//
238
// TODO(roasbeef): change to use loadfilter RPC's
239
func (b *BitcoindFilteredChainView) chainFilterer() {
1✔
240
        defer b.wg.Done()
1✔
241

1✔
242
        // filterBlock is a helper function that scans the given block, and
1✔
243
        // notes which transactions spend outputs which are currently being
1✔
244
        // watched. Additionally, the chain filter will also be updated by
1✔
245
        // removing any spent outputs.
1✔
246
        filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
2✔
247
                b.filterMtx.Lock()
1✔
248
                defer b.filterMtx.Unlock()
1✔
249

1✔
250
                var filteredTxns []*wire.MsgTx
1✔
251
                for _, tx := range blk.Transactions {
2✔
252
                        var txAlreadyFiltered bool
1✔
253
                        for _, txIn := range tx.TxIn {
2✔
254
                                prevOp := txIn.PreviousOutPoint
1✔
255
                                if _, ok := b.chainFilter[prevOp]; !ok {
2✔
256
                                        continue
1✔
257
                                }
258

259
                                delete(b.chainFilter, prevOp)
1✔
260

1✔
261
                                // Only add this txn to our list of filtered
1✔
262
                                // txns if it is the first previous outpoint to
1✔
263
                                // cause a match.
1✔
264
                                if txAlreadyFiltered {
1✔
265
                                        continue
×
266
                                }
267

268
                                filteredTxns = append(filteredTxns, tx.Copy())
1✔
269
                                txAlreadyFiltered = true
1✔
270
                        }
271
                }
272

273
                return filteredTxns
1✔
274
        }
275

276
        decodeJSONBlock := func(block *btcjson.RescannedBlock,
1✔
277
                height uint32) (*FilteredBlock, error) {
1✔
UNCOV
278
                hash, err := chainhash.NewHashFromStr(block.Hash)
×
UNCOV
279
                if err != nil {
×
280
                        return nil, err
×
281

×
282
                }
×
UNCOV
283
                txs := make([]*wire.MsgTx, 0, len(block.Transactions))
×
UNCOV
284
                for _, str := range block.Transactions {
×
UNCOV
285
                        b, err := hex.DecodeString(str)
×
UNCOV
286
                        if err != nil {
×
287
                                return nil, err
×
288
                        }
×
UNCOV
289
                        tx := &wire.MsgTx{}
×
UNCOV
290
                        err = tx.Deserialize(bytes.NewReader(b))
×
UNCOV
291
                        if err != nil {
×
292
                                return nil, err
×
293
                        }
×
UNCOV
294
                        txs = append(txs, tx)
×
295
                }
UNCOV
296
                return &FilteredBlock{
×
UNCOV
297
                        Hash:         *hash,
×
UNCOV
298
                        Height:       height,
×
UNCOV
299
                        Transactions: txs,
×
UNCOV
300
                }, nil
×
301
        }
302

303
        for {
2✔
304
                select {
1✔
305
                // The caller has just sent an update to the current chain
306
                // filter, so we'll apply the update, possibly rewinding our
307
                // state partially.
308
                case update := <-b.filterUpdates:
1✔
309
                        // First, we'll add all the new UTXO's to the set of
1✔
310
                        // watched UTXO's, eliminating any duplicates in the
1✔
311
                        // process.
1✔
312
                        log.Tracef("Updating chain filter with new UTXO's: %v",
1✔
313
                                update.newUtxos)
1✔
314

1✔
315
                        b.filterMtx.Lock()
1✔
316
                        for _, newOp := range update.newUtxos {
2✔
317
                                b.chainFilter[newOp] = struct{}{}
1✔
318
                        }
1✔
319
                        b.filterMtx.Unlock()
1✔
320

1✔
321
                        // Apply the new TX filter to the chain client, which
1✔
322
                        // will cause all following notifications from and
1✔
323
                        // calls to it return blocks filtered with the new
1✔
324
                        // filter.
1✔
325
                        err := b.chainClient.LoadTxFilter(false, update.newUtxos)
1✔
326
                        if err != nil {
1✔
327
                                log.Errorf("Unable to update filter: %v", err)
×
328
                                continue
×
329
                        }
330

331
                        // All blocks gotten after we loaded the filter will
332
                        // have the filter applied, but we will need to rescan
333
                        // the blocks up to the height of the block we last
334
                        // added to the blockQueue.
335
                        b.bestHeightMtx.Lock()
1✔
336
                        bestHeight := b.bestHeight
1✔
337
                        b.bestHeightMtx.Unlock()
1✔
338

1✔
339
                        // If the update height matches our best known height,
1✔
340
                        // then we don't need to do any rewinding.
1✔
341
                        if update.updateHeight == bestHeight {
2✔
342
                                continue
1✔
343
                        }
344

345
                        // Otherwise, we'll rewind the state to ensure the
346
                        // caller doesn't miss any relevant notifications.
347
                        // Starting from the height _after_ the update height,
348
                        // we'll walk forwards, rescanning one block at a time
349
                        // with the chain client applying the newly loaded
350
                        // filter to each block.
351
                        for i := update.updateHeight + 1; i < bestHeight+1; i++ {
2✔
352
                                blockHash, err := b.chainClient.GetBlockHash(int64(i))
1✔
353
                                if err != nil {
1✔
354
                                        log.Warnf("Unable to get block hash "+
×
355
                                                "for block at height %d: %v",
×
356
                                                i, err)
×
357
                                        continue
×
358
                                }
359

360
                                // To avoid dealing with the case where a reorg
361
                                // is happening while we rescan, we scan one
362
                                // block at a time, skipping blocks that might
363
                                // have gone missing.
364
                                rescanned, err := b.chainClient.RescanBlocks(
1✔
365
                                        []chainhash.Hash{*blockHash},
1✔
366
                                )
1✔
367
                                if err != nil {
1✔
368
                                        log.Warnf("Unable to rescan block "+
×
369
                                                "with hash %v at height %d: %v",
×
370
                                                blockHash, i, err)
×
371
                                        continue
×
372
                                }
373

374
                                // If no block was returned from the rescan, it
375
                                // means no matching transactions were found.
376
                                if len(rescanned) != 1 {
2✔
377
                                        log.Tracef("rescan of block %v at "+
1✔
378
                                                "height=%d yielded no "+
1✔
379
                                                "transactions", blockHash, i)
1✔
380
                                        continue
1✔
381
                                }
UNCOV
382
                                decoded, err := decodeJSONBlock(
×
UNCOV
383
                                        &rescanned[0], i,
×
UNCOV
384
                                )
×
UNCOV
385
                                if err != nil {
×
386
                                        log.Errorf("Unable to decode block: %v",
×
387
                                                err)
×
388
                                        continue
×
389
                                }
UNCOV
390
                                b.blockQueue.Add(&blockEvent{
×
UNCOV
391
                                        eventType: connected,
×
UNCOV
392
                                        block:     decoded,
×
UNCOV
393
                                })
×
394
                        }
395

396
                // We've received a new request to manually filter a block.
397
                case req := <-b.filterBlockReqs:
1✔
398
                        // First we'll fetch the block itself as well as some
1✔
399
                        // additional information including its height.
1✔
400
                        block, err := b.GetBlock(req.blockHash)
1✔
401
                        if err != nil {
1✔
402
                                req.err <- err
×
403
                                req.resp <- nil
×
404
                                continue
×
405
                        }
406
                        header, err := b.chainClient.GetBlockHeaderVerbose(
1✔
407
                                req.blockHash)
1✔
408
                        if err != nil {
1✔
409
                                req.err <- err
×
410
                                req.resp <- nil
×
411
                                continue
×
412
                        }
413

414
                        // Once we have this info, we can directly filter the
415
                        // block and dispatch the proper notification.
416
                        req.resp <- &FilteredBlock{
1✔
417
                                Hash:         *req.blockHash,
1✔
418
                                Height:       uint32(header.Height),
1✔
419
                                Transactions: filterBlock(block),
1✔
420
                        }
1✔
421
                        req.err <- err
1✔
422

423
                // We've received a new event from the chain client.
424
                case event := <-b.chainClient.Notifications():
1✔
425
                        switch e := event.(type) {
1✔
426

427
                        case chain.FilteredBlockConnected:
1✔
428
                                b.onFilteredBlockConnected(
1✔
429
                                        e.Block.Height, e.Block.Hash, e.RelevantTxs,
1✔
430
                                )
1✔
431

432
                        case chain.BlockDisconnected:
1✔
433
                                b.onFilteredBlockDisconnected(e.Height, e.Hash)
1✔
434
                        }
435

UNCOV
436
                case <-b.quit:
×
UNCOV
437
                        return
×
438
                }
439
        }
440
}
441

442
// UpdateFilter updates the UTXO filter which is to be consulted when creating
443
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
444
// meaning repeated calls to this method should _expand_ the size of the UTXO
445
// sub-set currently being watched.  If the set updateHeight is _lower_ than
446
// the best known height of the implementation, then the state should be
447
// rewound to ensure all relevant notifications are dispatched.
448
//
449
// NOTE: This is part of the FilteredChainView interface.
450
func (b *BitcoindFilteredChainView) UpdateFilter(ops []channeldb.EdgePoint,
451
        updateHeight uint32) error {
1✔
452

1✔
453
        newUtxos := make([]wire.OutPoint, len(ops))
1✔
454
        for i, op := range ops {
2✔
455
                newUtxos[i] = op.OutPoint
1✔
456
        }
1✔
457

458
        select {
1✔
459

460
        case b.filterUpdates <- filterUpdate{
461
                newUtxos:     newUtxos,
462
                updateHeight: updateHeight,
463
        }:
1✔
464
                return nil
1✔
465

466
        case <-b.quit:
×
467
                return fmt.Errorf("chain filter shutting down")
×
468
        }
469
}
470

471
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
472
// Each time a block is connected to the end of a main chain, and appropriate
473
// FilteredBlock which contains the transactions which mutate our watched UTXO
474
// set is to be returned.
475
//
476
// NOTE: This is part of the FilteredChainView interface.
477
func (b *BitcoindFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
1✔
478
        return b.blockQueue.newBlocks
1✔
479
}
1✔
480

481
// DisconnectedBlocks returns a receive only channel which will be sent upon
482
// with the empty filtered blocks of blocks which are disconnected from the
483
// main chain in the case of a re-org.
484
//
485
// NOTE: This is part of the FilteredChainView interface.
486
func (b *BitcoindFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
1✔
487
        return b.blockQueue.staleBlocks
1✔
488
}
1✔
489

490
// GetBlock is used to retrieve the block with the given hash. This function
491
// wraps the blockCache's GetBlock function.
492
func (b *BitcoindFilteredChainView) GetBlock(hash *chainhash.Hash) (
493
        *wire.MsgBlock, error) {
1✔
494

1✔
495
        return b.blockCache.GetBlock(hash, b.chainClient.GetBlock)
1✔
496
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc