• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12165272839

04 Dec 2024 05:35PM UTC coverage: 58.978% (+0.2%) from 58.789%
12165272839

Pull #9316

github

ziggie1984
docs: add release-notes.
Pull Request #9316: routing: fix mc blinded path behaviour.

125 of 130 new or added lines in 4 files covered. (96.15%)

55 existing lines in 9 files now uncovered.

133554 of 226448 relevant lines covered (58.98%)

19570.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.43
/routing/chainview/bitcoind.go
1
package chainview
2

3
import (
4
        "bytes"
5
        "encoding/hex"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/chain"
14
        "github.com/btcsuite/btcwallet/wtxmgr"
15
        "github.com/lightningnetwork/lnd/blockcache"
16
        graphdb "github.com/lightningnetwork/lnd/graph/db"
17
)
18

19
// BitcoindFilteredChainView is an implementation of the FilteredChainView
20
// interface which is backed by bitcoind.
21
type BitcoindFilteredChainView struct {
22
        started int32 // To be used atomically.
23
        stopped int32 // To be used atomically.
24

25
        // bestHeight is the height of the latest block added to the
26
        // blockQueue from the onFilteredConnectedMethod. It is used to
27
        // determine up to what height we would need to rescan in case
28
        // of a filter update.
29
        bestHeightMtx sync.Mutex
30
        bestHeight    uint32
31

32
        // TODO: Factor out common logic between bitcoind and btcd into a
33
        // NodeFilteredView interface.
34
        chainClient *chain.BitcoindClient
35

36
        // blockEventQueue is the ordered queue used to keep the order
37
        // of connected and disconnected blocks sent to the reader of the
38
        // chainView.
39
        blockQueue *blockEventQueue
40

41
        // blockCache is an LRU block cache.
42
        blockCache *blockcache.BlockCache
43

44
        // filterUpdates is a channel in which updates to the utxo filter
45
        // attached to this instance are sent over.
46
        filterUpdates chan filterUpdate
47

48
        // chainFilter is the set of utox's that we're currently watching
49
        // spends for within the chain.
50
        filterMtx   sync.RWMutex
51
        chainFilter map[wire.OutPoint]struct{}
52

53
        // filterBlockReqs is a channel in which requests to filter select
54
        // blocks will be sent over.
55
        filterBlockReqs chan *filterBlockReq
56

57
        quit chan struct{}
58
        wg   sync.WaitGroup
59
}
60

61
// A compile time check to ensure BitcoindFilteredChainView implements the
62
// chainview.FilteredChainView.
63
var _ FilteredChainView = (*BitcoindFilteredChainView)(nil)
64

65
// NewBitcoindFilteredChainView creates a new instance of a FilteredChainView
66
// from RPC credentials and a ZMQ socket address for a bitcoind instance.
67
func NewBitcoindFilteredChainView(
68
        chainConn *chain.BitcoindConn,
69
        blockCache *blockcache.BlockCache) *BitcoindFilteredChainView {
6✔
70

6✔
71
        chainView := &BitcoindFilteredChainView{
6✔
72
                chainFilter:     make(map[wire.OutPoint]struct{}),
6✔
73
                filterUpdates:   make(chan filterUpdate),
6✔
74
                filterBlockReqs: make(chan *filterBlockReq),
6✔
75
                blockCache:      blockCache,
6✔
76
                quit:            make(chan struct{}),
6✔
77
        }
6✔
78

6✔
79
        chainView.chainClient = chainConn.NewBitcoindClient()
6✔
80
        chainView.blockQueue = newBlockEventQueue()
6✔
81

6✔
82
        return chainView
6✔
83
}
6✔
84

85
// Start starts all goroutines necessary for normal operation.
86
//
87
// NOTE: This is part of the FilteredChainView interface.
88
func (b *BitcoindFilteredChainView) Start() error {
6✔
89
        // Already started?
6✔
90
        if atomic.AddInt32(&b.started, 1) != 1 {
6✔
91
                return nil
×
92
        }
×
93

94
        log.Infof("FilteredChainView starting")
6✔
95

6✔
96
        err := b.chainClient.Start()
6✔
97
        if err != nil {
6✔
98
                return err
×
99
        }
×
100

101
        err = b.chainClient.NotifyBlocks()
6✔
102
        if err != nil {
6✔
103
                return err
×
104
        }
×
105

106
        _, bestHeight, err := b.chainClient.GetBestBlock()
6✔
107
        if err != nil {
6✔
108
                return err
×
109
        }
×
110

111
        b.bestHeightMtx.Lock()
6✔
112
        b.bestHeight = uint32(bestHeight)
6✔
113
        b.bestHeightMtx.Unlock()
6✔
114

6✔
115
        b.blockQueue.Start()
6✔
116

6✔
117
        b.wg.Add(1)
6✔
118
        go b.chainFilterer()
6✔
119

6✔
120
        return nil
6✔
121
}
122

123
// Stop stops all goroutines which we launched by the prior call to the Start
124
// method.
125
//
126
// NOTE: This is part of the FilteredChainView interface.
127
func (b *BitcoindFilteredChainView) Stop() error {
6✔
128
        log.Debug("BitcoindFilteredChainView stopping")
6✔
129
        defer log.Debug("BitcoindFilteredChainView stopped")
6✔
130

6✔
131
        // Already shutting down?
6✔
132
        if atomic.AddInt32(&b.stopped, 1) != 1 {
6✔
133
                return nil
×
134
        }
×
135

136
        // Shutdown the rpc client, this gracefully disconnects from bitcoind's
137
        // zmq socket, and cleans up all related resources.
138
        b.chainClient.Stop()
6✔
139
        b.chainClient.WaitForShutdown()
6✔
140

6✔
141
        b.blockQueue.Stop()
6✔
142

6✔
143
        close(b.quit)
6✔
144
        b.wg.Wait()
6✔
145

6✔
146
        return nil
6✔
147
}
148

149
// onFilteredBlockConnected is called for each block that's connected to the
150
// end of the main chain. Based on our current chain filter, the block may or
151
// may not include any relevant transactions.
152
func (b *BitcoindFilteredChainView) onFilteredBlockConnected(height int32,
153
        hash chainhash.Hash, txns []*wtxmgr.TxRecord) {
957✔
154

957✔
155
        mtxs := make([]*wire.MsgTx, len(txns))
957✔
156
        b.filterMtx.Lock()
957✔
157
        for i, tx := range txns {
963✔
158
                mtxs[i] = &tx.MsgTx
6✔
159

6✔
160
                for _, txIn := range mtxs[i].TxIn {
12✔
161
                        // We can delete this outpoint from the chainFilter, as
6✔
162
                        // we just received a block where it was spent. In case
6✔
163
                        // of a reorg, this outpoint might get "un-spent", but
6✔
164
                        // that's okay since it would never be wise to consider
6✔
165
                        // the channel open again (since a spending transaction
6✔
166
                        // exists on the network).
6✔
167
                        delete(b.chainFilter, txIn.PreviousOutPoint)
6✔
168
                }
6✔
169

170
        }
171
        b.filterMtx.Unlock()
957✔
172

957✔
173
        // We record the height of the last connected block added to the
957✔
174
        // blockQueue such that we can scan up to this height in case of
957✔
175
        // a rescan. It must be protected by a mutex since a filter update
957✔
176
        // might be trying to read it concurrently.
957✔
177
        b.bestHeightMtx.Lock()
957✔
178
        b.bestHeight = uint32(height)
957✔
179
        b.bestHeightMtx.Unlock()
957✔
180

957✔
181
        block := &FilteredBlock{
957✔
182
                Hash:         hash,
957✔
183
                Height:       uint32(height),
957✔
184
                Transactions: mtxs,
957✔
185
        }
957✔
186

957✔
187
        b.blockQueue.Add(&blockEvent{
957✔
188
                eventType: connected,
957✔
189
                block:     block,
957✔
190
        })
957✔
191
}
192

193
// onFilteredBlockDisconnected is a callback which is executed once a block is
194
// disconnected from the end of the main chain.
195
func (b *BitcoindFilteredChainView) onFilteredBlockDisconnected(height int32,
196
        hash chainhash.Hash) {
832✔
197

832✔
198
        log.Debugf("got disconnected block at height %d: %v", height,
832✔
199
                hash)
832✔
200

832✔
201
        filteredBlock := &FilteredBlock{
832✔
202
                Hash:   hash,
832✔
203
                Height: uint32(height),
832✔
204
        }
832✔
205

832✔
206
        b.blockQueue.Add(&blockEvent{
832✔
207
                eventType: disconnected,
832✔
208
                block:     filteredBlock,
832✔
209
        })
832✔
210
}
832✔
211

212
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
213
// result of applying the current registered UTXO sub-set on the block
214
// corresponding to that block hash. If any watched UTOX's are spent by the
215
// selected lock, then the internal chainFilter will also be updated.
216
//
217
// NOTE: This is part of the FilteredChainView interface.
218
func (b *BitcoindFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
4✔
219
        req := &filterBlockReq{
4✔
220
                blockHash: blockHash,
4✔
221
                resp:      make(chan *FilteredBlock, 1),
4✔
222
                err:       make(chan error, 1),
4✔
223
        }
4✔
224

4✔
225
        select {
4✔
226
        case b.filterBlockReqs <- req:
4✔
227
        case <-b.quit:
×
228
                return nil, fmt.Errorf("FilteredChainView shutting down")
×
229
        }
230

231
        return <-req.resp, <-req.err
4✔
232
}
233

234
// chainFilterer is the primary goroutine which: listens for new blocks coming
235
// and dispatches the relevant FilteredBlock notifications, updates the filter
236
// due to requests by callers, and finally is able to perform targeted block
237
// filtration.
238
//
239
// TODO(roasbeef): change to use loadfilter RPC's
240
func (b *BitcoindFilteredChainView) chainFilterer() {
6✔
241
        defer b.wg.Done()
6✔
242

6✔
243
        // filterBlock is a helper function that scans the given block, and
6✔
244
        // notes which transactions spend outputs which are currently being
6✔
245
        // watched. Additionally, the chain filter will also be updated by
6✔
246
        // removing any spent outputs.
6✔
247
        filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
10✔
248
                b.filterMtx.Lock()
4✔
249
                defer b.filterMtx.Unlock()
4✔
250

4✔
251
                var filteredTxns []*wire.MsgTx
4✔
252
                for _, tx := range blk.Transactions {
12✔
253
                        var txAlreadyFiltered bool
8✔
254
                        for _, txIn := range tx.TxIn {
16✔
255
                                prevOp := txIn.PreviousOutPoint
8✔
256
                                if _, ok := b.chainFilter[prevOp]; !ok {
12✔
257
                                        continue
4✔
258
                                }
259

260
                                delete(b.chainFilter, prevOp)
6✔
261

6✔
262
                                // Only add this txn to our list of filtered
6✔
263
                                // txns if it is the first previous outpoint to
6✔
264
                                // cause a match.
6✔
265
                                if txAlreadyFiltered {
6✔
266
                                        continue
×
267
                                }
268

269
                                filteredTxns = append(filteredTxns, tx.Copy())
6✔
270
                                txAlreadyFiltered = true
6✔
271
                        }
272
                }
273

274
                return filteredTxns
4✔
275
        }
276

277
        decodeJSONBlock := func(block *btcjson.RescannedBlock,
6✔
278
                height uint32) (*FilteredBlock, error) {
8✔
279
                hash, err := chainhash.NewHashFromStr(block.Hash)
2✔
280
                if err != nil {
2✔
281
                        return nil, err
×
282

×
283
                }
×
284
                txs := make([]*wire.MsgTx, 0, len(block.Transactions))
2✔
285
                for _, str := range block.Transactions {
4✔
286
                        b, err := hex.DecodeString(str)
2✔
287
                        if err != nil {
2✔
288
                                return nil, err
×
289
                        }
×
290
                        tx := &wire.MsgTx{}
2✔
291
                        err = tx.Deserialize(bytes.NewReader(b))
2✔
292
                        if err != nil {
2✔
293
                                return nil, err
×
294
                        }
×
295
                        txs = append(txs, tx)
2✔
296
                }
297
                return &FilteredBlock{
2✔
298
                        Hash:         *hash,
2✔
299
                        Height:       height,
2✔
300
                        Transactions: txs,
2✔
301
                }, nil
2✔
302
        }
303

304
        for {
2,776✔
305
                select {
2,770✔
306
                // The caller has just sent an update to the current chain
307
                // filter, so we'll apply the update, possibly rewinding our
308
                // state partially.
309
                case update := <-b.filterUpdates:
8✔
310
                        // First, we'll add all the new UTXO's to the set of
8✔
311
                        // watched UTXO's, eliminating any duplicates in the
8✔
312
                        // process.
8✔
313
                        log.Tracef("Updating chain filter with new UTXO's: %v",
8✔
314
                                update.newUtxos)
8✔
315

8✔
316
                        b.filterMtx.Lock()
8✔
317
                        for _, newOp := range update.newUtxos {
20✔
318
                                b.chainFilter[newOp] = struct{}{}
12✔
319
                        }
12✔
320
                        b.filterMtx.Unlock()
8✔
321

8✔
322
                        // Apply the new TX filter to the chain client, which
8✔
323
                        // will cause all following notifications from and
8✔
324
                        // calls to it return blocks filtered with the new
8✔
325
                        // filter.
8✔
326
                        err := b.chainClient.LoadTxFilter(false, update.newUtxos)
8✔
327
                        if err != nil {
8✔
328
                                log.Errorf("Unable to update filter: %v", err)
×
329
                                continue
×
330
                        }
331

332
                        // All blocks gotten after we loaded the filter will
333
                        // have the filter applied, but we will need to rescan
334
                        // the blocks up to the height of the block we last
335
                        // added to the blockQueue.
336
                        b.bestHeightMtx.Lock()
8✔
337
                        bestHeight := b.bestHeight
8✔
338
                        b.bestHeightMtx.Unlock()
8✔
339

8✔
340
                        // If the update height matches our best known height,
8✔
341
                        // then we don't need to do any rewinding.
8✔
342
                        if update.updateHeight == bestHeight {
14✔
343
                                continue
6✔
344
                        }
345

346
                        // Otherwise, we'll rewind the state to ensure the
347
                        // caller doesn't miss any relevant notifications.
348
                        // Starting from the height _after_ the update height,
349
                        // we'll walk forwards, rescanning one block at a time
350
                        // with the chain client applying the newly loaded
351
                        // filter to each block.
352
                        for i := update.updateHeight + 1; i < bestHeight+1; i++ {
4✔
353
                                blockHash, err := b.chainClient.GetBlockHash(int64(i))
2✔
354
                                if err != nil {
2✔
355
                                        log.Warnf("Unable to get block hash "+
×
356
                                                "for block at height %d: %v",
×
357
                                                i, err)
×
358
                                        continue
×
359
                                }
360

361
                                // To avoid dealing with the case where a reorg
362
                                // is happening while we rescan, we scan one
363
                                // block at a time, skipping blocks that might
364
                                // have gone missing.
365
                                rescanned, err := b.chainClient.RescanBlocks(
2✔
366
                                        []chainhash.Hash{*blockHash},
2✔
367
                                )
2✔
368
                                if err != nil {
2✔
369
                                        log.Warnf("Unable to rescan block "+
×
370
                                                "with hash %v at height %d: %v",
×
371
                                                blockHash, i, err)
×
372
                                        continue
×
373
                                }
374

375
                                // If no block was returned from the rescan, it
376
                                // means no matching transactions were found.
377
                                if len(rescanned) != 1 {
2✔
UNCOV
378
                                        log.Tracef("rescan of block %v at "+
×
UNCOV
379
                                                "height=%d yielded no "+
×
UNCOV
380
                                                "transactions", blockHash, i)
×
UNCOV
381
                                        continue
×
382
                                }
383
                                decoded, err := decodeJSONBlock(
2✔
384
                                        &rescanned[0], i,
2✔
385
                                )
2✔
386
                                if err != nil {
2✔
387
                                        log.Errorf("Unable to decode block: %v",
×
388
                                                err)
×
389
                                        continue
×
390
                                }
391
                                b.blockQueue.Add(&blockEvent{
2✔
392
                                        eventType: connected,
2✔
393
                                        block:     decoded,
2✔
394
                                })
2✔
395
                        }
396

397
                // We've received a new request to manually filter a block.
398
                case req := <-b.filterBlockReqs:
4✔
399
                        // First we'll fetch the block itself as well as some
4✔
400
                        // additional information including its height.
4✔
401
                        block, err := b.GetBlock(req.blockHash)
4✔
402
                        if err != nil {
4✔
403
                                req.err <- err
×
404
                                req.resp <- nil
×
405
                                continue
×
406
                        }
407
                        header, err := b.chainClient.GetBlockHeaderVerbose(
4✔
408
                                req.blockHash)
4✔
409
                        if err != nil {
4✔
410
                                req.err <- err
×
411
                                req.resp <- nil
×
412
                                continue
×
413
                        }
414

415
                        // Once we have this info, we can directly filter the
416
                        // block and dispatch the proper notification.
417
                        req.resp <- &FilteredBlock{
4✔
418
                                Hash:         *req.blockHash,
4✔
419
                                Height:       uint32(header.Height),
4✔
420
                                Transactions: filterBlock(block),
4✔
421
                        }
4✔
422
                        req.err <- err
4✔
423

424
                // We've received a new event from the chain client.
425
                case event := <-b.chainClient.Notifications():
2,758✔
426
                        switch e := event.(type) {
2,758✔
427

428
                        case chain.FilteredBlockConnected:
957✔
429
                                b.onFilteredBlockConnected(
957✔
430
                                        e.Block.Height, e.Block.Hash, e.RelevantTxs,
957✔
431
                                )
957✔
432

433
                        case chain.BlockDisconnected:
832✔
434
                                b.onFilteredBlockDisconnected(e.Height, e.Hash)
832✔
435
                        }
436

437
                case <-b.quit:
6✔
438
                        return
6✔
439
                }
440
        }
441
}
442

443
// UpdateFilter updates the UTXO filter which is to be consulted when creating
444
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
445
// meaning repeated calls to this method should _expand_ the size of the UTXO
446
// sub-set currently being watched.  If the set updateHeight is _lower_ than
447
// the best known height of the implementation, then the state should be
448
// rewound to ensure all relevant notifications are dispatched.
449
//
450
// NOTE: This is part of the FilteredChainView interface.
451
func (b *BitcoindFilteredChainView) UpdateFilter(ops []graphdb.EdgePoint,
452
        updateHeight uint32) error {
8✔
453

8✔
454
        newUtxos := make([]wire.OutPoint, len(ops))
8✔
455
        for i, op := range ops {
20✔
456
                newUtxos[i] = op.OutPoint
12✔
457
        }
12✔
458

459
        select {
8✔
460

461
        case b.filterUpdates <- filterUpdate{
462
                newUtxos:     newUtxos,
463
                updateHeight: updateHeight,
464
        }:
8✔
465
                return nil
8✔
466

467
        case <-b.quit:
×
468
                return fmt.Errorf("chain filter shutting down")
×
469
        }
470
}
471

472
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
473
// Each time a block is connected to the end of a main chain, and appropriate
474
// FilteredBlock which contains the transactions which mutate our watched UTXO
475
// set is to be returned.
476
//
477
// NOTE: This is part of the FilteredChainView interface.
478
func (b *BitcoindFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
10✔
479
        return b.blockQueue.newBlocks
10✔
480
}
10✔
481

482
// DisconnectedBlocks returns a receive only channel which will be sent upon
483
// with the empty filtered blocks of blocks which are disconnected from the
484
// main chain in the case of a re-org.
485
//
486
// NOTE: This is part of the FilteredChainView interface.
487
func (b *BitcoindFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
4✔
488
        return b.blockQueue.staleBlocks
4✔
489
}
4✔
490

491
// GetBlock is used to retrieve the block with the given hash. This function
492
// wraps the blockCache's GetBlock function.
493
func (b *BitcoindFilteredChainView) GetBlock(hash *chainhash.Hash) (
494
        *wire.MsgBlock, error) {
4✔
495

4✔
496
        return b.blockCache.GetBlock(hash, b.chainClient.GetBlock)
4✔
497
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc