• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14446551845

14 Apr 2025 01:12PM UTC coverage: 57.404% (-1.2%) from 58.624%
14446551845

push

github

web-flow
Merge pull request #9703 from yyforyongyu/fix-attempt-hash

Patch htlc attempt hash for legacy payments

12 of 26 new or added lines in 1 file covered. (46.15%)

2039 existing lines in 44 files now uncovered.

95138 of 165734 relevant lines covered (57.4%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/routing/chainview/bitcoind.go
1
package chainview
2

3
import (
4
        "bytes"
5
        "encoding/hex"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/chain"
14
        "github.com/btcsuite/btcwallet/wtxmgr"
15
        "github.com/lightningnetwork/lnd/blockcache"
16
        graphdb "github.com/lightningnetwork/lnd/graph/db"
17
)
18

19
// BitcoindFilteredChainView is an implementation of the FilteredChainView
// interface which is backed by bitcoind.
type BitcoindFilteredChainView struct {
	started int32 // To be used atomically.
	stopped int32 // To be used atomically.

	// bestHeight is the height of the latest block added to the
	// blockQueue by onFilteredBlockConnected. It is used to determine
	// up to what height we would need to rescan in case of a filter
	// update. Access is guarded by bestHeightMtx.
	bestHeightMtx sync.Mutex
	bestHeight    uint32

	// TODO: Factor out common logic between bitcoind and btcd into a
	// NodeFilteredView interface.
	chainClient *chain.BitcoindClient

	// blockQueue is the ordered queue used to keep the order of
	// connected and disconnected blocks sent to the reader of the
	// chainView.
	blockQueue *blockEventQueue

	// blockCache is an LRU block cache.
	blockCache *blockcache.BlockCache

	// filterUpdates is a channel in which updates to the utxo filter
	// attached to this instance are sent over.
	filterUpdates chan filterUpdate

	// chainFilter is the set of UTXOs that we're currently watching
	// spends for within the chain. Access is guarded by filterMtx.
	filterMtx   sync.RWMutex
	chainFilter map[wire.OutPoint]struct{}

	// filterBlockReqs is a channel in which requests to filter select
	// blocks will be sent over.
	filterBlockReqs chan *filterBlockReq

	// quit is closed by Stop to signal shutdown, and wg tracks the
	// chainFilterer goroutine launched by Start.
	quit chan struct{}
	wg   sync.WaitGroup
}
60

61
// A compile time check to ensure BitcoindFilteredChainView implements the
// chainview.FilteredChainView interface.
var _ FilteredChainView = (*BitcoindFilteredChainView)(nil)
64

65
// NewBitcoindFilteredChainView creates a new instance of a FilteredChainView
66
// from RPC credentials and a ZMQ socket address for a bitcoind instance.
67
func NewBitcoindFilteredChainView(
68
        chainConn *chain.BitcoindConn,
UNCOV
69
        blockCache *blockcache.BlockCache) *BitcoindFilteredChainView {
×
UNCOV
70

×
UNCOV
71
        chainView := &BitcoindFilteredChainView{
×
UNCOV
72
                chainFilter:     make(map[wire.OutPoint]struct{}),
×
UNCOV
73
                filterUpdates:   make(chan filterUpdate),
×
UNCOV
74
                filterBlockReqs: make(chan *filterBlockReq),
×
UNCOV
75
                blockCache:      blockCache,
×
UNCOV
76
                quit:            make(chan struct{}),
×
UNCOV
77
        }
×
UNCOV
78

×
UNCOV
79
        chainView.chainClient = chainConn.NewBitcoindClient()
×
UNCOV
80
        chainView.blockQueue = newBlockEventQueue()
×
UNCOV
81

×
UNCOV
82
        return chainView
×
UNCOV
83
}
×
84

85
// Start starts all goroutines necessary for normal operation.
86
//
87
// NOTE: This is part of the FilteredChainView interface.
UNCOV
88
func (b *BitcoindFilteredChainView) Start() error {
×
UNCOV
89
        // Already started?
×
UNCOV
90
        if atomic.AddInt32(&b.started, 1) != 1 {
×
91
                return nil
×
92
        }
×
93

UNCOV
94
        log.Infof("FilteredChainView starting")
×
UNCOV
95

×
UNCOV
96
        err := b.chainClient.Start()
×
UNCOV
97
        if err != nil {
×
98
                return err
×
99
        }
×
100

UNCOV
101
        err = b.chainClient.NotifyBlocks()
×
UNCOV
102
        if err != nil {
×
103
                return err
×
104
        }
×
105

UNCOV
106
        _, bestHeight, err := b.chainClient.GetBestBlock()
×
UNCOV
107
        if err != nil {
×
108
                return err
×
109
        }
×
110

UNCOV
111
        b.bestHeightMtx.Lock()
×
UNCOV
112
        b.bestHeight = uint32(bestHeight)
×
UNCOV
113
        b.bestHeightMtx.Unlock()
×
UNCOV
114

×
UNCOV
115
        b.blockQueue.Start()
×
UNCOV
116

×
UNCOV
117
        b.wg.Add(1)
×
UNCOV
118
        go b.chainFilterer()
×
UNCOV
119

×
UNCOV
120
        return nil
×
121
}
122

123
// Stop stops all goroutines which we launched by the prior call to the Start
124
// method.
125
//
126
// NOTE: This is part of the FilteredChainView interface.
UNCOV
127
func (b *BitcoindFilteredChainView) Stop() error {
×
UNCOV
128
        log.Debug("BitcoindFilteredChainView stopping")
×
UNCOV
129
        defer log.Debug("BitcoindFilteredChainView stopped")
×
UNCOV
130

×
UNCOV
131
        // Already shutting down?
×
UNCOV
132
        if atomic.AddInt32(&b.stopped, 1) != 1 {
×
133
                return nil
×
134
        }
×
135

136
        // Shutdown the rpc client, this gracefully disconnects from bitcoind's
137
        // zmq socket, and cleans up all related resources.
UNCOV
138
        b.chainClient.Stop()
×
UNCOV
139
        b.chainClient.WaitForShutdown()
×
UNCOV
140

×
UNCOV
141
        b.blockQueue.Stop()
×
UNCOV
142

×
UNCOV
143
        close(b.quit)
×
UNCOV
144
        b.wg.Wait()
×
UNCOV
145

×
UNCOV
146
        return nil
×
147
}
148

149
// onFilteredBlockConnected is called for each block that's connected to the
150
// end of the main chain. Based on our current chain filter, the block may or
151
// may not include any relevant transactions.
152
func (b *BitcoindFilteredChainView) onFilteredBlockConnected(height int32,
UNCOV
153
        hash chainhash.Hash, txns []*wtxmgr.TxRecord) {
×
UNCOV
154

×
UNCOV
155
        mtxs := make([]*wire.MsgTx, len(txns))
×
UNCOV
156
        b.filterMtx.Lock()
×
UNCOV
157
        for i, tx := range txns {
×
UNCOV
158
                mtxs[i] = &tx.MsgTx
×
UNCOV
159

×
UNCOV
160
                for _, txIn := range mtxs[i].TxIn {
×
UNCOV
161
                        // We can delete this outpoint from the chainFilter, as
×
UNCOV
162
                        // we just received a block where it was spent. In case
×
UNCOV
163
                        // of a reorg, this outpoint might get "un-spent", but
×
UNCOV
164
                        // that's okay since it would never be wise to consider
×
UNCOV
165
                        // the channel open again (since a spending transaction
×
UNCOV
166
                        // exists on the network).
×
UNCOV
167
                        delete(b.chainFilter, txIn.PreviousOutPoint)
×
UNCOV
168
                }
×
169

170
        }
UNCOV
171
        b.filterMtx.Unlock()
×
UNCOV
172

×
UNCOV
173
        // We record the height of the last connected block added to the
×
UNCOV
174
        // blockQueue such that we can scan up to this height in case of
×
UNCOV
175
        // a rescan. It must be protected by a mutex since a filter update
×
UNCOV
176
        // might be trying to read it concurrently.
×
UNCOV
177
        b.bestHeightMtx.Lock()
×
UNCOV
178
        b.bestHeight = uint32(height)
×
UNCOV
179
        b.bestHeightMtx.Unlock()
×
UNCOV
180

×
UNCOV
181
        block := &FilteredBlock{
×
UNCOV
182
                Hash:         hash,
×
UNCOV
183
                Height:       uint32(height),
×
UNCOV
184
                Transactions: mtxs,
×
UNCOV
185
        }
×
UNCOV
186

×
UNCOV
187
        b.blockQueue.Add(&blockEvent{
×
UNCOV
188
                eventType: connected,
×
UNCOV
189
                block:     block,
×
UNCOV
190
        })
×
191
}
192

193
// onFilteredBlockDisconnected is a callback which is executed once a block is
194
// disconnected from the end of the main chain.
195
func (b *BitcoindFilteredChainView) onFilteredBlockDisconnected(height int32,
UNCOV
196
        hash chainhash.Hash) {
×
UNCOV
197

×
UNCOV
198
        log.Debugf("got disconnected block at height %d: %v", height,
×
UNCOV
199
                hash)
×
UNCOV
200

×
UNCOV
201
        filteredBlock := &FilteredBlock{
×
UNCOV
202
                Hash:   hash,
×
UNCOV
203
                Height: uint32(height),
×
UNCOV
204
        }
×
UNCOV
205

×
UNCOV
206
        b.blockQueue.Add(&blockEvent{
×
UNCOV
207
                eventType: disconnected,
×
UNCOV
208
                block:     filteredBlock,
×
UNCOV
209
        })
×
UNCOV
210
}
×
211

212
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
213
// result of applying the current registered UTXO sub-set on the block
214
// corresponding to that block hash. If any watched UTOX's are spent by the
215
// selected lock, then the internal chainFilter will also be updated.
216
//
217
// NOTE: This is part of the FilteredChainView interface.
UNCOV
218
func (b *BitcoindFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
×
UNCOV
219
        req := &filterBlockReq{
×
UNCOV
220
                blockHash: blockHash,
×
UNCOV
221
                resp:      make(chan *FilteredBlock, 1),
×
UNCOV
222
                err:       make(chan error, 1),
×
UNCOV
223
        }
×
UNCOV
224

×
UNCOV
225
        select {
×
UNCOV
226
        case b.filterBlockReqs <- req:
×
227
        case <-b.quit:
×
228
                return nil, fmt.Errorf("FilteredChainView shutting down")
×
229
        }
230

UNCOV
231
        return <-req.resp, <-req.err
×
232
}
233

234
// chainFilterer is the primary goroutine which: listens for new blocks coming
235
// and dispatches the relevant FilteredBlock notifications, updates the filter
236
// due to requests by callers, and finally is able to perform targeted block
237
// filtration.
238
//
239
// TODO(roasbeef): change to use loadfilter RPC's
UNCOV
240
func (b *BitcoindFilteredChainView) chainFilterer() {
×
UNCOV
241
        defer b.wg.Done()
×
UNCOV
242

×
UNCOV
243
        // filterBlock is a helper function that scans the given block, and
×
UNCOV
244
        // notes which transactions spend outputs which are currently being
×
UNCOV
245
        // watched. Additionally, the chain filter will also be updated by
×
UNCOV
246
        // removing any spent outputs.
×
UNCOV
247
        filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
×
UNCOV
248
                b.filterMtx.Lock()
×
UNCOV
249
                defer b.filterMtx.Unlock()
×
UNCOV
250

×
UNCOV
251
                var filteredTxns []*wire.MsgTx
×
UNCOV
252
                for _, tx := range blk.Transactions {
×
UNCOV
253
                        var txAlreadyFiltered bool
×
UNCOV
254
                        for _, txIn := range tx.TxIn {
×
UNCOV
255
                                prevOp := txIn.PreviousOutPoint
×
UNCOV
256
                                if _, ok := b.chainFilter[prevOp]; !ok {
×
UNCOV
257
                                        continue
×
258
                                }
259

UNCOV
260
                                delete(b.chainFilter, prevOp)
×
UNCOV
261

×
UNCOV
262
                                // Only add this txn to our list of filtered
×
UNCOV
263
                                // txns if it is the first previous outpoint to
×
UNCOV
264
                                // cause a match.
×
UNCOV
265
                                if txAlreadyFiltered {
×
266
                                        continue
×
267
                                }
268

UNCOV
269
                                filteredTxns = append(filteredTxns, tx.Copy())
×
UNCOV
270
                                txAlreadyFiltered = true
×
271
                        }
272
                }
273

UNCOV
274
                return filteredTxns
×
275
        }
276

UNCOV
277
        decodeJSONBlock := func(block *btcjson.RescannedBlock,
×
UNCOV
278
                height uint32) (*FilteredBlock, error) {
×
279
                hash, err := chainhash.NewHashFromStr(block.Hash)
×
280
                if err != nil {
×
281
                        return nil, err
×
282

×
283
                }
×
284
                txs := make([]*wire.MsgTx, 0, len(block.Transactions))
×
285
                for _, str := range block.Transactions {
×
286
                        b, err := hex.DecodeString(str)
×
287
                        if err != nil {
×
288
                                return nil, err
×
289
                        }
×
290
                        tx := &wire.MsgTx{}
×
291
                        err = tx.Deserialize(bytes.NewReader(b))
×
292
                        if err != nil {
×
293
                                return nil, err
×
294
                        }
×
295
                        txs = append(txs, tx)
×
296
                }
297
                return &FilteredBlock{
×
298
                        Hash:         *hash,
×
299
                        Height:       height,
×
300
                        Transactions: txs,
×
301
                }, nil
×
302
        }
303

UNCOV
304
        for {
×
UNCOV
305
                select {
×
306
                // The caller has just sent an update to the current chain
307
                // filter, so we'll apply the update, possibly rewinding our
308
                // state partially.
UNCOV
309
                case update := <-b.filterUpdates:
×
UNCOV
310
                        // First, we'll add all the new UTXO's to the set of
×
UNCOV
311
                        // watched UTXO's, eliminating any duplicates in the
×
UNCOV
312
                        // process.
×
UNCOV
313
                        log.Tracef("Updating chain filter with new UTXO's: %v",
×
UNCOV
314
                                update.newUtxos)
×
UNCOV
315

×
UNCOV
316
                        b.filterMtx.Lock()
×
UNCOV
317
                        for _, newOp := range update.newUtxos {
×
UNCOV
318
                                b.chainFilter[newOp] = struct{}{}
×
UNCOV
319
                        }
×
UNCOV
320
                        b.filterMtx.Unlock()
×
UNCOV
321

×
UNCOV
322
                        // Apply the new TX filter to the chain client, which
×
UNCOV
323
                        // will cause all following notifications from and
×
UNCOV
324
                        // calls to it return blocks filtered with the new
×
UNCOV
325
                        // filter.
×
UNCOV
326
                        err := b.chainClient.LoadTxFilter(false, update.newUtxos)
×
UNCOV
327
                        if err != nil {
×
328
                                log.Errorf("Unable to update filter: %v", err)
×
329
                                continue
×
330
                        }
331

332
                        // All blocks gotten after we loaded the filter will
333
                        // have the filter applied, but we will need to rescan
334
                        // the blocks up to the height of the block we last
335
                        // added to the blockQueue.
UNCOV
336
                        b.bestHeightMtx.Lock()
×
UNCOV
337
                        bestHeight := b.bestHeight
×
UNCOV
338
                        b.bestHeightMtx.Unlock()
×
UNCOV
339

×
UNCOV
340
                        // If the update height matches our best known height,
×
UNCOV
341
                        // then we don't need to do any rewinding.
×
UNCOV
342
                        if update.updateHeight == bestHeight {
×
UNCOV
343
                                continue
×
344
                        }
345

346
                        // Otherwise, we'll rewind the state to ensure the
347
                        // caller doesn't miss any relevant notifications.
348
                        // Starting from the height _after_ the update height,
349
                        // we'll walk forwards, rescanning one block at a time
350
                        // with the chain client applying the newly loaded
351
                        // filter to each block.
352
                        for i := update.updateHeight + 1; i < bestHeight+1; i++ {
×
353
                                blockHash, err := b.chainClient.GetBlockHash(int64(i))
×
354
                                if err != nil {
×
355
                                        log.Warnf("Unable to get block hash "+
×
356
                                                "for block at height %d: %v",
×
357
                                                i, err)
×
358
                                        continue
×
359
                                }
360

361
                                // To avoid dealing with the case where a reorg
362
                                // is happening while we rescan, we scan one
363
                                // block at a time, skipping blocks that might
364
                                // have gone missing.
365
                                rescanned, err := b.chainClient.RescanBlocks(
×
366
                                        []chainhash.Hash{*blockHash},
×
367
                                )
×
368
                                if err != nil {
×
369
                                        log.Warnf("Unable to rescan block "+
×
370
                                                "with hash %v at height %d: %v",
×
371
                                                blockHash, i, err)
×
372
                                        continue
×
373
                                }
374

375
                                // If no block was returned from the rescan, it
376
                                // means no matching transactions were found.
377
                                if len(rescanned) != 1 {
×
378
                                        log.Tracef("rescan of block %v at "+
×
379
                                                "height=%d yielded no "+
×
380
                                                "transactions", blockHash, i)
×
381
                                        continue
×
382
                                }
383
                                decoded, err := decodeJSONBlock(
×
384
                                        &rescanned[0], i,
×
385
                                )
×
386
                                if err != nil {
×
387
                                        log.Errorf("Unable to decode block: %v",
×
388
                                                err)
×
389
                                        continue
×
390
                                }
391
                                b.blockQueue.Add(&blockEvent{
×
392
                                        eventType: connected,
×
393
                                        block:     decoded,
×
394
                                })
×
395
                        }
396

397
                // We've received a new request to manually filter a block.
UNCOV
398
                case req := <-b.filterBlockReqs:
×
UNCOV
399
                        // First we'll fetch the block itself as well as some
×
UNCOV
400
                        // additional information including its height.
×
UNCOV
401
                        block, err := b.GetBlock(req.blockHash)
×
UNCOV
402
                        if err != nil {
×
403
                                req.err <- err
×
404
                                req.resp <- nil
×
405
                                continue
×
406
                        }
UNCOV
407
                        header, err := b.chainClient.GetBlockHeaderVerbose(
×
UNCOV
408
                                req.blockHash)
×
UNCOV
409
                        if err != nil {
×
410
                                req.err <- err
×
411
                                req.resp <- nil
×
412
                                continue
×
413
                        }
414

415
                        // Once we have this info, we can directly filter the
416
                        // block and dispatch the proper notification.
UNCOV
417
                        req.resp <- &FilteredBlock{
×
UNCOV
418
                                Hash:         *req.blockHash,
×
UNCOV
419
                                Height:       uint32(header.Height),
×
UNCOV
420
                                Transactions: filterBlock(block),
×
UNCOV
421
                        }
×
UNCOV
422
                        req.err <- err
×
423

424
                // We've received a new event from the chain client.
UNCOV
425
                case event := <-b.chainClient.Notifications():
×
UNCOV
426
                        switch e := event.(type) {
×
427

UNCOV
428
                        case chain.FilteredBlockConnected:
×
UNCOV
429
                                b.onFilteredBlockConnected(
×
UNCOV
430
                                        e.Block.Height, e.Block.Hash, e.RelevantTxs,
×
UNCOV
431
                                )
×
432

UNCOV
433
                        case chain.BlockDisconnected:
×
UNCOV
434
                                b.onFilteredBlockDisconnected(e.Height, e.Hash)
×
435
                        }
436

UNCOV
437
                case <-b.quit:
×
UNCOV
438
                        return
×
439
                }
440
        }
441
}
442

443
// UpdateFilter updates the UTXO filter which is to be consulted when creating
444
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
445
// meaning repeated calls to this method should _expand_ the size of the UTXO
446
// sub-set currently being watched.  If the set updateHeight is _lower_ than
447
// the best known height of the implementation, then the state should be
448
// rewound to ensure all relevant notifications are dispatched.
449
//
450
// NOTE: This is part of the FilteredChainView interface.
451
func (b *BitcoindFilteredChainView) UpdateFilter(ops []graphdb.EdgePoint,
UNCOV
452
        updateHeight uint32) error {
×
UNCOV
453

×
UNCOV
454
        newUtxos := make([]wire.OutPoint, len(ops))
×
UNCOV
455
        for i, op := range ops {
×
UNCOV
456
                newUtxos[i] = op.OutPoint
×
UNCOV
457
        }
×
458

UNCOV
459
        select {
×
460

461
        case b.filterUpdates <- filterUpdate{
462
                newUtxos:     newUtxos,
463
                updateHeight: updateHeight,
UNCOV
464
        }:
×
UNCOV
465
                return nil
×
466

467
        case <-b.quit:
×
468
                return fmt.Errorf("chain filter shutting down")
×
469
        }
470
}
471

472
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
473
// Each time a block is connected to the end of a main chain, and appropriate
474
// FilteredBlock which contains the transactions which mutate our watched UTXO
475
// set is to be returned.
476
//
477
// NOTE: This is part of the FilteredChainView interface.
UNCOV
478
func (b *BitcoindFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
×
UNCOV
479
        return b.blockQueue.newBlocks
×
UNCOV
480
}
×
481

482
// DisconnectedBlocks returns a receive only channel which will be sent upon
483
// with the empty filtered blocks of blocks which are disconnected from the
484
// main chain in the case of a re-org.
485
//
486
// NOTE: This is part of the FilteredChainView interface.
UNCOV
487
func (b *BitcoindFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
×
UNCOV
488
        return b.blockQueue.staleBlocks
×
UNCOV
489
}
×
490

491
// GetBlock is used to retrieve the block with the given hash. This function
492
// wraps the blockCache's GetBlock function.
493
func (b *BitcoindFilteredChainView) GetBlock(hash *chainhash.Hash) (
UNCOV
494
        *wire.MsgBlock, error) {
×
UNCOV
495

×
UNCOV
496
        return b.blockCache.GetBlock(hash, b.chainClient.GetBlock)
×
UNCOV
497
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc