• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12262048535

10 Dec 2024 06:05PM UTC coverage: 49.431% (-0.4%) from 49.82%
12262048535

Pull #9316

github

ziggie1984
docs: fix typos in release-notes 19.0
Pull Request #9316: routing: fix mc blinded path behaviour.

200 of 214 new or added lines in 4 files covered. (93.46%)

936 existing lines in 18 files now uncovered.

99466 of 201223 relevant lines covered (49.43%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/routing/chainview/btcd.go
1
package chainview
2

3
import (
4
        "bytes"
5
        "encoding/hex"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btcd/chaincfg/chainhash"
13
        "github.com/btcsuite/btcd/rpcclient"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/lightningnetwork/lnd/blockcache"
16
        graphdb "github.com/lightningnetwork/lnd/graph/db"
17
)
18

19
// BtcdFilteredChainView is an implementation of the FilteredChainView
20
// interface which is backed by an active websockets connection to btcd.
21
type BtcdFilteredChainView struct {
22
        started int32 // To be used atomically.
23
        stopped int32 // To be used atomically.
24

25
        // bestHeight is the height of the latest block added to the
26
        // blockQueue from the onFilteredConnectedMethod. It is used to
27
        // determine up to what height we would need to rescan in case
28
        // of a filter update.
29
        bestHeightMtx sync.Mutex
30
        bestHeight    uint32
31

32
        btcdConn *rpcclient.Client
33

34
        // blockEventQueue is the ordered queue used to keep the order
35
        // of connected and disconnected blocks sent to the reader of the
36
        // chainView.
37
        blockQueue *blockEventQueue
38

39
        // blockCache is an LRU block cache.
40
        blockCache *blockcache.BlockCache
41

42
        // filterUpdates is a channel in which updates to the utxo filter
43
        // attached to this instance are sent over.
44
        filterUpdates chan filterUpdate
45

46
        // chainFilter is the set of utox's that we're currently watching
47
        // spends for within the chain.
48
        filterMtx   sync.RWMutex
49
        chainFilter map[wire.OutPoint]struct{}
50

51
        // filterBlockReqs is a channel in which requests to filter select
52
        // blocks will be sent over.
53
        filterBlockReqs chan *filterBlockReq
54

55
        quit chan struct{}
56
        wg   sync.WaitGroup
57
}
58

59
// A compile time check to ensure BtcdFilteredChainView implements the
60
// chainview.FilteredChainView interface.
61
var _ FilteredChainView = (*BtcdFilteredChainView)(nil)
62

63
// NewBtcdFilteredChainView creates a new instance of a FilteredChainView from
64
// RPC credentials for an active btcd instance.
65
func NewBtcdFilteredChainView(config rpcclient.ConnConfig,
UNCOV
66
        blockCache *blockcache.BlockCache) (*BtcdFilteredChainView, error) {
×
UNCOV
67

×
UNCOV
68
        chainView := &BtcdFilteredChainView{
×
UNCOV
69
                chainFilter:     make(map[wire.OutPoint]struct{}),
×
UNCOV
70
                filterUpdates:   make(chan filterUpdate),
×
UNCOV
71
                filterBlockReqs: make(chan *filterBlockReq),
×
UNCOV
72
                blockCache:      blockCache,
×
UNCOV
73
                quit:            make(chan struct{}),
×
UNCOV
74
        }
×
UNCOV
75

×
UNCOV
76
        ntfnCallbacks := &rpcclient.NotificationHandlers{
×
UNCOV
77
                OnFilteredBlockConnected:    chainView.onFilteredBlockConnected,
×
UNCOV
78
                OnFilteredBlockDisconnected: chainView.onFilteredBlockDisconnected,
×
UNCOV
79
        }
×
UNCOV
80

×
UNCOV
81
        // Disable connecting to btcd within the rpcclient.New method. We
×
UNCOV
82
        // defer establishing the connection to our .Start() method.
×
UNCOV
83
        config.DisableConnectOnNew = true
×
UNCOV
84
        config.DisableAutoReconnect = false
×
UNCOV
85
        chainConn, err := rpcclient.New(&config, ntfnCallbacks)
×
UNCOV
86
        if err != nil {
×
87
                return nil, err
×
88
        }
×
UNCOV
89
        chainView.btcdConn = chainConn
×
UNCOV
90

×
UNCOV
91
        chainView.blockQueue = newBlockEventQueue()
×
UNCOV
92

×
UNCOV
93
        return chainView, nil
×
94
}
95

96
// Start starts all goroutines necessary for normal operation.
97
//
98
// NOTE: This is part of the FilteredChainView interface.
UNCOV
99
func (b *BtcdFilteredChainView) Start() error {
×
UNCOV
100
        // Already started?
×
UNCOV
101
        if atomic.AddInt32(&b.started, 1) != 1 {
×
102
                return nil
×
103
        }
×
104

UNCOV
105
        log.Infof("FilteredChainView starting")
×
UNCOV
106

×
UNCOV
107
        // Connect to btcd, and register for notifications on connected, and
×
UNCOV
108
        // disconnected blocks.
×
UNCOV
109
        if err := b.btcdConn.Connect(20); err != nil {
×
110
                return err
×
111
        }
×
UNCOV
112
        if err := b.btcdConn.NotifyBlocks(); err != nil {
×
113
                return err
×
114
        }
×
115

UNCOV
116
        _, bestHeight, err := b.btcdConn.GetBestBlock()
×
UNCOV
117
        if err != nil {
×
118
                return err
×
119
        }
×
120

UNCOV
121
        b.bestHeightMtx.Lock()
×
UNCOV
122
        b.bestHeight = uint32(bestHeight)
×
UNCOV
123
        b.bestHeightMtx.Unlock()
×
UNCOV
124

×
UNCOV
125
        b.blockQueue.Start()
×
UNCOV
126

×
UNCOV
127
        b.wg.Add(1)
×
UNCOV
128
        go b.chainFilterer()
×
UNCOV
129

×
UNCOV
130
        return nil
×
131
}
132

133
// Stop stops all goroutines which we launched by the prior call to the Start
134
// method.
135
//
136
// NOTE: This is part of the FilteredChainView interface.
UNCOV
137
func (b *BtcdFilteredChainView) Stop() error {
×
UNCOV
138
        log.Debug("BtcdFilteredChainView stopping")
×
UNCOV
139
        defer log.Debug("BtcdFilteredChainView stopped")
×
UNCOV
140

×
UNCOV
141
        // Already shutting down?
×
UNCOV
142
        if atomic.AddInt32(&b.stopped, 1) != 1 {
×
143
                return nil
×
144
        }
×
145

146
        // Shutdown the rpc client, this gracefully disconnects from btcd, and
147
        // cleans up all related resources.
UNCOV
148
        b.btcdConn.Shutdown()
×
UNCOV
149
        b.btcdConn.WaitForShutdown()
×
UNCOV
150

×
UNCOV
151
        b.blockQueue.Stop()
×
UNCOV
152

×
UNCOV
153
        close(b.quit)
×
UNCOV
154
        b.wg.Wait()
×
UNCOV
155

×
UNCOV
156
        return nil
×
157
}
158

159
// onFilteredBlockConnected is called for each block that's connected to the
160
// end of the main chain. Based on our current chain filter, the block may or
161
// may not include any relevant transactions.
162
func (b *BtcdFilteredChainView) onFilteredBlockConnected(height int32,
UNCOV
163
        header *wire.BlockHeader, txns []*btcutil.Tx) {
×
UNCOV
164

×
UNCOV
165
        mtxs := make([]*wire.MsgTx, len(txns))
×
UNCOV
166
        b.filterMtx.Lock()
×
UNCOV
167
        for i, tx := range txns {
×
UNCOV
168
                mtx := tx.MsgTx()
×
UNCOV
169
                mtxs[i] = mtx
×
UNCOV
170

×
UNCOV
171
                for _, txIn := range mtx.TxIn {
×
UNCOV
172
                        // We can delete this outpoint from the chainFilter, as
×
UNCOV
173
                        // we just received a block where it was spent. In case
×
UNCOV
174
                        // of a reorg, this outpoint might get "un-spent", but
×
UNCOV
175
                        // that's okay since it would never be wise to consider
×
UNCOV
176
                        // the channel open again (since a spending transaction
×
UNCOV
177
                        // exists on the network).
×
UNCOV
178
                        delete(b.chainFilter, txIn.PreviousOutPoint)
×
UNCOV
179
                }
×
180

181
        }
UNCOV
182
        b.filterMtx.Unlock()
×
UNCOV
183

×
UNCOV
184
        // We record the height of the last connected block added to the
×
UNCOV
185
        // blockQueue such that we can scan up to this height in case of
×
UNCOV
186
        // a rescan. It must be protected by a mutex since a filter update
×
UNCOV
187
        // might be trying to read it concurrently.
×
UNCOV
188
        b.bestHeightMtx.Lock()
×
UNCOV
189
        b.bestHeight = uint32(height)
×
UNCOV
190
        b.bestHeightMtx.Unlock()
×
UNCOV
191

×
UNCOV
192
        block := &FilteredBlock{
×
UNCOV
193
                Hash:         header.BlockHash(),
×
UNCOV
194
                Height:       uint32(height),
×
UNCOV
195
                Transactions: mtxs,
×
UNCOV
196
        }
×
UNCOV
197

×
UNCOV
198
        b.blockQueue.Add(&blockEvent{
×
UNCOV
199
                eventType: connected,
×
UNCOV
200
                block:     block,
×
UNCOV
201
        })
×
202
}
203

204
// onFilteredBlockDisconnected is a callback which is executed once a block is
205
// disconnected from the end of the main chain.
206
func (b *BtcdFilteredChainView) onFilteredBlockDisconnected(height int32,
UNCOV
207
        header *wire.BlockHeader) {
×
UNCOV
208

×
UNCOV
209
        log.Debugf("got disconnected block at height %d: %v", height,
×
UNCOV
210
                header.BlockHash())
×
UNCOV
211

×
UNCOV
212
        filteredBlock := &FilteredBlock{
×
UNCOV
213
                Hash:   header.BlockHash(),
×
UNCOV
214
                Height: uint32(height),
×
UNCOV
215
        }
×
UNCOV
216

×
UNCOV
217
        b.blockQueue.Add(&blockEvent{
×
UNCOV
218
                eventType: disconnected,
×
UNCOV
219
                block:     filteredBlock,
×
UNCOV
220
        })
×
UNCOV
221
}
×
222

223
// filterBlockReq houses a request to manually filter a block specified by
224
// block hash.
225
type filterBlockReq struct {
226
        blockHash *chainhash.Hash
227
        resp      chan *FilteredBlock
228
        err       chan error
229
}
230

231
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
232
// result of applying the current registered UTXO sub-set on the block
233
// corresponding to that block hash. If any watched UTOX's are spent by the
234
// selected lock, then the internal chainFilter will also be updated.
235
//
236
// NOTE: This is part of the FilteredChainView interface.
UNCOV
237
func (b *BtcdFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
×
UNCOV
238
        req := &filterBlockReq{
×
UNCOV
239
                blockHash: blockHash,
×
UNCOV
240
                resp:      make(chan *FilteredBlock, 1),
×
UNCOV
241
                err:       make(chan error, 1),
×
UNCOV
242
        }
×
UNCOV
243

×
UNCOV
244
        select {
×
UNCOV
245
        case b.filterBlockReqs <- req:
×
246
        case <-b.quit:
×
247
                return nil, fmt.Errorf("FilteredChainView shutting down")
×
248
        }
249

UNCOV
250
        return <-req.resp, <-req.err
×
251
}
252

253
// chainFilterer is the primary goroutine which: listens for new blocks coming
254
// and dispatches the relevant FilteredBlock notifications, updates the filter
255
// due to requests by callers, and finally is able to preform targeted block
256
// filtration.
257
//
258
// TODO(roasbeef): change to use loadfilter RPC's
UNCOV
259
func (b *BtcdFilteredChainView) chainFilterer() {
×
UNCOV
260
        defer b.wg.Done()
×
UNCOV
261

×
UNCOV
262
        // filterBlock is a helper function that scans the given block, and
×
UNCOV
263
        // notes which transactions spend outputs which are currently being
×
UNCOV
264
        // watched. Additionally, the chain filter will also be updated by
×
UNCOV
265
        // removing any spent outputs.
×
UNCOV
266
        filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
×
UNCOV
267
                b.filterMtx.Lock()
×
UNCOV
268
                defer b.filterMtx.Unlock()
×
UNCOV
269

×
UNCOV
270
                var filteredTxns []*wire.MsgTx
×
UNCOV
271
                for _, tx := range blk.Transactions {
×
UNCOV
272
                        var txAlreadyFiltered bool
×
UNCOV
273
                        for _, txIn := range tx.TxIn {
×
UNCOV
274
                                prevOp := txIn.PreviousOutPoint
×
UNCOV
275
                                if _, ok := b.chainFilter[prevOp]; !ok {
×
UNCOV
276
                                        continue
×
277
                                }
278

UNCOV
279
                                delete(b.chainFilter, prevOp)
×
UNCOV
280

×
UNCOV
281
                                // Only add this txn to our list of filtered
×
UNCOV
282
                                // txns if it is the first previous outpoint to
×
UNCOV
283
                                // cause a match.
×
UNCOV
284
                                if txAlreadyFiltered {
×
285
                                        continue
×
286
                                }
287

UNCOV
288
                                filteredTxns = append(filteredTxns, tx.Copy())
×
UNCOV
289
                                txAlreadyFiltered = true
×
290

291
                        }
292
                }
293

UNCOV
294
                return filteredTxns
×
295
        }
296

UNCOV
297
        decodeJSONBlock := func(block *btcjson.RescannedBlock,
×
UNCOV
298
                height uint32) (*FilteredBlock, error) {
×
299
                hash, err := chainhash.NewHashFromStr(block.Hash)
×
300
                if err != nil {
×
301
                        return nil, err
×
302

×
303
                }
×
304
                txs := make([]*wire.MsgTx, 0, len(block.Transactions))
×
305
                for _, str := range block.Transactions {
×
306
                        b, err := hex.DecodeString(str)
×
307
                        if err != nil {
×
308
                                return nil, err
×
309
                        }
×
310
                        tx := &wire.MsgTx{}
×
311
                        err = tx.Deserialize(bytes.NewReader(b))
×
312
                        if err != nil {
×
313
                                return nil, err
×
314
                        }
×
315
                        txs = append(txs, tx)
×
316
                }
317
                return &FilteredBlock{
×
318
                        Hash:         *hash,
×
319
                        Height:       height,
×
320
                        Transactions: txs,
×
321
                }, nil
×
322
        }
323

UNCOV
324
        for {
×
UNCOV
325
                select {
×
326
                // The caller has just sent an update to the current chain
327
                // filter, so we'll apply the update, possibly rewinding our
328
                // state partially.
UNCOV
329
                case update := <-b.filterUpdates:
×
UNCOV
330

×
UNCOV
331
                        // First, we'll add all the new UTXO's to the set of
×
UNCOV
332
                        // watched UTXO's, eliminating any duplicates in the
×
UNCOV
333
                        // process.
×
UNCOV
334
                        log.Tracef("Updating chain filter with new UTXO's: %v",
×
UNCOV
335
                                update.newUtxos)
×
UNCOV
336

×
UNCOV
337
                        b.filterMtx.Lock()
×
UNCOV
338
                        for _, newOp := range update.newUtxos {
×
UNCOV
339
                                b.chainFilter[newOp] = struct{}{}
×
UNCOV
340
                        }
×
UNCOV
341
                        b.filterMtx.Unlock()
×
UNCOV
342

×
UNCOV
343
                        // Apply the new TX filter to btcd, which will cause
×
UNCOV
344
                        // all following notifications from and calls to it
×
UNCOV
345
                        // return blocks filtered with the new filter.
×
UNCOV
346
                        b.btcdConn.LoadTxFilter(false, []btcutil.Address{},
×
UNCOV
347
                                update.newUtxos)
×
UNCOV
348

×
UNCOV
349
                        // All blocks gotten after we loaded the filter will
×
UNCOV
350
                        // have the filter applied, but we will need to rescan
×
UNCOV
351
                        // the blocks up to the height of the block we last
×
UNCOV
352
                        // added to the blockQueue.
×
UNCOV
353
                        b.bestHeightMtx.Lock()
×
UNCOV
354
                        bestHeight := b.bestHeight
×
UNCOV
355
                        b.bestHeightMtx.Unlock()
×
UNCOV
356

×
UNCOV
357
                        // If the update height matches our best known height,
×
UNCOV
358
                        // then we don't need to do any rewinding.
×
UNCOV
359
                        if update.updateHeight == bestHeight {
×
UNCOV
360
                                continue
×
361
                        }
362

363
                        // Otherwise, we'll rewind the state to ensure the
364
                        // caller doesn't miss any relevant notifications.
365
                        // Starting from the height _after_ the update height,
366
                        // we'll walk forwards, rescanning one block at a time
367
                        // with btcd applying the newly loaded filter to each
368
                        // block.
369
                        for i := update.updateHeight + 1; i < bestHeight+1; i++ {
×
370
                                blockHash, err := b.btcdConn.GetBlockHash(int64(i))
×
371
                                if err != nil {
×
372
                                        log.Warnf("Unable to get block hash "+
×
373
                                                "for block at height %d: %v",
×
374
                                                i, err)
×
375
                                        continue
×
376
                                }
377

378
                                // To avoid dealing with the case where a reorg
379
                                // is happening while we rescan, we scan one
380
                                // block at a time, skipping blocks that might
381
                                // have gone missing.
382
                                rescanned, err := b.btcdConn.RescanBlocks(
×
383
                                        []chainhash.Hash{*blockHash})
×
384
                                if err != nil {
×
385
                                        log.Warnf("Unable to rescan block "+
×
386
                                                "with hash %v at height %d: %v",
×
387
                                                blockHash, i, err)
×
388
                                        continue
×
389
                                }
390

391
                                // If no block was returned from the rescan, it
392
                                // means no matching transactions were found.
393
                                if len(rescanned) != 1 {
×
394
                                        log.Tracef("rescan of block %v at "+
×
395
                                                "height=%d yielded no "+
×
396
                                                "transactions", blockHash, i)
×
397
                                        continue
×
398
                                }
399
                                decoded, err := decodeJSONBlock(
×
400
                                        &rescanned[0], uint32(i))
×
401
                                if err != nil {
×
402
                                        log.Errorf("Unable to decode block: %v",
×
403
                                                err)
×
404
                                        continue
×
405
                                }
406
                                b.blockQueue.Add(&blockEvent{
×
407
                                        eventType: connected,
×
408
                                        block:     decoded,
×
409
                                })
×
410
                        }
411

412
                // We've received a new request to manually filter a block.
UNCOV
413
                case req := <-b.filterBlockReqs:
×
UNCOV
414
                        // First we'll fetch the block itself as well as some
×
UNCOV
415
                        // additional information including its height.
×
UNCOV
416
                        block, err := b.GetBlock(req.blockHash)
×
UNCOV
417
                        if err != nil {
×
418
                                req.err <- err
×
419
                                req.resp <- nil
×
420
                                continue
×
421
                        }
UNCOV
422
                        header, err := b.btcdConn.GetBlockHeaderVerbose(req.blockHash)
×
UNCOV
423
                        if err != nil {
×
424
                                req.err <- err
×
425
                                req.resp <- nil
×
426
                                continue
×
427
                        }
428

429
                        // Once we have this info, we can directly filter the
430
                        // block and dispatch the proper notification.
UNCOV
431
                        req.resp <- &FilteredBlock{
×
UNCOV
432
                                Hash:         *req.blockHash,
×
UNCOV
433
                                Height:       uint32(header.Height),
×
UNCOV
434
                                Transactions: filterBlock(block),
×
UNCOV
435
                        }
×
UNCOV
436
                        req.err <- err
×
437

UNCOV
438
                case <-b.quit:
×
UNCOV
439
                        return
×
440
                }
441
        }
442
}
443

444
// filterUpdate is a message sent to the chainFilterer to update the current
445
// chainFilter state.
446
type filterUpdate struct {
447
        newUtxos     []wire.OutPoint
448
        updateHeight uint32
449
}
450

451
// UpdateFilter updates the UTXO filter which is to be consulted when creating
452
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
453
// meaning repeated calls to this method should _expand_ the size of the UTXO
454
// sub-set currently being watched.  If the set updateHeight is _lower_ than
455
// the best known height of the implementation, then the state should be
456
// rewound to ensure all relevant notifications are dispatched.
457
//
458
// NOTE: This is part of the FilteredChainView interface.
459
func (b *BtcdFilteredChainView) UpdateFilter(ops []graphdb.EdgePoint,
UNCOV
460
        updateHeight uint32) error {
×
UNCOV
461

×
UNCOV
462
        newUtxos := make([]wire.OutPoint, len(ops))
×
UNCOV
463
        for i, op := range ops {
×
UNCOV
464
                newUtxos[i] = op.OutPoint
×
UNCOV
465
        }
×
466

UNCOV
467
        select {
×
468

469
        case b.filterUpdates <- filterUpdate{
470
                newUtxos:     newUtxos,
471
                updateHeight: updateHeight,
UNCOV
472
        }:
×
UNCOV
473
                return nil
×
474

475
        case <-b.quit:
×
476
                return fmt.Errorf("chain filter shutting down")
×
477
        }
478
}
479

480
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
481
// Each time a block is connected to the end of a main chain, and appropriate
482
// FilteredBlock which contains the transactions which mutate our watched UTXO
483
// set is to be returned.
484
//
485
// NOTE: This is part of the FilteredChainView interface.
UNCOV
486
func (b *BtcdFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
×
UNCOV
487
        return b.blockQueue.newBlocks
×
UNCOV
488
}
×
489

490
// DisconnectedBlocks returns a receive only channel which will be sent upon
491
// with the empty filtered blocks of blocks which are disconnected from the
492
// main chain in the case of a re-org.
493
//
494
// NOTE: This is part of the FilteredChainView interface.
UNCOV
495
func (b *BtcdFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
×
UNCOV
496
        return b.blockQueue.staleBlocks
×
UNCOV
497
}
×
498

499
// GetBlock is used to retrieve the block with the given hash. This function
500
// wraps the blockCache's GetBlock function.
501
func (b *BtcdFilteredChainView) GetBlock(hash *chainhash.Hash) (
UNCOV
502
        *wire.MsgBlock, error) {
×
UNCOV
503

×
UNCOV
504
        return b.blockCache.GetBlock(hash, b.btcdConn.GetBlock)
×
UNCOV
505
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc