• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.7
/routing/chainview/bitcoind.go
1
package chainview
2

3
import (
4
        "bytes"
5
        "encoding/hex"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/chain"
14
        "github.com/btcsuite/btcwallet/wtxmgr"
15
        "github.com/lightningnetwork/lnd/blockcache"
16
        graphdb "github.com/lightningnetwork/lnd/graph/db"
17
)
18

19
// BitcoindFilteredChainView is an implementation of the FilteredChainView
20
// interface which is backed by bitcoind.
21
type BitcoindFilteredChainView struct {
22
        started int32 // To be used atomically.
23
        stopped int32 // To be used atomically.
24

25
        // bestHeight is the height of the latest block added to the
26
        // blockQueue from the onFilteredBlockConnected method. It is used to
27
        // determine up to what height we would need to rescan in case
28
        // of a filter update.
29
        bestHeightMtx sync.Mutex
30
        bestHeight    uint32
31

32
        // TODO: Factor out common logic between bitcoind and btcd into a
33
        // NodeFilteredView interface.
34
        chainClient *chain.BitcoindClient
35

36
        // blockQueue is the ordered queue used to keep the order
37
        // of connected and disconnected blocks sent to the reader of the
38
        // chainView.
39
        blockQueue *blockEventQueue
40

41
        // blockCache is an LRU block cache.
42
        blockCache *blockcache.BlockCache
43

44
        // filterUpdates is a channel in which updates to the utxo filter
45
        // attached to this instance are sent over.
46
        filterUpdates chan filterUpdate
47

48
        // chainFilter is the set of utxo's that we're currently watching
49
        // spends for within the chain.
50
        filterMtx   sync.RWMutex
51
        chainFilter map[wire.OutPoint]struct{}
52

53
        // filterBlockReqs is a channel in which requests to filter select
54
        // blocks will be sent over.
55
        filterBlockReqs chan *filterBlockReq
56

57
        quit chan struct{}
58
        wg   sync.WaitGroup
59
}
60

61
// A compile time check to ensure BitcoindFilteredChainView implements the
62
// chainview.FilteredChainView.
63
var _ FilteredChainView = (*BitcoindFilteredChainView)(nil)
64

65
// NewBitcoindFilteredChainView creates a new instance of a FilteredChainView
66
// from RPC credentials and a ZMQ socket address for a bitcoind instance.
67
func NewBitcoindFilteredChainView(
68
        chainConn *chain.BitcoindConn,
69
        blockCache *blockcache.BlockCache) *BitcoindFilteredChainView {
1✔
70

1✔
71
        chainView := &BitcoindFilteredChainView{
1✔
72
                chainFilter:     make(map[wire.OutPoint]struct{}),
1✔
73
                filterUpdates:   make(chan filterUpdate),
1✔
74
                filterBlockReqs: make(chan *filterBlockReq),
1✔
75
                blockCache:      blockCache,
1✔
76
                quit:            make(chan struct{}),
1✔
77
        }
1✔
78

1✔
79
        chainView.chainClient = chainConn.NewBitcoindClient()
1✔
80
        chainView.blockQueue = newBlockEventQueue()
1✔
81

1✔
82
        return chainView
1✔
83
}
1✔
84

85
// Start starts all goroutines necessary for normal operation.
86
//
87
// NOTE: This is part of the FilteredChainView interface.
88
func (b *BitcoindFilteredChainView) Start() error {
1✔
89
        // Already started?
1✔
90
        if atomic.AddInt32(&b.started, 1) != 1 {
1✔
91
                return nil
×
92
        }
×
93

94
        log.Infof("FilteredChainView starting")
1✔
95

1✔
96
        err := b.chainClient.Start()
1✔
97
        if err != nil {
1✔
98
                return err
×
99
        }
×
100

101
        err = b.chainClient.NotifyBlocks()
1✔
102
        if err != nil {
1✔
103
                return err
×
104
        }
×
105

106
        _, bestHeight, err := b.chainClient.GetBestBlock()
1✔
107
        if err != nil {
1✔
108
                return err
×
109
        }
×
110

111
        b.bestHeightMtx.Lock()
1✔
112
        b.bestHeight = uint32(bestHeight)
1✔
113
        b.bestHeightMtx.Unlock()
1✔
114

1✔
115
        b.blockQueue.Start()
1✔
116

1✔
117
        b.wg.Add(1)
1✔
118
        go b.chainFilterer()
1✔
119

1✔
120
        return nil
1✔
121
}
122

123
// Stop stops all goroutines which were launched by the prior call to the Start
124
// method.
125
//
126
// NOTE: This is part of the FilteredChainView interface.
127
func (b *BitcoindFilteredChainView) Stop() error {
1✔
128
        log.Debug("BitcoindFilteredChainView stopping")
1✔
129
        defer log.Debug("BitcoindFilteredChainView stopped")
1✔
130

1✔
131
        // Already shutting down?
1✔
132
        if atomic.AddInt32(&b.stopped, 1) != 1 {
1✔
133
                return nil
×
134
        }
×
135

136
        // Shutdown the rpc client, this gracefully disconnects from bitcoind's
137
        // zmq socket, and cleans up all related resources.
138
        b.chainClient.Stop()
1✔
139
        b.chainClient.WaitForShutdown()
1✔
140

1✔
141
        b.blockQueue.Stop()
1✔
142

1✔
143
        close(b.quit)
1✔
144
        b.wg.Wait()
1✔
145

1✔
146
        return nil
1✔
147
}
148

149
// onFilteredBlockConnected is called for each block that's connected to the
150
// end of the main chain. Based on our current chain filter, the block may or
151
// may not include any relevant transactions.
152
func (b *BitcoindFilteredChainView) onFilteredBlockConnected(height int32,
153
        hash chainhash.Hash, txns []*wtxmgr.TxRecord) {
1✔
154

1✔
155
        mtxs := make([]*wire.MsgTx, len(txns))
1✔
156
        b.filterMtx.Lock()
1✔
157
        for i, tx := range txns {
2✔
158
                mtxs[i] = &tx.MsgTx
1✔
159

1✔
160
                for _, txIn := range mtxs[i].TxIn {
2✔
161
                        // We can delete this outpoint from the chainFilter, as
1✔
162
                        // we just received a block where it was spent. In case
1✔
163
                        // of a reorg, this outpoint might get "un-spent", but
1✔
164
                        // that's okay since it would never be wise to consider
1✔
165
                        // the channel open again (since a spending transaction
1✔
166
                        // exists on the network).
1✔
167
                        delete(b.chainFilter, txIn.PreviousOutPoint)
1✔
168
                }
1✔
169

170
        }
171
        b.filterMtx.Unlock()
1✔
172

1✔
173
        // We record the height of the last connected block added to the
1✔
174
        // blockQueue such that we can scan up to this height in case of
1✔
175
        // a rescan. It must be protected by a mutex since a filter update
1✔
176
        // might be trying to read it concurrently.
1✔
177
        b.bestHeightMtx.Lock()
1✔
178
        b.bestHeight = uint32(height)
1✔
179
        b.bestHeightMtx.Unlock()
1✔
180

1✔
181
        block := &FilteredBlock{
1✔
182
                Hash:         hash,
1✔
183
                Height:       uint32(height),
1✔
184
                Transactions: mtxs,
1✔
185
        }
1✔
186

1✔
187
        b.blockQueue.Add(&blockEvent{
1✔
188
                eventType: connected,
1✔
189
                block:     block,
1✔
190
        })
1✔
191
}
192

193
// onFilteredBlockDisconnected is a callback which is executed once a block is
194
// disconnected from the end of the main chain.
195
func (b *BitcoindFilteredChainView) onFilteredBlockDisconnected(height int32,
196
        hash chainhash.Hash) {
1✔
197

1✔
198
        log.Debugf("got disconnected block at height %d: %v", height,
1✔
199
                hash)
1✔
200

1✔
201
        filteredBlock := &FilteredBlock{
1✔
202
                Hash:   hash,
1✔
203
                Height: uint32(height),
1✔
204
        }
1✔
205

1✔
206
        b.blockQueue.Add(&blockEvent{
1✔
207
                eventType: disconnected,
1✔
208
                block:     filteredBlock,
1✔
209
        })
1✔
210
}
1✔
211

212
// FilterBlock takes a block hash, and returns a FilteredBlock which is the
213
// result of applying the current registered UTXO sub-set on the block
214
// corresponding to that block hash. If any watched UTXO's are spent by the
215
// selected block, then the internal chainFilter will also be updated.
216
//
217
// NOTE: This is part of the FilteredChainView interface.
218
func (b *BitcoindFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
1✔
219
        req := &filterBlockReq{
1✔
220
                blockHash: blockHash,
1✔
221
                resp:      make(chan *FilteredBlock, 1),
1✔
222
                err:       make(chan error, 1),
1✔
223
        }
1✔
224

1✔
225
        select {
1✔
226
        case b.filterBlockReqs <- req:
1✔
227
        case <-b.quit:
×
228
                return nil, fmt.Errorf("FilteredChainView shutting down")
×
229
        }
230

231
        return <-req.resp, <-req.err
1✔
232
}
233

234
// chainFilterer is the primary goroutine which: listens for new blocks coming
235
// and dispatches the relevant FilteredBlock notifications, updates the filter
236
// due to requests by callers, and finally is able to perform targeted block
237
// filtration.
238
//
239
// TODO(roasbeef): change to use loadfilter RPC's
240
func (b *BitcoindFilteredChainView) chainFilterer() {
1✔
241
        defer b.wg.Done()
1✔
242

1✔
243
        // filterBlock is a helper function that scans the given block, and
1✔
244
        // notes which transactions spend outputs which are currently being
1✔
245
        // watched. Additionally, the chain filter will also be updated by
1✔
246
        // removing any spent outputs.
1✔
247
        filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
2✔
248
                b.filterMtx.Lock()
1✔
249
                defer b.filterMtx.Unlock()
1✔
250

1✔
251
                var filteredTxns []*wire.MsgTx
1✔
252
                for _, tx := range blk.Transactions {
2✔
253
                        var txAlreadyFiltered bool
1✔
254
                        for _, txIn := range tx.TxIn {
2✔
255
                                prevOp := txIn.PreviousOutPoint
1✔
256
                                if _, ok := b.chainFilter[prevOp]; !ok {
2✔
257
                                        continue
1✔
258
                                }
259

260
                                delete(b.chainFilter, prevOp)
1✔
261

1✔
262
                                // Only add this txn to our list of filtered
1✔
263
                                // txns if it is the first previous outpoint to
1✔
264
                                // cause a match.
1✔
265
                                if txAlreadyFiltered {
1✔
266
                                        continue
×
267
                                }
268

269
                                filteredTxns = append(filteredTxns, tx.Copy())
1✔
270
                                txAlreadyFiltered = true
1✔
271
                        }
272
                }
273

274
                return filteredTxns
1✔
275
        }
276

277
        decodeJSONBlock := func(block *btcjson.RescannedBlock,
1✔
278
                height uint32) (*FilteredBlock, error) {
1✔
UNCOV
279
                hash, err := chainhash.NewHashFromStr(block.Hash)
×
UNCOV
280
                if err != nil {
×
281
                        return nil, err
×
282

×
283
                }
×
UNCOV
284
                txs := make([]*wire.MsgTx, 0, len(block.Transactions))
×
UNCOV
285
                for _, str := range block.Transactions {
×
UNCOV
286
                        b, err := hex.DecodeString(str)
×
UNCOV
287
                        if err != nil {
×
288
                                return nil, err
×
289
                        }
×
UNCOV
290
                        tx := &wire.MsgTx{}
×
UNCOV
291
                        err = tx.Deserialize(bytes.NewReader(b))
×
UNCOV
292
                        if err != nil {
×
293
                                return nil, err
×
294
                        }
×
UNCOV
295
                        txs = append(txs, tx)
×
296
                }
UNCOV
297
                return &FilteredBlock{
×
UNCOV
298
                        Hash:         *hash,
×
UNCOV
299
                        Height:       height,
×
UNCOV
300
                        Transactions: txs,
×
UNCOV
301
                }, nil
×
302
        }
303

304
        for {
2✔
305
                select {
1✔
306
                // The caller has just sent an update to the current chain
307
                // filter, so we'll apply the update, possibly rewinding our
308
                // state partially.
309
                case update := <-b.filterUpdates:
1✔
310
                        // First, we'll add all the new UTXO's to the set of
1✔
311
                        // watched UTXO's, eliminating any duplicates in the
1✔
312
                        // process.
1✔
313
                        log.Tracef("Updating chain filter with new UTXO's: %v",
1✔
314
                                update.newUtxos)
1✔
315

1✔
316
                        b.filterMtx.Lock()
1✔
317
                        for _, newOp := range update.newUtxos {
2✔
318
                                b.chainFilter[newOp] = struct{}{}
1✔
319
                        }
1✔
320
                        b.filterMtx.Unlock()
1✔
321

1✔
322
                        // Apply the new TX filter to the chain client, which
1✔
323
                        // will cause all following notifications from and
1✔
324
                        // calls to it to return blocks filtered with the new
1✔
325
                        // filter.
1✔
326
                        err := b.chainClient.LoadTxFilter(false, update.newUtxos)
1✔
327
                        if err != nil {
1✔
328
                                log.Errorf("Unable to update filter: %v", err)
×
329
                                continue
×
330
                        }
331

332
                        // All blocks gotten after we loaded the filter will
333
                        // have the filter applied, but we will need to rescan
334
                        // the blocks up to the height of the block we last
335
                        // added to the blockQueue.
336
                        b.bestHeightMtx.Lock()
1✔
337
                        bestHeight := b.bestHeight
1✔
338
                        b.bestHeightMtx.Unlock()
1✔
339

1✔
340
                        // If the update height matches our best known height,
1✔
341
                        // then we don't need to do any rewinding.
1✔
342
                        if update.updateHeight == bestHeight {
2✔
343
                                continue
1✔
344
                        }
345

346
                        // Otherwise, we'll rewind the state to ensure the
347
                        // caller doesn't miss any relevant notifications.
348
                        // Starting from the height _after_ the update height,
349
                        // we'll walk forwards, rescanning one block at a time
350
                        // with the chain client applying the newly loaded
351
                        // filter to each block.
UNCOV
352
                        for i := update.updateHeight + 1; i < bestHeight+1; i++ {
×
UNCOV
353
                                blockHash, err := b.chainClient.GetBlockHash(int64(i))
×
UNCOV
354
                                if err != nil {
×
355
                                        log.Warnf("Unable to get block hash "+
×
356
                                                "for block at height %d: %v",
×
357
                                                i, err)
×
358
                                        continue
×
359
                                }
360

361
                                // To avoid dealing with the case where a reorg
362
                                // is happening while we rescan, we scan one
363
                                // block at a time, skipping blocks that might
364
                                // have gone missing.
UNCOV
365
                                rescanned, err := b.chainClient.RescanBlocks(
×
UNCOV
366
                                        []chainhash.Hash{*blockHash},
×
UNCOV
367
                                )
×
UNCOV
368
                                if err != nil {
×
369
                                        log.Warnf("Unable to rescan block "+
×
370
                                                "with hash %v at height %d: %v",
×
371
                                                blockHash, i, err)
×
372
                                        continue
×
373
                                }
374

375
                                // If no block was returned from the rescan, it
376
                                // means no matching transactions were found.
UNCOV
377
                                if len(rescanned) != 1 {
×
378
                                        log.Tracef("rescan of block %v at "+
×
379
                                                "height=%d yielded no "+
×
380
                                                "transactions", blockHash, i)
×
381
                                        continue
×
382
                                }
UNCOV
383
                                decoded, err := decodeJSONBlock(
×
UNCOV
384
                                        &rescanned[0], i,
×
UNCOV
385
                                )
×
UNCOV
386
                                if err != nil {
×
387
                                        log.Errorf("Unable to decode block: %v",
×
388
                                                err)
×
389
                                        continue
×
390
                                }
UNCOV
391
                                b.blockQueue.Add(&blockEvent{
×
UNCOV
392
                                        eventType: connected,
×
UNCOV
393
                                        block:     decoded,
×
UNCOV
394
                                })
×
395
                        }
396

397
                // We've received a new request to manually filter a block.
398
                case req := <-b.filterBlockReqs:
1✔
399
                        // First we'll fetch the block itself as well as some
1✔
400
                        // additional information including its height.
1✔
401
                        block, err := b.GetBlock(req.blockHash)
1✔
402
                        if err != nil {
1✔
403
                                req.err <- err
×
404
                                req.resp <- nil
×
405
                                continue
×
406
                        }
407
                        header, err := b.chainClient.GetBlockHeaderVerbose(
1✔
408
                                req.blockHash)
1✔
409
                        if err != nil {
1✔
410
                                req.err <- err
×
411
                                req.resp <- nil
×
412
                                continue
×
413
                        }
414

415
                        // Once we have this info, we can directly filter the
416
                        // block and dispatch the proper notification.
417
                        req.resp <- &FilteredBlock{
1✔
418
                                Hash:         *req.blockHash,
1✔
419
                                Height:       uint32(header.Height),
1✔
420
                                Transactions: filterBlock(block),
1✔
421
                        }
1✔
422
                        req.err <- err
1✔
423

424
                // We've received a new event from the chain client.
425
                case event := <-b.chainClient.Notifications():
1✔
426
                        switch e := event.(type) {
1✔
427

428
                        case chain.FilteredBlockConnected:
1✔
429
                                b.onFilteredBlockConnected(
1✔
430
                                        e.Block.Height, e.Block.Hash, e.RelevantTxs,
1✔
431
                                )
1✔
432

433
                        case chain.BlockDisconnected:
1✔
434
                                b.onFilteredBlockDisconnected(e.Height, e.Hash)
1✔
435
                        }
436

437
                case <-b.quit:
1✔
438
                        return
1✔
439
                }
440
        }
441
}
442

443
// UpdateFilter updates the UTXO filter which is to be consulted when creating
444
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
445
// meaning repeated calls to this method should _expand_ the size of the UTXO
446
// sub-set currently being watched.  If the set updateHeight is _lower_ than
447
// the best known height of the implementation, then the state should be
448
// rewound to ensure all relevant notifications are dispatched.
449
//
450
// NOTE: This is part of the FilteredChainView interface.
451
func (b *BitcoindFilteredChainView) UpdateFilter(ops []graphdb.EdgePoint,
452
        updateHeight uint32) error {
1✔
453

1✔
454
        newUtxos := make([]wire.OutPoint, len(ops))
1✔
455
        for i, op := range ops {
2✔
456
                newUtxos[i] = op.OutPoint
1✔
457
        }
1✔
458

459
        select {
1✔
460

461
        case b.filterUpdates <- filterUpdate{
462
                newUtxos:     newUtxos,
463
                updateHeight: updateHeight,
464
        }:
1✔
465
                return nil
1✔
466

467
        case <-b.quit:
×
468
                return fmt.Errorf("chain filter shutting down")
×
469
        }
470
}
471

472
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
473
// Each time a block is connected to the end of a main chain, an appropriate
474
// FilteredBlock which contains the transactions which mutate our watched UTXO
475
// set is to be returned.
476
//
477
// NOTE: This is part of the FilteredChainView interface.
478
func (b *BitcoindFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
1✔
479
        return b.blockQueue.newBlocks
1✔
480
}
1✔
481

482
// DisconnectedBlocks returns a receive-only channel over which the empty
483
// filtered blocks for blocks that are disconnected from the main chain are
484
// sent in the case of a re-org.
485
//
486
// NOTE: This is part of the FilteredChainView interface.
487
func (b *BitcoindFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
1✔
488
        return b.blockQueue.staleBlocks
1✔
489
}
1✔
490

491
// GetBlock is used to retrieve the block with the given hash. This function
492
// wraps the blockCache's GetBlock function.
493
func (b *BitcoindFilteredChainView) GetBlock(hash *chainhash.Hash) (
494
        *wire.MsgBlock, error) {
1✔
495

1✔
496
        return b.blockCache.GetBlock(hash, b.chainClient.GetBlock)
1✔
497
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc