• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.92
/routing/chainview/btcd.go
1
package chainview
2

3
import (
4
        "bytes"
5
        "encoding/hex"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btcd/chaincfg/chainhash"
13
        "github.com/btcsuite/btcd/rpcclient"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/lightningnetwork/lnd/blockcache"
16
        graphdb "github.com/lightningnetwork/lnd/graph/db"
17
)
18

19
// BtcdFilteredChainView is an implementation of the FilteredChainView
20
// interface which is backed by an active websockets connection to btcd.
21
type BtcdFilteredChainView struct {
22
        started int32 // To be used atomically.
23
        stopped int32 // To be used atomically.
24

25
        // bestHeight is the height of the latest block added to the
26
        // blockQueue from the onFilteredConnectedMethod. It is used to
27
        // determine up to what height we would need to rescan in case
28
        // of a filter update.
29
        bestHeightMtx sync.Mutex
30
        bestHeight    uint32
31

32
        btcdConn *rpcclient.Client
33

34
        // blockEventQueue is the ordered queue used to keep the order
35
        // of connected and disconnected blocks sent to the reader of the
36
        // chainView.
37
        blockQueue *blockEventQueue
38

39
        // blockCache is an LRU block cache.
40
        blockCache *blockcache.BlockCache
41

42
        // filterUpdates is a channel in which updates to the utxo filter
43
        // attached to this instance are sent over.
44
        filterUpdates chan filterUpdate
45

46
        // chainFilter is the set of utox's that we're currently watching
47
        // spends for within the chain.
48
        filterMtx   sync.RWMutex
49
        chainFilter map[wire.OutPoint]struct{}
50

51
        // filterBlockReqs is a channel in which requests to filter select
52
        // blocks will be sent over.
53
        filterBlockReqs chan *filterBlockReq
54

55
        quit chan struct{}
56
        wg   sync.WaitGroup
57
}
58

59
// A compile time check to ensure BtcdFilteredChainView implements the
60
// chainview.FilteredChainView.
61
var _ FilteredChainView = (*BtcdFilteredChainView)(nil)
62

63
// NewBtcdFilteredChainView creates a new instance of a FilteredChainView from
64
// RPC credentials for an active btcd instance.
65
func NewBtcdFilteredChainView(config rpcclient.ConnConfig,
66
        blockCache *blockcache.BlockCache) (*BtcdFilteredChainView, error) {
1✔
67

1✔
68
        chainView := &BtcdFilteredChainView{
1✔
69
                chainFilter:     make(map[wire.OutPoint]struct{}),
1✔
70
                filterUpdates:   make(chan filterUpdate),
1✔
71
                filterBlockReqs: make(chan *filterBlockReq),
1✔
72
                blockCache:      blockCache,
1✔
73
                quit:            make(chan struct{}),
1✔
74
        }
1✔
75

1✔
76
        ntfnCallbacks := &rpcclient.NotificationHandlers{
1✔
77
                OnFilteredBlockConnected:    chainView.onFilteredBlockConnected,
1✔
78
                OnFilteredBlockDisconnected: chainView.onFilteredBlockDisconnected,
1✔
79
        }
1✔
80

1✔
81
        // Disable connecting to btcd within the rpcclient.New method. We
1✔
82
        // defer establishing the connection to our .Start() method.
1✔
83
        config.DisableConnectOnNew = true
1✔
84
        config.DisableAutoReconnect = false
1✔
85
        chainConn, err := rpcclient.New(&config, ntfnCallbacks)
1✔
86
        if err != nil {
1✔
87
                return nil, err
×
88
        }
×
89
        chainView.btcdConn = chainConn
1✔
90

1✔
91
        chainView.blockQueue = newBlockEventQueue()
1✔
92

1✔
93
        return chainView, nil
1✔
94
}
95

96
// Start starts all goroutines necessary for normal operation.
97
//
98
// NOTE: This is part of the FilteredChainView interface.
99
func (b *BtcdFilteredChainView) Start() error {
1✔
100
        // Already started?
1✔
101
        if atomic.AddInt32(&b.started, 1) != 1 {
1✔
102
                return nil
×
103
        }
×
104

105
        log.Infof("FilteredChainView starting")
1✔
106

1✔
107
        // Connect to btcd, and register for notifications on connected, and
1✔
108
        // disconnected blocks.
1✔
109
        if err := b.btcdConn.Connect(20); err != nil {
1✔
110
                return err
×
111
        }
×
112
        if err := b.btcdConn.NotifyBlocks(); err != nil {
1✔
113
                return err
×
114
        }
×
115

116
        _, bestHeight, err := b.btcdConn.GetBestBlock()
1✔
117
        if err != nil {
1✔
118
                return err
×
119
        }
×
120

121
        b.bestHeightMtx.Lock()
1✔
122
        b.bestHeight = uint32(bestHeight)
1✔
123
        b.bestHeightMtx.Unlock()
1✔
124

1✔
125
        b.blockQueue.Start()
1✔
126

1✔
127
        b.wg.Add(1)
1✔
128
        go b.chainFilterer()
1✔
129

1✔
130
        return nil
1✔
131
}
132

133
// Stop stops all goroutines which we launched by the prior call to the Start
134
// method.
135
//
136
// NOTE: This is part of the FilteredChainView interface.
137
func (b *BtcdFilteredChainView) Stop() error {
1✔
138
        log.Debug("BtcdFilteredChainView stopping")
1✔
139
        defer log.Debug("BtcdFilteredChainView stopped")
1✔
140

1✔
141
        // Already shutting down?
1✔
142
        if atomic.AddInt32(&b.stopped, 1) != 1 {
1✔
143
                return nil
×
144
        }
×
145

146
        // Shutdown the rpc client, this gracefully disconnects from btcd, and
147
        // cleans up all related resources.
148
        b.btcdConn.Shutdown()
1✔
149
        b.btcdConn.WaitForShutdown()
1✔
150

1✔
151
        b.blockQueue.Stop()
1✔
152

1✔
153
        close(b.quit)
1✔
154
        b.wg.Wait()
1✔
155

1✔
156
        return nil
1✔
157
}
158

159
// onFilteredBlockConnected is called for each block that's connected to the
160
// end of the main chain. Based on our current chain filter, the block may or
161
// may not include any relevant transactions.
162
func (b *BtcdFilteredChainView) onFilteredBlockConnected(height int32,
163
        header *wire.BlockHeader, txns []*btcutil.Tx) {
1✔
164

1✔
165
        mtxs := make([]*wire.MsgTx, len(txns))
1✔
166
        b.filterMtx.Lock()
1✔
167
        for i, tx := range txns {
2✔
168
                mtx := tx.MsgTx()
1✔
169
                mtxs[i] = mtx
1✔
170

1✔
171
                for _, txIn := range mtx.TxIn {
2✔
172
                        // We can delete this outpoint from the chainFilter, as
1✔
173
                        // we just received a block where it was spent. In case
1✔
174
                        // of a reorg, this outpoint might get "un-spent", but
1✔
175
                        // that's okay since it would never be wise to consider
1✔
176
                        // the channel open again (since a spending transaction
1✔
177
                        // exists on the network).
1✔
178
                        delete(b.chainFilter, txIn.PreviousOutPoint)
1✔
179
                }
1✔
180

181
        }
182
        b.filterMtx.Unlock()
1✔
183

1✔
184
        // We record the height of the last connected block added to the
1✔
185
        // blockQueue such that we can scan up to this height in case of
1✔
186
        // a rescan. It must be protected by a mutex since a filter update
1✔
187
        // might be trying to read it concurrently.
1✔
188
        b.bestHeightMtx.Lock()
1✔
189
        b.bestHeight = uint32(height)
1✔
190
        b.bestHeightMtx.Unlock()
1✔
191

1✔
192
        block := &FilteredBlock{
1✔
193
                Hash:         header.BlockHash(),
1✔
194
                Height:       uint32(height),
1✔
195
                Transactions: mtxs,
1✔
196
        }
1✔
197

1✔
198
        b.blockQueue.Add(&blockEvent{
1✔
199
                eventType: connected,
1✔
200
                block:     block,
1✔
201
        })
1✔
202
}
203

204
// onFilteredBlockDisconnected is a callback which is executed once a block is
205
// disconnected from the end of the main chain.
206
func (b *BtcdFilteredChainView) onFilteredBlockDisconnected(height int32,
207
        header *wire.BlockHeader) {
1✔
208

1✔
209
        log.Debugf("got disconnected block at height %d: %v", height,
1✔
210
                header.BlockHash())
1✔
211

1✔
212
        filteredBlock := &FilteredBlock{
1✔
213
                Hash:   header.BlockHash(),
1✔
214
                Height: uint32(height),
1✔
215
        }
1✔
216

1✔
217
        b.blockQueue.Add(&blockEvent{
1✔
218
                eventType: disconnected,
1✔
219
                block:     filteredBlock,
1✔
220
        })
1✔
221
}
1✔
222

223
// filterBlockReq houses a request to manually filter a block specified by
224
// block hash.
225
type filterBlockReq struct {
226
        blockHash *chainhash.Hash
227
        resp      chan *FilteredBlock
228
        err       chan error
229
}
230

231
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
232
// result of applying the current registered UTXO sub-set on the block
233
// corresponding to that block hash. If any watched UTOX's are spent by the
234
// selected lock, then the internal chainFilter will also be updated.
235
//
236
// NOTE: This is part of the FilteredChainView interface.
237
func (b *BtcdFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
1✔
238
        req := &filterBlockReq{
1✔
239
                blockHash: blockHash,
1✔
240
                resp:      make(chan *FilteredBlock, 1),
1✔
241
                err:       make(chan error, 1),
1✔
242
        }
1✔
243

1✔
244
        select {
1✔
245
        case b.filterBlockReqs <- req:
1✔
246
        case <-b.quit:
×
247
                return nil, fmt.Errorf("FilteredChainView shutting down")
×
248
        }
249

250
        return <-req.resp, <-req.err
1✔
251
}
252

253
// chainFilterer is the primary goroutine which: listens for new blocks coming
254
// and dispatches the relevant FilteredBlock notifications, updates the filter
255
// due to requests by callers, and finally is able to preform targeted block
256
// filtration.
257
//
258
// TODO(roasbeef): change to use loadfilter RPC's
259
func (b *BtcdFilteredChainView) chainFilterer() {
1✔
260
        defer b.wg.Done()
1✔
261

1✔
262
        // filterBlock is a helper function that scans the given block, and
1✔
263
        // notes which transactions spend outputs which are currently being
1✔
264
        // watched. Additionally, the chain filter will also be updated by
1✔
265
        // removing any spent outputs.
1✔
266
        filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
2✔
267
                b.filterMtx.Lock()
1✔
268
                defer b.filterMtx.Unlock()
1✔
269

1✔
270
                var filteredTxns []*wire.MsgTx
1✔
271
                for _, tx := range blk.Transactions {
2✔
272
                        var txAlreadyFiltered bool
1✔
273
                        for _, txIn := range tx.TxIn {
2✔
274
                                prevOp := txIn.PreviousOutPoint
1✔
275
                                if _, ok := b.chainFilter[prevOp]; !ok {
2✔
276
                                        continue
1✔
277
                                }
278

279
                                delete(b.chainFilter, prevOp)
1✔
280

1✔
281
                                // Only add this txn to our list of filtered
1✔
282
                                // txns if it is the first previous outpoint to
1✔
283
                                // cause a match.
1✔
284
                                if txAlreadyFiltered {
1✔
285
                                        continue
×
286
                                }
287

288
                                filteredTxns = append(filteredTxns, tx.Copy())
1✔
289
                                txAlreadyFiltered = true
1✔
290

291
                        }
292
                }
293

294
                return filteredTxns
1✔
295
        }
296

297
        decodeJSONBlock := func(block *btcjson.RescannedBlock,
1✔
298
                height uint32) (*FilteredBlock, error) {
1✔
UNCOV
299
                hash, err := chainhash.NewHashFromStr(block.Hash)
×
UNCOV
300
                if err != nil {
×
301
                        return nil, err
×
302

×
303
                }
×
UNCOV
304
                txs := make([]*wire.MsgTx, 0, len(block.Transactions))
×
UNCOV
305
                for _, str := range block.Transactions {
×
UNCOV
306
                        b, err := hex.DecodeString(str)
×
UNCOV
307
                        if err != nil {
×
308
                                return nil, err
×
309
                        }
×
UNCOV
310
                        tx := &wire.MsgTx{}
×
UNCOV
311
                        err = tx.Deserialize(bytes.NewReader(b))
×
UNCOV
312
                        if err != nil {
×
313
                                return nil, err
×
314
                        }
×
UNCOV
315
                        txs = append(txs, tx)
×
316
                }
UNCOV
317
                return &FilteredBlock{
×
UNCOV
318
                        Hash:         *hash,
×
UNCOV
319
                        Height:       height,
×
UNCOV
320
                        Transactions: txs,
×
UNCOV
321
                }, nil
×
322
        }
323

324
        for {
2✔
325
                select {
1✔
326
                // The caller has just sent an update to the current chain
327
                // filter, so we'll apply the update, possibly rewinding our
328
                // state partially.
329
                case update := <-b.filterUpdates:
1✔
330

1✔
331
                        // First, we'll add all the new UTXO's to the set of
1✔
332
                        // watched UTXO's, eliminating any duplicates in the
1✔
333
                        // process.
1✔
334
                        log.Tracef("Updating chain filter with new UTXO's: %v",
1✔
335
                                update.newUtxos)
1✔
336

1✔
337
                        b.filterMtx.Lock()
1✔
338
                        for _, newOp := range update.newUtxos {
2✔
339
                                b.chainFilter[newOp] = struct{}{}
1✔
340
                        }
1✔
341
                        b.filterMtx.Unlock()
1✔
342

1✔
343
                        // Apply the new TX filter to btcd, which will cause
1✔
344
                        // all following notifications from and calls to it
1✔
345
                        // return blocks filtered with the new filter.
1✔
346
                        b.btcdConn.LoadTxFilter(false, []btcutil.Address{},
1✔
347
                                update.newUtxos)
1✔
348

1✔
349
                        // All blocks gotten after we loaded the filter will
1✔
350
                        // have the filter applied, but we will need to rescan
1✔
351
                        // the blocks up to the height of the block we last
1✔
352
                        // added to the blockQueue.
1✔
353
                        b.bestHeightMtx.Lock()
1✔
354
                        bestHeight := b.bestHeight
1✔
355
                        b.bestHeightMtx.Unlock()
1✔
356

1✔
357
                        // If the update height matches our best known height,
1✔
358
                        // then we don't need to do any rewinding.
1✔
359
                        if update.updateHeight == bestHeight {
2✔
360
                                continue
1✔
361
                        }
362

363
                        // Otherwise, we'll rewind the state to ensure the
364
                        // caller doesn't miss any relevant notifications.
365
                        // Starting from the height _after_ the update height,
366
                        // we'll walk forwards, rescanning one block at a time
367
                        // with btcd applying the newly loaded filter to each
368
                        // block.
UNCOV
369
                        for i := update.updateHeight + 1; i < bestHeight+1; i++ {
×
UNCOV
370
                                blockHash, err := b.btcdConn.GetBlockHash(int64(i))
×
UNCOV
371
                                if err != nil {
×
372
                                        log.Warnf("Unable to get block hash "+
×
373
                                                "for block at height %d: %v",
×
374
                                                i, err)
×
375
                                        continue
×
376
                                }
377

378
                                // To avoid dealing with the case where a reorg
379
                                // is happening while we rescan, we scan one
380
                                // block at a time, skipping blocks that might
381
                                // have gone missing.
UNCOV
382
                                rescanned, err := b.btcdConn.RescanBlocks(
×
UNCOV
383
                                        []chainhash.Hash{*blockHash})
×
UNCOV
384
                                if err != nil {
×
385
                                        log.Warnf("Unable to rescan block "+
×
386
                                                "with hash %v at height %d: %v",
×
387
                                                blockHash, i, err)
×
388
                                        continue
×
389
                                }
390

391
                                // If no block was returned from the rescan, it
392
                                // means no matching transactions were found.
UNCOV
393
                                if len(rescanned) != 1 {
×
394
                                        log.Tracef("rescan of block %v at "+
×
395
                                                "height=%d yielded no "+
×
396
                                                "transactions", blockHash, i)
×
397
                                        continue
×
398
                                }
UNCOV
399
                                decoded, err := decodeJSONBlock(
×
UNCOV
400
                                        &rescanned[0], uint32(i))
×
UNCOV
401
                                if err != nil {
×
402
                                        log.Errorf("Unable to decode block: %v",
×
403
                                                err)
×
404
                                        continue
×
405
                                }
UNCOV
406
                                b.blockQueue.Add(&blockEvent{
×
UNCOV
407
                                        eventType: connected,
×
UNCOV
408
                                        block:     decoded,
×
UNCOV
409
                                })
×
410
                        }
411

412
                // We've received a new request to manually filter a block.
413
                case req := <-b.filterBlockReqs:
1✔
414
                        // First we'll fetch the block itself as well as some
1✔
415
                        // additional information including its height.
1✔
416
                        block, err := b.GetBlock(req.blockHash)
1✔
417
                        if err != nil {
1✔
418
                                req.err <- err
×
419
                                req.resp <- nil
×
420
                                continue
×
421
                        }
422
                        header, err := b.btcdConn.GetBlockHeaderVerbose(req.blockHash)
1✔
423
                        if err != nil {
1✔
424
                                req.err <- err
×
425
                                req.resp <- nil
×
426
                                continue
×
427
                        }
428

429
                        // Once we have this info, we can directly filter the
430
                        // block and dispatch the proper notification.
431
                        req.resp <- &FilteredBlock{
1✔
432
                                Hash:         *req.blockHash,
1✔
433
                                Height:       uint32(header.Height),
1✔
434
                                Transactions: filterBlock(block),
1✔
435
                        }
1✔
436
                        req.err <- err
1✔
437

438
                case <-b.quit:
1✔
439
                        return
1✔
440
                }
441
        }
442
}
443

444
// filterUpdate is a message sent to the chainFilterer to update the current
445
// chainFilter state.
446
type filterUpdate struct {
447
        newUtxos     []wire.OutPoint
448
        updateHeight uint32
449
}
450

451
// UpdateFilter updates the UTXO filter which is to be consulted when creating
452
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
453
// meaning repeated calls to this method should _expand_ the size of the UTXO
454
// sub-set currently being watched.  If the set updateHeight is _lower_ than
455
// the best known height of the implementation, then the state should be
456
// rewound to ensure all relevant notifications are dispatched.
457
//
458
// NOTE: This is part of the FilteredChainView interface.
459
func (b *BtcdFilteredChainView) UpdateFilter(ops []graphdb.EdgePoint,
460
        updateHeight uint32) error {
1✔
461

1✔
462
        newUtxos := make([]wire.OutPoint, len(ops))
1✔
463
        for i, op := range ops {
2✔
464
                newUtxos[i] = op.OutPoint
1✔
465
        }
1✔
466

467
        select {
1✔
468

469
        case b.filterUpdates <- filterUpdate{
470
                newUtxos:     newUtxos,
471
                updateHeight: updateHeight,
472
        }:
1✔
473
                return nil
1✔
474

475
        case <-b.quit:
×
476
                return fmt.Errorf("chain filter shutting down")
×
477
        }
478
}
479

480
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
481
// Each time a block is connected to the end of a main chain, and appropriate
482
// FilteredBlock which contains the transactions which mutate our watched UTXO
483
// set is to be returned.
484
//
485
// NOTE: This is part of the FilteredChainView interface.
486
func (b *BtcdFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
1✔
487
        return b.blockQueue.newBlocks
1✔
488
}
1✔
489

490
// DisconnectedBlocks returns a receive only channel which will be sent upon
491
// with the empty filtered blocks of blocks which are disconnected from the
492
// main chain in the case of a re-org.
493
//
494
// NOTE: This is part of the FilteredChainView interface.
495
func (b *BtcdFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
1✔
496
        return b.blockQueue.staleBlocks
1✔
497
}
1✔
498

499
// GetBlock is used to retrieve the block with the given hash. This function
500
// wraps the blockCache's GetBlock function.
501
func (b *BtcdFilteredChainView) GetBlock(hash *chainhash.Hash) (
502
        *wire.MsgBlock, error) {
1✔
503

1✔
504
        return b.blockCache.GetBlock(hash, b.btcdConn.GetBlock)
1✔
505
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc