• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9915780197

13 Jul 2024 12:30AM UTC coverage: 49.268% (-9.1%) from 58.413%
9915780197

push

github

web-flow
Merge pull request #8653 from ProofOfKeags/fn-prim

DynComms [0/n]: `fn` package additions

92837 of 188433 relevant lines covered (49.27%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.52
/routing/chainview/bitcoind.go
1
package chainview
2

3
import (
4
        "bytes"
5
        "encoding/hex"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/chain"
14
        "github.com/btcsuite/btcwallet/wtxmgr"
15
        "github.com/lightningnetwork/lnd/blockcache"
16
        "github.com/lightningnetwork/lnd/channeldb"
17
)
18

19
// BitcoindFilteredChainView is an implementation of the FilteredChainView
20
// interface which is backed by bitcoind.
21
type BitcoindFilteredChainView struct {
22
        started int32 // To be used atomically.
23
        stopped int32 // To be used atomically.
24

25
        // bestHeight is the height of the latest block added to the
26
        // blockQueue from the onFilteredConnectedMethod. It is used to
27
        // determine up to what height we would need to rescan in case
28
        // of a filter update.
29
        bestHeightMtx sync.Mutex
30
        bestHeight    uint32
31

32
        // TODO: Factor out common logic between bitcoind and btcd into a
33
        // NodeFilteredView interface.
34
        chainClient *chain.BitcoindClient
35

36
        // blockEventQueue is the ordered queue used to keep the order
37
        // of connected and disconnected blocks sent to the reader of the
38
        // chainView.
39
        blockQueue *blockEventQueue
40

41
        // blockCache is an LRU block cache.
42
        blockCache *blockcache.BlockCache
43

44
        // filterUpdates is a channel in which updates to the utxo filter
45
        // attached to this instance are sent over.
46
        filterUpdates chan filterUpdate
47

48
        // chainFilter is the set of utox's that we're currently watching
49
        // spends for within the chain.
50
        filterMtx   sync.RWMutex
51
        chainFilter map[wire.OutPoint]struct{}
52

53
        // filterBlockReqs is a channel in which requests to filter select
54
        // blocks will be sent over.
55
        filterBlockReqs chan *filterBlockReq
56

57
        quit chan struct{}
58
        wg   sync.WaitGroup
59
}
60

61
// A compile time check to ensure BitcoindFilteredChainView implements the
62
// chainview.FilteredChainView.
63
var _ FilteredChainView = (*BitcoindFilteredChainView)(nil)
64

65
// NewBitcoindFilteredChainView creates a new instance of a FilteredChainView
66
// from RPC credentials and a ZMQ socket address for a bitcoind instance.
67
func NewBitcoindFilteredChainView(
68
        chainConn *chain.BitcoindConn,
69
        blockCache *blockcache.BlockCache) *BitcoindFilteredChainView {
2✔
70

2✔
71
        chainView := &BitcoindFilteredChainView{
2✔
72
                chainFilter:     make(map[wire.OutPoint]struct{}),
2✔
73
                filterUpdates:   make(chan filterUpdate),
2✔
74
                filterBlockReqs: make(chan *filterBlockReq),
2✔
75
                blockCache:      blockCache,
2✔
76
                quit:            make(chan struct{}),
2✔
77
        }
2✔
78

2✔
79
        chainView.chainClient = chainConn.NewBitcoindClient()
2✔
80
        chainView.blockQueue = newBlockEventQueue()
2✔
81

2✔
82
        return chainView
2✔
83
}
2✔
84

85
// Start starts all goroutines necessary for normal operation.
86
//
87
// NOTE: This is part of the FilteredChainView interface.
88
func (b *BitcoindFilteredChainView) Start() error {
2✔
89
        // Already started?
2✔
90
        if atomic.AddInt32(&b.started, 1) != 1 {
2✔
91
                return nil
×
92
        }
×
93

94
        log.Infof("FilteredChainView starting")
2✔
95

2✔
96
        err := b.chainClient.Start()
2✔
97
        if err != nil {
2✔
98
                return err
×
99
        }
×
100

101
        err = b.chainClient.NotifyBlocks()
2✔
102
        if err != nil {
2✔
103
                return err
×
104
        }
×
105

106
        _, bestHeight, err := b.chainClient.GetBestBlock()
2✔
107
        if err != nil {
2✔
108
                return err
×
109
        }
×
110

111
        b.bestHeightMtx.Lock()
2✔
112
        b.bestHeight = uint32(bestHeight)
2✔
113
        b.bestHeightMtx.Unlock()
2✔
114

2✔
115
        b.blockQueue.Start()
2✔
116

2✔
117
        b.wg.Add(1)
2✔
118
        go b.chainFilterer()
2✔
119

2✔
120
        return nil
2✔
121
}
122

123
// Stop stops all goroutines which we launched by the prior call to the Start
124
// method.
125
//
126
// NOTE: This is part of the FilteredChainView interface.
127
func (b *BitcoindFilteredChainView) Stop() error {
2✔
128
        // Already shutting down?
2✔
129
        if atomic.AddInt32(&b.stopped, 1) != 1 {
2✔
130
                return nil
×
131
        }
×
132

133
        // Shutdown the rpc client, this gracefully disconnects from bitcoind's
134
        // zmq socket, and cleans up all related resources.
135
        b.chainClient.Stop()
2✔
136

2✔
137
        b.blockQueue.Stop()
2✔
138

2✔
139
        log.Infof("FilteredChainView stopping")
2✔
140

2✔
141
        close(b.quit)
2✔
142
        b.wg.Wait()
2✔
143

2✔
144
        return nil
2✔
145
}
146

147
// onFilteredBlockConnected is called for each block that's connected to the
148
// end of the main chain. Based on our current chain filter, the block may or
149
// may not include any relevant transactions.
150
func (b *BitcoindFilteredChainView) onFilteredBlockConnected(height int32,
151
        hash chainhash.Hash, txns []*wtxmgr.TxRecord) {
2✔
152

2✔
153
        mtxs := make([]*wire.MsgTx, len(txns))
2✔
154
        b.filterMtx.Lock()
2✔
155
        for i, tx := range txns {
4✔
156
                mtxs[i] = &tx.MsgTx
2✔
157

2✔
158
                for _, txIn := range mtxs[i].TxIn {
4✔
159
                        // We can delete this outpoint from the chainFilter, as
2✔
160
                        // we just received a block where it was spent. In case
2✔
161
                        // of a reorg, this outpoint might get "un-spent", but
2✔
162
                        // that's okay since it would never be wise to consider
2✔
163
                        // the channel open again (since a spending transaction
2✔
164
                        // exists on the network).
2✔
165
                        delete(b.chainFilter, txIn.PreviousOutPoint)
2✔
166
                }
2✔
167

168
        }
169
        b.filterMtx.Unlock()
2✔
170

2✔
171
        // We record the height of the last connected block added to the
2✔
172
        // blockQueue such that we can scan up to this height in case of
2✔
173
        // a rescan. It must be protected by a mutex since a filter update
2✔
174
        // might be trying to read it concurrently.
2✔
175
        b.bestHeightMtx.Lock()
2✔
176
        b.bestHeight = uint32(height)
2✔
177
        b.bestHeightMtx.Unlock()
2✔
178

2✔
179
        block := &FilteredBlock{
2✔
180
                Hash:         hash,
2✔
181
                Height:       uint32(height),
2✔
182
                Transactions: mtxs,
2✔
183
        }
2✔
184

2✔
185
        b.blockQueue.Add(&blockEvent{
2✔
186
                eventType: connected,
2✔
187
                block:     block,
2✔
188
        })
2✔
189
}
190

191
// onFilteredBlockDisconnected is a callback which is executed once a block is
192
// disconnected from the end of the main chain.
193
func (b *BitcoindFilteredChainView) onFilteredBlockDisconnected(height int32,
194
        hash chainhash.Hash) {
2✔
195

2✔
196
        log.Debugf("got disconnected block at height %d: %v", height,
2✔
197
                hash)
2✔
198

2✔
199
        filteredBlock := &FilteredBlock{
2✔
200
                Hash:   hash,
2✔
201
                Height: uint32(height),
2✔
202
        }
2✔
203

2✔
204
        b.blockQueue.Add(&blockEvent{
2✔
205
                eventType: disconnected,
2✔
206
                block:     filteredBlock,
2✔
207
        })
2✔
208
}
2✔
209

210
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
211
// result of applying the current registered UTXO sub-set on the block
212
// corresponding to that block hash. If any watched UTOX's are spent by the
213
// selected lock, then the internal chainFilter will also be updated.
214
//
215
// NOTE: This is part of the FilteredChainView interface.
216
func (b *BitcoindFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
2✔
217
        req := &filterBlockReq{
2✔
218
                blockHash: blockHash,
2✔
219
                resp:      make(chan *FilteredBlock, 1),
2✔
220
                err:       make(chan error, 1),
2✔
221
        }
2✔
222

2✔
223
        select {
2✔
224
        case b.filterBlockReqs <- req:
2✔
225
        case <-b.quit:
×
226
                return nil, fmt.Errorf("FilteredChainView shutting down")
×
227
        }
228

229
        return <-req.resp, <-req.err
2✔
230
}
231

232
// chainFilterer is the primary goroutine which: listens for new blocks coming
233
// and dispatches the relevant FilteredBlock notifications, updates the filter
234
// due to requests by callers, and finally is able to perform targeted block
235
// filtration.
236
//
237
// TODO(roasbeef): change to use loadfilter RPC's
238
func (b *BitcoindFilteredChainView) chainFilterer() {
2✔
239
        defer b.wg.Done()
2✔
240

2✔
241
        // filterBlock is a helper function that scans the given block, and
2✔
242
        // notes which transactions spend outputs which are currently being
2✔
243
        // watched. Additionally, the chain filter will also be updated by
2✔
244
        // removing any spent outputs.
2✔
245
        filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
4✔
246
                b.filterMtx.Lock()
2✔
247
                defer b.filterMtx.Unlock()
2✔
248

2✔
249
                var filteredTxns []*wire.MsgTx
2✔
250
                for _, tx := range blk.Transactions {
4✔
251
                        var txAlreadyFiltered bool
2✔
252
                        for _, txIn := range tx.TxIn {
4✔
253
                                prevOp := txIn.PreviousOutPoint
2✔
254
                                if _, ok := b.chainFilter[prevOp]; !ok {
4✔
255
                                        continue
2✔
256
                                }
257

258
                                delete(b.chainFilter, prevOp)
2✔
259

2✔
260
                                // Only add this txn to our list of filtered
2✔
261
                                // txns if it is the first previous outpoint to
2✔
262
                                // cause a match.
2✔
263
                                if txAlreadyFiltered {
2✔
264
                                        continue
×
265
                                }
266

267
                                filteredTxns = append(filteredTxns, tx.Copy())
2✔
268
                                txAlreadyFiltered = true
2✔
269
                        }
270
                }
271

272
                return filteredTxns
2✔
273
        }
274

275
        decodeJSONBlock := func(block *btcjson.RescannedBlock,
2✔
276
                height uint32) (*FilteredBlock, error) {
2✔
277
                hash, err := chainhash.NewHashFromStr(block.Hash)
×
278
                if err != nil {
×
279
                        return nil, err
×
280

×
281
                }
×
282
                txs := make([]*wire.MsgTx, 0, len(block.Transactions))
×
283
                for _, str := range block.Transactions {
×
284
                        b, err := hex.DecodeString(str)
×
285
                        if err != nil {
×
286
                                return nil, err
×
287
                        }
×
288
                        tx := &wire.MsgTx{}
×
289
                        err = tx.Deserialize(bytes.NewReader(b))
×
290
                        if err != nil {
×
291
                                return nil, err
×
292
                        }
×
293
                        txs = append(txs, tx)
×
294
                }
295
                return &FilteredBlock{
×
296
                        Hash:         *hash,
×
297
                        Height:       height,
×
298
                        Transactions: txs,
×
299
                }, nil
×
300
        }
301

302
        for {
4✔
303
                select {
2✔
304
                // The caller has just sent an update to the current chain
305
                // filter, so we'll apply the update, possibly rewinding our
306
                // state partially.
307
                case update := <-b.filterUpdates:
2✔
308
                        // First, we'll add all the new UTXO's to the set of
2✔
309
                        // watched UTXO's, eliminating any duplicates in the
2✔
310
                        // process.
2✔
311
                        log.Tracef("Updating chain filter with new UTXO's: %v",
2✔
312
                                update.newUtxos)
2✔
313

2✔
314
                        b.filterMtx.Lock()
2✔
315
                        for _, newOp := range update.newUtxos {
4✔
316
                                b.chainFilter[newOp] = struct{}{}
2✔
317
                        }
2✔
318
                        b.filterMtx.Unlock()
2✔
319

2✔
320
                        // Apply the new TX filter to the chain client, which
2✔
321
                        // will cause all following notifications from and
2✔
322
                        // calls to it return blocks filtered with the new
2✔
323
                        // filter.
2✔
324
                        err := b.chainClient.LoadTxFilter(false, update.newUtxos)
2✔
325
                        if err != nil {
2✔
326
                                log.Errorf("Unable to update filter: %v", err)
×
327
                                continue
×
328
                        }
329

330
                        // All blocks gotten after we loaded the filter will
331
                        // have the filter applied, but we will need to rescan
332
                        // the blocks up to the height of the block we last
333
                        // added to the blockQueue.
334
                        b.bestHeightMtx.Lock()
2✔
335
                        bestHeight := b.bestHeight
2✔
336
                        b.bestHeightMtx.Unlock()
2✔
337

2✔
338
                        // If the update height matches our best known height,
2✔
339
                        // then we don't need to do any rewinding.
2✔
340
                        if update.updateHeight == bestHeight {
4✔
341
                                continue
2✔
342
                        }
343

344
                        // Otherwise, we'll rewind the state to ensure the
345
                        // caller doesn't miss any relevant notifications.
346
                        // Starting from the height _after_ the update height,
347
                        // we'll walk forwards, rescanning one block at a time
348
                        // with the chain client applying the newly loaded
349
                        // filter to each block.
350
                        for i := update.updateHeight + 1; i < bestHeight+1; i++ {
×
351
                                blockHash, err := b.chainClient.GetBlockHash(int64(i))
×
352
                                if err != nil {
×
353
                                        log.Warnf("Unable to get block hash "+
×
354
                                                "for block at height %d: %v",
×
355
                                                i, err)
×
356
                                        continue
×
357
                                }
358

359
                                // To avoid dealing with the case where a reorg
360
                                // is happening while we rescan, we scan one
361
                                // block at a time, skipping blocks that might
362
                                // have gone missing.
363
                                rescanned, err := b.chainClient.RescanBlocks(
×
364
                                        []chainhash.Hash{*blockHash},
×
365
                                )
×
366
                                if err != nil {
×
367
                                        log.Warnf("Unable to rescan block "+
×
368
                                                "with hash %v at height %d: %v",
×
369
                                                blockHash, i, err)
×
370
                                        continue
×
371
                                }
372

373
                                // If no block was returned from the rescan, it
374
                                // means no matching transactions were found.
375
                                if len(rescanned) != 1 {
×
376
                                        log.Tracef("rescan of block %v at "+
×
377
                                                "height=%d yielded no "+
×
378
                                                "transactions", blockHash, i)
×
379
                                        continue
×
380
                                }
381
                                decoded, err := decodeJSONBlock(
×
382
                                        &rescanned[0], i,
×
383
                                )
×
384
                                if err != nil {
×
385
                                        log.Errorf("Unable to decode block: %v",
×
386
                                                err)
×
387
                                        continue
×
388
                                }
389
                                b.blockQueue.Add(&blockEvent{
×
390
                                        eventType: connected,
×
391
                                        block:     decoded,
×
392
                                })
×
393
                        }
394

395
                // We've received a new request to manually filter a block.
396
                case req := <-b.filterBlockReqs:
2✔
397
                        // First we'll fetch the block itself as well as some
2✔
398
                        // additional information including its height.
2✔
399
                        block, err := b.GetBlock(req.blockHash)
2✔
400
                        if err != nil {
2✔
401
                                req.err <- err
×
402
                                req.resp <- nil
×
403
                                continue
×
404
                        }
405
                        header, err := b.chainClient.GetBlockHeaderVerbose(
2✔
406
                                req.blockHash)
2✔
407
                        if err != nil {
2✔
408
                                req.err <- err
×
409
                                req.resp <- nil
×
410
                                continue
×
411
                        }
412

413
                        // Once we have this info, we can directly filter the
414
                        // block and dispatch the proper notification.
415
                        req.resp <- &FilteredBlock{
2✔
416
                                Hash:         *req.blockHash,
2✔
417
                                Height:       uint32(header.Height),
2✔
418
                                Transactions: filterBlock(block),
2✔
419
                        }
2✔
420
                        req.err <- err
2✔
421

422
                // We've received a new event from the chain client.
423
                case event := <-b.chainClient.Notifications():
2✔
424
                        switch e := event.(type) {
2✔
425

426
                        case chain.FilteredBlockConnected:
2✔
427
                                b.onFilteredBlockConnected(
2✔
428
                                        e.Block.Height, e.Block.Hash, e.RelevantTxs,
2✔
429
                                )
2✔
430

431
                        case chain.BlockDisconnected:
2✔
432
                                b.onFilteredBlockDisconnected(e.Height, e.Hash)
2✔
433
                        }
434

435
                case <-b.quit:
2✔
436
                        return
2✔
437
                }
438
        }
439
}
440

441
// UpdateFilter updates the UTXO filter which is to be consulted when creating
442
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
443
// meaning repeated calls to this method should _expand_ the size of the UTXO
444
// sub-set currently being watched.  If the set updateHeight is _lower_ than
445
// the best known height of the implementation, then the state should be
446
// rewound to ensure all relevant notifications are dispatched.
447
//
448
// NOTE: This is part of the FilteredChainView interface.
449
func (b *BitcoindFilteredChainView) UpdateFilter(ops []channeldb.EdgePoint,
450
        updateHeight uint32) error {
2✔
451

2✔
452
        newUtxos := make([]wire.OutPoint, len(ops))
2✔
453
        for i, op := range ops {
4✔
454
                newUtxos[i] = op.OutPoint
2✔
455
        }
2✔
456

457
        select {
2✔
458

459
        case b.filterUpdates <- filterUpdate{
460
                newUtxos:     newUtxos,
461
                updateHeight: updateHeight,
462
        }:
2✔
463
                return nil
2✔
464

465
        case <-b.quit:
×
466
                return fmt.Errorf("chain filter shutting down")
×
467
        }
468
}
469

470
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
471
// Each time a block is connected to the end of a main chain, and appropriate
472
// FilteredBlock which contains the transactions which mutate our watched UTXO
473
// set is to be returned.
474
//
475
// NOTE: This is part of the FilteredChainView interface.
476
func (b *BitcoindFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
2✔
477
        return b.blockQueue.newBlocks
2✔
478
}
2✔
479

480
// DisconnectedBlocks returns a receive only channel which will be sent upon
481
// with the empty filtered blocks of blocks which are disconnected from the
482
// main chain in the case of a re-org.
483
//
484
// NOTE: This is part of the FilteredChainView interface.
485
func (b *BitcoindFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
2✔
486
        return b.blockQueue.staleBlocks
2✔
487
}
2✔
488

489
// GetBlock is used to retrieve the block with the given hash. This function
490
// wraps the blockCache's GetBlock function.
491
func (b *BitcoindFilteredChainView) GetBlock(hash *chainhash.Hash) (
492
        *wire.MsgBlock, error) {
2✔
493

2✔
494
        return b.blockCache.GetBlock(hash, b.chainClient.GetBlock)
2✔
495
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc