• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16911773184

12 Aug 2025 02:21PM UTC coverage: 57.471% (-9.4%) from 66.9%
16911773184

Pull #10103

github

web-flow
Merge d64a1234d into f3e1f2f35
Pull Request #10103: Rate limit outgoing gossip bandwidth by peer

57 of 77 new or added lines in 5 files covered. (74.03%)

28294 existing lines in 457 files now uncovered.

99110 of 172451 relevant lines covered (57.47%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.7
/routing/chainview/bitcoind.go
1
package chainview
2

3
import (
4
        "bytes"
5
        "encoding/hex"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/chain"
14
        "github.com/btcsuite/btcwallet/wtxmgr"
15
        "github.com/lightningnetwork/lnd/blockcache"
16
        graphdb "github.com/lightningnetwork/lnd/graph/db"
17
)
18

19
// BitcoindFilteredChainView is an implementation of the FilteredChainView
20
// interface which is backed by bitcoind.
21
type BitcoindFilteredChainView struct {
22
        started int32 // To be used atomically.
23
        stopped int32 // To be used atomically.
24

25
        // bestHeight is the height of the latest block added to the
26
        // blockQueue from the onFilteredConnectedMethod. It is used to
27
        // determine up to what height we would need to rescan in case
28
        // of a filter update.
29
        bestHeightMtx sync.Mutex
30
        bestHeight    uint32
31

32
        // TODO: Factor out common logic between bitcoind and btcd into a
33
        // NodeFilteredView interface.
34
        chainClient *chain.BitcoindClient
35

36
        // blockEventQueue is the ordered queue used to keep the order
37
        // of connected and disconnected blocks sent to the reader of the
38
        // chainView.
39
        blockQueue *blockEventQueue
40

41
        // blockCache is an LRU block cache.
42
        blockCache *blockcache.BlockCache
43

44
        // filterUpdates is a channel in which updates to the utxo filter
45
        // attached to this instance are sent over.
46
        filterUpdates chan filterUpdate
47

48
        // chainFilter is the set of utox's that we're currently watching
49
        // spends for within the chain.
50
        filterMtx   sync.RWMutex
51
        chainFilter map[wire.OutPoint]struct{}
52

53
        // filterBlockReqs is a channel in which requests to filter select
54
        // blocks will be sent over.
55
        filterBlockReqs chan *filterBlockReq
56

57
        quit chan struct{}
58
        wg   sync.WaitGroup
59
}
60

61
// A compile time check to ensure BitcoindFilteredChainView implements the
62
// chainview.FilteredChainView.
63
var _ FilteredChainView = (*BitcoindFilteredChainView)(nil)
64

65
// NewBitcoindFilteredChainView creates a new instance of a FilteredChainView
66
// from RPC credentials and a ZMQ socket address for a bitcoind instance.
67
func NewBitcoindFilteredChainView(
68
        chainConn *chain.BitcoindConn,
69
        blockCache *blockcache.BlockCache) *BitcoindFilteredChainView {
1✔
70

1✔
71
        chainView := &BitcoindFilteredChainView{
1✔
72
                chainFilter:     make(map[wire.OutPoint]struct{}),
1✔
73
                filterUpdates:   make(chan filterUpdate),
1✔
74
                filterBlockReqs: make(chan *filterBlockReq),
1✔
75
                blockCache:      blockCache,
1✔
76
                quit:            make(chan struct{}),
1✔
77
        }
1✔
78

1✔
79
        chainView.chainClient = chainConn.NewBitcoindClient()
1✔
80
        chainView.blockQueue = newBlockEventQueue()
1✔
81

1✔
82
        return chainView
1✔
83
}
1✔
84

85
// Start starts all goroutines necessary for normal operation.
86
//
87
// NOTE: This is part of the FilteredChainView interface.
88
func (b *BitcoindFilteredChainView) Start() error {
1✔
89
        // Already started?
1✔
90
        if atomic.AddInt32(&b.started, 1) != 1 {
1✔
91
                return nil
×
92
        }
×
93

94
        log.Infof("FilteredChainView starting")
1✔
95

1✔
96
        err := b.chainClient.Start()
1✔
97
        if err != nil {
1✔
98
                return err
×
99
        }
×
100

101
        err = b.chainClient.NotifyBlocks()
1✔
102
        if err != nil {
1✔
103
                return err
×
104
        }
×
105

106
        _, bestHeight, err := b.chainClient.GetBestBlock()
1✔
107
        if err != nil {
1✔
108
                return err
×
109
        }
×
110

111
        b.bestHeightMtx.Lock()
1✔
112
        b.bestHeight = uint32(bestHeight)
1✔
113
        b.bestHeightMtx.Unlock()
1✔
114

1✔
115
        b.blockQueue.Start()
1✔
116

1✔
117
        b.wg.Add(1)
1✔
118
        go b.chainFilterer()
1✔
119

1✔
120
        return nil
1✔
121
}
122

123
// Stop stops all goroutines which we launched by the prior call to the Start
124
// method.
125
//
126
// NOTE: This is part of the FilteredChainView interface.
127
func (b *BitcoindFilteredChainView) Stop() error {
1✔
128
        log.Debug("BitcoindFilteredChainView stopping")
1✔
129
        defer log.Debug("BitcoindFilteredChainView stopped")
1✔
130

1✔
131
        // Already shutting down?
1✔
132
        if atomic.AddInt32(&b.stopped, 1) != 1 {
1✔
133
                return nil
×
134
        }
×
135

136
        // Shutdown the rpc client, this gracefully disconnects from bitcoind's
137
        // zmq socket, and cleans up all related resources.
138
        b.chainClient.Stop()
1✔
139
        b.chainClient.WaitForShutdown()
1✔
140

1✔
141
        b.blockQueue.Stop()
1✔
142

1✔
143
        close(b.quit)
1✔
144
        b.wg.Wait()
1✔
145

1✔
146
        return nil
1✔
147
}
148

149
// onFilteredBlockConnected is called for each block that's connected to the
150
// end of the main chain. Based on our current chain filter, the block may or
151
// may not include any relevant transactions.
152
func (b *BitcoindFilteredChainView) onFilteredBlockConnected(height int32,
153
        hash chainhash.Hash, txns []*wtxmgr.TxRecord) {
1✔
154

1✔
155
        mtxs := make([]*wire.MsgTx, len(txns))
1✔
156
        b.filterMtx.Lock()
1✔
157
        for i, tx := range txns {
2✔
158
                mtxs[i] = &tx.MsgTx
1✔
159

1✔
160
                for _, txIn := range mtxs[i].TxIn {
2✔
161
                        // We can delete this outpoint from the chainFilter, as
1✔
162
                        // we just received a block where it was spent. In case
1✔
163
                        // of a reorg, this outpoint might get "un-spent", but
1✔
164
                        // that's okay since it would never be wise to consider
1✔
165
                        // the channel open again (since a spending transaction
1✔
166
                        // exists on the network).
1✔
167
                        delete(b.chainFilter, txIn.PreviousOutPoint)
1✔
168
                }
1✔
169

170
        }
171
        b.filterMtx.Unlock()
1✔
172

1✔
173
        // We record the height of the last connected block added to the
1✔
174
        // blockQueue such that we can scan up to this height in case of
1✔
175
        // a rescan. It must be protected by a mutex since a filter update
1✔
176
        // might be trying to read it concurrently.
1✔
177
        b.bestHeightMtx.Lock()
1✔
178
        b.bestHeight = uint32(height)
1✔
179
        b.bestHeightMtx.Unlock()
1✔
180

1✔
181
        block := &FilteredBlock{
1✔
182
                Hash:         hash,
1✔
183
                Height:       uint32(height),
1✔
184
                Transactions: mtxs,
1✔
185
        }
1✔
186

1✔
187
        b.blockQueue.Add(&blockEvent{
1✔
188
                eventType: connected,
1✔
189
                block:     block,
1✔
190
        })
1✔
191
}
192

193
// onFilteredBlockDisconnected is a callback which is executed once a block is
194
// disconnected from the end of the main chain.
195
func (b *BitcoindFilteredChainView) onFilteredBlockDisconnected(height int32,
196
        hash chainhash.Hash) {
1✔
197

1✔
198
        log.Debugf("got disconnected block at height %d: %v", height,
1✔
199
                hash)
1✔
200

1✔
201
        filteredBlock := &FilteredBlock{
1✔
202
                Hash:   hash,
1✔
203
                Height: uint32(height),
1✔
204
        }
1✔
205

1✔
206
        b.blockQueue.Add(&blockEvent{
1✔
207
                eventType: disconnected,
1✔
208
                block:     filteredBlock,
1✔
209
        })
1✔
210
}
1✔
211

212
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
213
// result of applying the current registered UTXO sub-set on the block
214
// corresponding to that block hash. If any watched UTOX's are spent by the
215
// selected lock, then the internal chainFilter will also be updated.
216
//
217
// NOTE: This is part of the FilteredChainView interface.
218
func (b *BitcoindFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
1✔
219
        req := &filterBlockReq{
1✔
220
                blockHash: blockHash,
1✔
221
                resp:      make(chan *FilteredBlock, 1),
1✔
222
                err:       make(chan error, 1),
1✔
223
        }
1✔
224

1✔
225
        select {
1✔
226
        case b.filterBlockReqs <- req:
1✔
227
        case <-b.quit:
×
228
                return nil, fmt.Errorf("FilteredChainView shutting down")
×
229
        }
230

231
        return <-req.resp, <-req.err
1✔
232
}
233

234
// chainFilterer is the primary goroutine which: listens for new blocks coming
235
// and dispatches the relevant FilteredBlock notifications, updates the filter
236
// due to requests by callers, and finally is able to perform targeted block
237
// filtration.
238
//
239
// TODO(roasbeef): change to use loadfilter RPC's
240
func (b *BitcoindFilteredChainView) chainFilterer() {
1✔
241
        defer b.wg.Done()
1✔
242

1✔
243
        // filterBlock is a helper function that scans the given block, and
1✔
244
        // notes which transactions spend outputs which are currently being
1✔
245
        // watched. Additionally, the chain filter will also be updated by
1✔
246
        // removing any spent outputs.
1✔
247
        filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
2✔
248
                b.filterMtx.Lock()
1✔
249
                defer b.filterMtx.Unlock()
1✔
250

1✔
251
                var filteredTxns []*wire.MsgTx
1✔
252
                for _, tx := range blk.Transactions {
2✔
253
                        var txAlreadyFiltered bool
1✔
254
                        for _, txIn := range tx.TxIn {
2✔
255
                                prevOp := txIn.PreviousOutPoint
1✔
256
                                if _, ok := b.chainFilter[prevOp]; !ok {
2✔
257
                                        continue
1✔
258
                                }
259

260
                                delete(b.chainFilter, prevOp)
1✔
261

1✔
262
                                // Only add this txn to our list of filtered
1✔
263
                                // txns if it is the first previous outpoint to
1✔
264
                                // cause a match.
1✔
265
                                if txAlreadyFiltered {
1✔
266
                                        continue
×
267
                                }
268

269
                                filteredTxns = append(filteredTxns, tx.Copy())
1✔
270
                                txAlreadyFiltered = true
1✔
271
                        }
272
                }
273

274
                return filteredTxns
1✔
275
        }
276

277
        decodeJSONBlock := func(block *btcjson.RescannedBlock,
1✔
278
                height uint32) (*FilteredBlock, error) {
1✔
UNCOV
279
                hash, err := chainhash.NewHashFromStr(block.Hash)
×
UNCOV
280
                if err != nil {
×
281
                        return nil, err
×
282

×
283
                }
×
UNCOV
284
                txs := make([]*wire.MsgTx, 0, len(block.Transactions))
×
UNCOV
285
                for _, str := range block.Transactions {
×
UNCOV
286
                        b, err := hex.DecodeString(str)
×
UNCOV
287
                        if err != nil {
×
288
                                return nil, err
×
289
                        }
×
UNCOV
290
                        tx := &wire.MsgTx{}
×
UNCOV
291
                        err = tx.Deserialize(bytes.NewReader(b))
×
UNCOV
292
                        if err != nil {
×
293
                                return nil, err
×
294
                        }
×
UNCOV
295
                        txs = append(txs, tx)
×
296
                }
UNCOV
297
                return &FilteredBlock{
×
UNCOV
298
                        Hash:         *hash,
×
UNCOV
299
                        Height:       height,
×
UNCOV
300
                        Transactions: txs,
×
UNCOV
301
                }, nil
×
302
        }
303

304
        for {
2✔
305
                select {
1✔
306
                // The caller has just sent an update to the current chain
307
                // filter, so we'll apply the update, possibly rewinding our
308
                // state partially.
309
                case update := <-b.filterUpdates:
1✔
310
                        // First, we'll add all the new UTXO's to the set of
1✔
311
                        // watched UTXO's, eliminating any duplicates in the
1✔
312
                        // process.
1✔
313
                        log.Tracef("Updating chain filter with new UTXO's: %v",
1✔
314
                                update.newUtxos)
1✔
315

1✔
316
                        b.filterMtx.Lock()
1✔
317
                        for _, newOp := range update.newUtxos {
2✔
318
                                b.chainFilter[newOp] = struct{}{}
1✔
319
                        }
1✔
320
                        b.filterMtx.Unlock()
1✔
321

1✔
322
                        // Apply the new TX filter to the chain client, which
1✔
323
                        // will cause all following notifications from and
1✔
324
                        // calls to it return blocks filtered with the new
1✔
325
                        // filter.
1✔
326
                        err := b.chainClient.LoadTxFilter(false, update.newUtxos)
1✔
327
                        if err != nil {
1✔
328
                                log.Errorf("Unable to update filter: %v", err)
×
329
                                continue
×
330
                        }
331

332
                        // All blocks gotten after we loaded the filter will
333
                        // have the filter applied, but we will need to rescan
334
                        // the blocks up to the height of the block we last
335
                        // added to the blockQueue.
336
                        b.bestHeightMtx.Lock()
1✔
337
                        bestHeight := b.bestHeight
1✔
338
                        b.bestHeightMtx.Unlock()
1✔
339

1✔
340
                        // If the update height matches our best known height,
1✔
341
                        // then we don't need to do any rewinding.
1✔
342
                        if update.updateHeight == bestHeight {
2✔
343
                                continue
1✔
344
                        }
345

346
                        // Otherwise, we'll rewind the state to ensure the
347
                        // caller doesn't miss any relevant notifications.
348
                        // Starting from the height _after_ the update height,
349
                        // we'll walk forwards, rescanning one block at a time
350
                        // with the chain client applying the newly loaded
351
                        // filter to each block.
UNCOV
352
                        for i := update.updateHeight + 1; i < bestHeight+1; i++ {
×
UNCOV
353
                                blockHash, err := b.chainClient.GetBlockHash(int64(i))
×
UNCOV
354
                                if err != nil {
×
355
                                        log.Warnf("Unable to get block hash "+
×
356
                                                "for block at height %d: %v",
×
357
                                                i, err)
×
358
                                        continue
×
359
                                }
360

361
                                // To avoid dealing with the case where a reorg
362
                                // is happening while we rescan, we scan one
363
                                // block at a time, skipping blocks that might
364
                                // have gone missing.
UNCOV
365
                                rescanned, err := b.chainClient.RescanBlocks(
×
UNCOV
366
                                        []chainhash.Hash{*blockHash},
×
UNCOV
367
                                )
×
UNCOV
368
                                if err != nil {
×
369
                                        log.Warnf("Unable to rescan block "+
×
370
                                                "with hash %v at height %d: %v",
×
371
                                                blockHash, i, err)
×
372
                                        continue
×
373
                                }
374

375
                                // If no block was returned from the rescan, it
376
                                // means no matching transactions were found.
UNCOV
377
                                if len(rescanned) != 1 {
×
378
                                        log.Tracef("rescan of block %v at "+
×
379
                                                "height=%d yielded no "+
×
380
                                                "transactions", blockHash, i)
×
381
                                        continue
×
382
                                }
UNCOV
383
                                decoded, err := decodeJSONBlock(
×
UNCOV
384
                                        &rescanned[0], i,
×
UNCOV
385
                                )
×
UNCOV
386
                                if err != nil {
×
387
                                        log.Errorf("Unable to decode block: %v",
×
388
                                                err)
×
389
                                        continue
×
390
                                }
UNCOV
391
                                b.blockQueue.Add(&blockEvent{
×
UNCOV
392
                                        eventType: connected,
×
UNCOV
393
                                        block:     decoded,
×
UNCOV
394
                                })
×
395
                        }
396

397
                // We've received a new request to manually filter a block.
398
                case req := <-b.filterBlockReqs:
1✔
399
                        // First we'll fetch the block itself as well as some
1✔
400
                        // additional information including its height.
1✔
401
                        block, err := b.GetBlock(req.blockHash)
1✔
402
                        if err != nil {
1✔
403
                                req.err <- err
×
404
                                req.resp <- nil
×
405
                                continue
×
406
                        }
407
                        header, err := b.chainClient.GetBlockHeaderVerbose(
1✔
408
                                req.blockHash)
1✔
409
                        if err != nil {
1✔
410
                                req.err <- err
×
411
                                req.resp <- nil
×
412
                                continue
×
413
                        }
414

415
                        // Once we have this info, we can directly filter the
416
                        // block and dispatch the proper notification.
417
                        req.resp <- &FilteredBlock{
1✔
418
                                Hash:         *req.blockHash,
1✔
419
                                Height:       uint32(header.Height),
1✔
420
                                Transactions: filterBlock(block),
1✔
421
                        }
1✔
422
                        req.err <- err
1✔
423

424
                // We've received a new event from the chain client.
425
                case event := <-b.chainClient.Notifications():
1✔
426
                        switch e := event.(type) {
1✔
427

428
                        case chain.FilteredBlockConnected:
1✔
429
                                b.onFilteredBlockConnected(
1✔
430
                                        e.Block.Height, e.Block.Hash, e.RelevantTxs,
1✔
431
                                )
1✔
432

433
                        case chain.BlockDisconnected:
1✔
434
                                b.onFilteredBlockDisconnected(e.Height, e.Hash)
1✔
435
                        }
436

437
                case <-b.quit:
1✔
438
                        return
1✔
439
                }
440
        }
441
}
442

443
// UpdateFilter updates the UTXO filter which is to be consulted when creating
444
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
445
// meaning repeated calls to this method should _expand_ the size of the UTXO
446
// sub-set currently being watched.  If the set updateHeight is _lower_ than
447
// the best known height of the implementation, then the state should be
448
// rewound to ensure all relevant notifications are dispatched.
449
//
450
// NOTE: This is part of the FilteredChainView interface.
451
func (b *BitcoindFilteredChainView) UpdateFilter(ops []graphdb.EdgePoint,
452
        updateHeight uint32) error {
1✔
453

1✔
454
        newUtxos := make([]wire.OutPoint, len(ops))
1✔
455
        for i, op := range ops {
2✔
456
                newUtxos[i] = op.OutPoint
1✔
457
        }
1✔
458

459
        select {
1✔
460

461
        case b.filterUpdates <- filterUpdate{
462
                newUtxos:     newUtxos,
463
                updateHeight: updateHeight,
464
        }:
1✔
465
                return nil
1✔
466

467
        case <-b.quit:
×
468
                return fmt.Errorf("chain filter shutting down")
×
469
        }
470
}
471

472
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
473
// Each time a block is connected to the end of a main chain, and appropriate
474
// FilteredBlock which contains the transactions which mutate our watched UTXO
475
// set is to be returned.
476
//
477
// NOTE: This is part of the FilteredChainView interface.
478
func (b *BitcoindFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
1✔
479
        return b.blockQueue.newBlocks
1✔
480
}
1✔
481

482
// DisconnectedBlocks returns a receive only channel which will be sent upon
483
// with the empty filtered blocks of blocks which are disconnected from the
484
// main chain in the case of a re-org.
485
//
486
// NOTE: This is part of the FilteredChainView interface.
487
func (b *BitcoindFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
1✔
488
        return b.blockQueue.staleBlocks
1✔
489
}
1✔
490

491
// GetBlock is used to retrieve the block with the given hash. This function
492
// wraps the blockCache's GetBlock function.
493
func (b *BitcoindFilteredChainView) GetBlock(hash *chainhash.Hash) (
494
        *wire.MsgBlock, error) {
1✔
495

1✔
496
        return b.blockCache.GetBlock(hash, b.chainClient.GetBlock)
1✔
497
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc