• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9111774206

pending completion
9111774206

Pull #8765

github

hieblmi
routing: log edge when skipping it
Pull Request #8765: routing: log edge when skipping it

1 of 1 new or added line in 1 file covered. (100.0%)

104 existing lines in 27 files now uncovered.

122984 of 210570 relevant lines covered (58.41%)

28065.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.32
/routing/chainview/bitcoind.go
1
package chainview
2

3
import (
4
        "bytes"
5
        "encoding/hex"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/chain"
14
        "github.com/btcsuite/btcwallet/wtxmgr"
15
        "github.com/lightningnetwork/lnd/blockcache"
16
        "github.com/lightningnetwork/lnd/channeldb"
17
)
18

19
// BitcoindFilteredChainView is an implementation of the FilteredChainView
// interface which is backed by bitcoind.
type BitcoindFilteredChainView struct {
	started int32 // To be used atomically.
	stopped int32 // To be used atomically.

	// bestHeight is the height of the latest block added to the
	// blockQueue from the onFilteredBlockConnected method. It is used to
	// determine up to what height we would need to rescan in case
	// of a filter update. It is seeded in Start from the chain client's
	// best block, and every access is guarded by bestHeightMtx.
	bestHeightMtx sync.Mutex
	bestHeight    uint32

	// TODO: Factor out common logic between bitcoind and btcd into a
	// NodeFilteredView interface.
	chainClient *chain.BitcoindClient

	// blockQueue is the ordered queue used to keep the order
	// of connected and disconnected blocks sent to the reader of the
	// chainView.
	blockQueue *blockEventQueue

	// blockCache is an LRU block cache.
	blockCache *blockcache.BlockCache

	// filterUpdates is a channel in which updates to the utxo filter
	// attached to this instance are sent over.
	filterUpdates chan filterUpdate

	// chainFilter is the set of UTXOs that we're currently watching
	// spends for within the chain. Every access is guarded by filterMtx.
	filterMtx   sync.RWMutex
	chainFilter map[wire.OutPoint]struct{}

	// filterBlockReqs is a channel in which requests to filter select
	// blocks will be sent over.
	filterBlockReqs chan *filterBlockReq

	// quit is closed by Stop to signal shutdown, and wg tracks the
	// chainFilterer goroutine so that Stop can wait for it to exit.
	quit chan struct{}
	wg   sync.WaitGroup
}
60

61
// A compile time check to ensure BitcoindFilteredChainView implements the
// chainview.FilteredChainView interface.
var _ FilteredChainView = (*BitcoindFilteredChainView)(nil)
64

65
// NewBitcoindFilteredChainView creates a new instance of a FilteredChainView
66
// from RPC credentials and a ZMQ socket address for a bitcoind instance.
67
func NewBitcoindFilteredChainView(
68
        chainConn *chain.BitcoindConn,
69
        blockCache *blockcache.BlockCache) *BitcoindFilteredChainView {
6✔
70

6✔
71
        chainView := &BitcoindFilteredChainView{
6✔
72
                chainFilter:     make(map[wire.OutPoint]struct{}),
6✔
73
                filterUpdates:   make(chan filterUpdate),
6✔
74
                filterBlockReqs: make(chan *filterBlockReq),
6✔
75
                blockCache:      blockCache,
6✔
76
                quit:            make(chan struct{}),
6✔
77
        }
6✔
78

6✔
79
        chainView.chainClient = chainConn.NewBitcoindClient()
6✔
80
        chainView.blockQueue = newBlockEventQueue()
6✔
81

6✔
82
        return chainView
6✔
83
}
6✔
84

85
// Start starts all goroutines necessary for normal operation.
86
//
87
// NOTE: This is part of the FilteredChainView interface.
88
func (b *BitcoindFilteredChainView) Start() error {
6✔
89
        // Already started?
6✔
90
        if atomic.AddInt32(&b.started, 1) != 1 {
6✔
91
                return nil
×
92
        }
×
93

94
        log.Infof("FilteredChainView starting")
6✔
95

6✔
96
        err := b.chainClient.Start()
6✔
97
        if err != nil {
6✔
98
                return err
×
99
        }
×
100

101
        err = b.chainClient.NotifyBlocks()
6✔
102
        if err != nil {
6✔
103
                return err
×
104
        }
×
105

106
        _, bestHeight, err := b.chainClient.GetBestBlock()
6✔
107
        if err != nil {
6✔
108
                return err
×
109
        }
×
110

111
        b.bestHeightMtx.Lock()
6✔
112
        b.bestHeight = uint32(bestHeight)
6✔
113
        b.bestHeightMtx.Unlock()
6✔
114

6✔
115
        b.blockQueue.Start()
6✔
116

6✔
117
        b.wg.Add(1)
6✔
118
        go b.chainFilterer()
6✔
119

6✔
120
        return nil
6✔
121
}
122

123
// Stop stops all goroutines which we launched by the prior call to the Start
124
// method.
125
//
126
// NOTE: This is part of the FilteredChainView interface.
127
func (b *BitcoindFilteredChainView) Stop() error {
6✔
128
        // Already shutting down?
6✔
129
        if atomic.AddInt32(&b.stopped, 1) != 1 {
6✔
130
                return nil
×
131
        }
×
132

133
        // Shutdown the rpc client, this gracefully disconnects from bitcoind's
134
        // zmq socket, and cleans up all related resources.
135
        b.chainClient.Stop()
6✔
136

6✔
137
        b.blockQueue.Stop()
6✔
138

6✔
139
        log.Infof("FilteredChainView stopping")
6✔
140

6✔
141
        close(b.quit)
6✔
142
        b.wg.Wait()
6✔
143

6✔
144
        return nil
6✔
145
}
146

147
// onFilteredBlockConnected is called for each block that's connected to the
148
// end of the main chain. Based on our current chain filter, the block may or
149
// may not include any relevant transactions.
150
func (b *BitcoindFilteredChainView) onFilteredBlockConnected(height int32,
151
        hash chainhash.Hash, txns []*wtxmgr.TxRecord) {
347✔
152

347✔
153
        mtxs := make([]*wire.MsgTx, len(txns))
347✔
154
        b.filterMtx.Lock()
347✔
155
        for i, tx := range txns {
353✔
156
                mtxs[i] = &tx.MsgTx
6✔
157

6✔
158
                for _, txIn := range mtxs[i].TxIn {
12✔
159
                        // We can delete this outpoint from the chainFilter, as
6✔
160
                        // we just received a block where it was spent. In case
6✔
161
                        // of a reorg, this outpoint might get "un-spent", but
6✔
162
                        // that's okay since it would never be wise to consider
6✔
163
                        // the channel open again (since a spending transaction
6✔
164
                        // exists on the network).
6✔
165
                        delete(b.chainFilter, txIn.PreviousOutPoint)
6✔
166
                }
6✔
167

168
        }
169
        b.filterMtx.Unlock()
347✔
170

347✔
171
        // We record the height of the last connected block added to the
347✔
172
        // blockQueue such that we can scan up to this height in case of
347✔
173
        // a rescan. It must be protected by a mutex since a filter update
347✔
174
        // might be trying to read it concurrently.
347✔
175
        b.bestHeightMtx.Lock()
347✔
176
        b.bestHeight = uint32(height)
347✔
177
        b.bestHeightMtx.Unlock()
347✔
178

347✔
179
        block := &FilteredBlock{
347✔
180
                Hash:         hash,
347✔
181
                Height:       uint32(height),
347✔
182
                Transactions: mtxs,
347✔
183
        }
347✔
184

347✔
185
        b.blockQueue.Add(&blockEvent{
347✔
186
                eventType: connected,
347✔
187
                block:     block,
347✔
188
        })
347✔
189
}
190

191
// onFilteredBlockDisconnected is a callback which is executed once a block is
192
// disconnected from the end of the main chain.
193
func (b *BitcoindFilteredChainView) onFilteredBlockDisconnected(height int32,
194
        hash chainhash.Hash) {
222✔
195

222✔
196
        log.Debugf("got disconnected block at height %d: %v", height,
222✔
197
                hash)
222✔
198

222✔
199
        filteredBlock := &FilteredBlock{
222✔
200
                Hash:   hash,
222✔
201
                Height: uint32(height),
222✔
202
        }
222✔
203

222✔
204
        b.blockQueue.Add(&blockEvent{
222✔
205
                eventType: disconnected,
222✔
206
                block:     filteredBlock,
222✔
207
        })
222✔
208
}
222✔
209

210
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
211
// result of applying the current registered UTXO sub-set on the block
212
// corresponding to that block hash. If any watched UTOX's are spent by the
213
// selected lock, then the internal chainFilter will also be updated.
214
//
215
// NOTE: This is part of the FilteredChainView interface.
216
func (b *BitcoindFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
4✔
217
        req := &filterBlockReq{
4✔
218
                blockHash: blockHash,
4✔
219
                resp:      make(chan *FilteredBlock, 1),
4✔
220
                err:       make(chan error, 1),
4✔
221
        }
4✔
222

4✔
223
        select {
4✔
224
        case b.filterBlockReqs <- req:
4✔
225
        case <-b.quit:
×
226
                return nil, fmt.Errorf("FilteredChainView shutting down")
×
227
        }
228

229
        return <-req.resp, <-req.err
4✔
230
}
231

232
// chainFilterer is the primary goroutine which: listens for new blocks coming
233
// and dispatches the relevant FilteredBlock notifications, updates the filter
234
// due to requests by callers, and finally is able to perform targeted block
235
// filtration.
236
//
237
// TODO(roasbeef): change to use loadfilter RPC's
238
func (b *BitcoindFilteredChainView) chainFilterer() {
6✔
239
        defer b.wg.Done()
6✔
240

6✔
241
        // filterBlock is a helper function that scans the given block, and
6✔
242
        // notes which transactions spend outputs which are currently being
6✔
243
        // watched. Additionally, the chain filter will also be updated by
6✔
244
        // removing any spent outputs.
6✔
245
        filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
10✔
246
                b.filterMtx.Lock()
4✔
247
                defer b.filterMtx.Unlock()
4✔
248

4✔
249
                var filteredTxns []*wire.MsgTx
4✔
250
                for _, tx := range blk.Transactions {
12✔
251
                        var txAlreadyFiltered bool
8✔
252
                        for _, txIn := range tx.TxIn {
16✔
253
                                prevOp := txIn.PreviousOutPoint
8✔
254
                                if _, ok := b.chainFilter[prevOp]; !ok {
12✔
255
                                        continue
4✔
256
                                }
257

258
                                delete(b.chainFilter, prevOp)
6✔
259

6✔
260
                                // Only add this txn to our list of filtered
6✔
261
                                // txns if it is the first previous outpoint to
6✔
262
                                // cause a match.
6✔
263
                                if txAlreadyFiltered {
6✔
264
                                        continue
×
265
                                }
266

267
                                filteredTxns = append(filteredTxns, tx.Copy())
6✔
268
                                txAlreadyFiltered = true
6✔
269
                        }
270
                }
271

272
                return filteredTxns
4✔
273
        }
274

275
        decodeJSONBlock := func(block *btcjson.RescannedBlock,
6✔
276
                height uint32) (*FilteredBlock, error) {
8✔
277
                hash, err := chainhash.NewHashFromStr(block.Hash)
2✔
278
                if err != nil {
2✔
279
                        return nil, err
×
280

×
281
                }
×
282
                txs := make([]*wire.MsgTx, 0, len(block.Transactions))
2✔
283
                for _, str := range block.Transactions {
4✔
284
                        b, err := hex.DecodeString(str)
2✔
285
                        if err != nil {
2✔
286
                                return nil, err
×
287
                        }
×
288
                        tx := &wire.MsgTx{}
2✔
289
                        err = tx.Deserialize(bytes.NewReader(b))
2✔
290
                        if err != nil {
2✔
291
                                return nil, err
×
292
                        }
×
293
                        txs = append(txs, tx)
2✔
294
                }
295
                return &FilteredBlock{
2✔
296
                        Hash:         *hash,
2✔
297
                        Height:       height,
2✔
298
                        Transactions: txs,
2✔
299
                }, nil
2✔
300
        }
301

302
        for {
946✔
303
                select {
940✔
304
                // The caller has just sent an update to the current chain
305
                // filter, so we'll apply the update, possibly rewinding our
306
                // state partially.
307
                case update := <-b.filterUpdates:
8✔
308
                        // First, we'll add all the new UTXO's to the set of
8✔
309
                        // watched UTXO's, eliminating any duplicates in the
8✔
310
                        // process.
8✔
311
                        log.Tracef("Updating chain filter with new UTXO's: %v",
8✔
312
                                update.newUtxos)
8✔
313

8✔
314
                        b.filterMtx.Lock()
8✔
315
                        for _, newOp := range update.newUtxos {
20✔
316
                                b.chainFilter[newOp] = struct{}{}
12✔
317
                        }
12✔
318
                        b.filterMtx.Unlock()
8✔
319

8✔
320
                        // Apply the new TX filter to the chain client, which
8✔
321
                        // will cause all following notifications from and
8✔
322
                        // calls to it return blocks filtered with the new
8✔
323
                        // filter.
8✔
324
                        err := b.chainClient.LoadTxFilter(false, update.newUtxos)
8✔
325
                        if err != nil {
8✔
326
                                log.Errorf("Unable to update filter: %v", err)
×
327
                                continue
×
328
                        }
329

330
                        // All blocks gotten after we loaded the filter will
331
                        // have the filter applied, but we will need to rescan
332
                        // the blocks up to the height of the block we last
333
                        // added to the blockQueue.
334
                        b.bestHeightMtx.Lock()
8✔
335
                        bestHeight := b.bestHeight
8✔
336
                        b.bestHeightMtx.Unlock()
8✔
337

8✔
338
                        // If the update height matches our best known height,
8✔
339
                        // then we don't need to do any rewinding.
8✔
340
                        if update.updateHeight == bestHeight {
14✔
341
                                continue
6✔
342
                        }
343

344
                        // Otherwise, we'll rewind the state to ensure the
345
                        // caller doesn't miss any relevant notifications.
346
                        // Starting from the height _after_ the update height,
347
                        // we'll walk forwards, rescanning one block at a time
348
                        // with the chain client applying the newly loaded
349
                        // filter to each block.
350
                        for i := update.updateHeight + 1; i < bestHeight+1; i++ {
4✔
351
                                blockHash, err := b.chainClient.GetBlockHash(int64(i))
2✔
352
                                if err != nil {
2✔
353
                                        log.Warnf("Unable to get block hash "+
×
354
                                                "for block at height %d: %v",
×
355
                                                i, err)
×
356
                                        continue
×
357
                                }
358

359
                                // To avoid dealing with the case where a reorg
360
                                // is happening while we rescan, we scan one
361
                                // block at a time, skipping blocks that might
362
                                // have gone missing.
363
                                rescanned, err := b.chainClient.RescanBlocks(
2✔
364
                                        []chainhash.Hash{*blockHash},
2✔
365
                                )
2✔
366
                                if err != nil {
2✔
367
                                        log.Warnf("Unable to rescan block "+
×
368
                                                "with hash %v at height %d: %v",
×
369
                                                blockHash, i, err)
×
370
                                        continue
×
371
                                }
372

373
                                // If no block was returned from the rescan, it
374
                                // means no matching transactions were found.
375
                                if len(rescanned) != 1 {
2✔
UNCOV
376
                                        log.Tracef("rescan of block %v at "+
×
UNCOV
377
                                                "height=%d yielded no "+
×
UNCOV
378
                                                "transactions", blockHash, i)
×
UNCOV
379
                                        continue
×
380
                                }
381
                                decoded, err := decodeJSONBlock(
2✔
382
                                        &rescanned[0], i,
2✔
383
                                )
2✔
384
                                if err != nil {
2✔
385
                                        log.Errorf("Unable to decode block: %v",
×
386
                                                err)
×
387
                                        continue
×
388
                                }
389
                                b.blockQueue.Add(&blockEvent{
2✔
390
                                        eventType: connected,
2✔
391
                                        block:     decoded,
2✔
392
                                })
2✔
393
                        }
394

395
                // We've received a new request to manually filter a block.
396
                case req := <-b.filterBlockReqs:
4✔
397
                        // First we'll fetch the block itself as well as some
4✔
398
                        // additional information including its height.
4✔
399
                        block, err := b.GetBlock(req.blockHash)
4✔
400
                        if err != nil {
4✔
401
                                req.err <- err
×
402
                                req.resp <- nil
×
403
                                continue
×
404
                        }
405
                        header, err := b.chainClient.GetBlockHeaderVerbose(
4✔
406
                                req.blockHash)
4✔
407
                        if err != nil {
4✔
408
                                req.err <- err
×
409
                                req.resp <- nil
×
410
                                continue
×
411
                        }
412

413
                        // Once we have this info, we can directly filter the
414
                        // block and dispatch the proper notification.
415
                        req.resp <- &FilteredBlock{
4✔
416
                                Hash:         *req.blockHash,
4✔
417
                                Height:       uint32(header.Height),
4✔
418
                                Transactions: filterBlock(block),
4✔
419
                        }
4✔
420
                        req.err <- err
4✔
421

422
                // We've received a new event from the chain client.
423
                case event := <-b.chainClient.Notifications():
928✔
424
                        switch e := event.(type) {
928✔
425

426
                        case chain.FilteredBlockConnected:
347✔
427
                                b.onFilteredBlockConnected(
347✔
428
                                        e.Block.Height, e.Block.Hash, e.RelevantTxs,
347✔
429
                                )
347✔
430

431
                        case chain.BlockDisconnected:
222✔
432
                                b.onFilteredBlockDisconnected(e.Height, e.Hash)
222✔
433
                        }
434

435
                case <-b.quit:
6✔
436
                        return
6✔
437
                }
438
        }
439
}
440

441
// UpdateFilter updates the UTXO filter which is to be consulted when creating
442
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
443
// meaning repeated calls to this method should _expand_ the size of the UTXO
444
// sub-set currently being watched.  If the set updateHeight is _lower_ than
445
// the best known height of the implementation, then the state should be
446
// rewound to ensure all relevant notifications are dispatched.
447
//
448
// NOTE: This is part of the FilteredChainView interface.
449
func (b *BitcoindFilteredChainView) UpdateFilter(ops []channeldb.EdgePoint,
450
        updateHeight uint32) error {
8✔
451

8✔
452
        newUtxos := make([]wire.OutPoint, len(ops))
8✔
453
        for i, op := range ops {
20✔
454
                newUtxos[i] = op.OutPoint
12✔
455
        }
12✔
456

457
        select {
8✔
458

459
        case b.filterUpdates <- filterUpdate{
460
                newUtxos:     newUtxos,
461
                updateHeight: updateHeight,
462
        }:
8✔
463
                return nil
8✔
464

465
        case <-b.quit:
×
466
                return fmt.Errorf("chain filter shutting down")
×
467
        }
468
}
469

470
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
471
// Each time a block is connected to the end of a main chain, and appropriate
472
// FilteredBlock which contains the transactions which mutate our watched UTXO
473
// set is to be returned.
474
//
475
// NOTE: This is part of the FilteredChainView interface.
476
func (b *BitcoindFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
10✔
477
        return b.blockQueue.newBlocks
10✔
478
}
10✔
479

480
// DisconnectedBlocks returns a receive only channel which will be sent upon
481
// with the empty filtered blocks of blocks which are disconnected from the
482
// main chain in the case of a re-org.
483
//
484
// NOTE: This is part of the FilteredChainView interface.
485
func (b *BitcoindFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
4✔
486
        return b.blockQueue.staleBlocks
4✔
487
}
4✔
488

489
// GetBlock is used to retrieve the block with the given hash. This function
490
// wraps the blockCache's GetBlock function.
491
func (b *BitcoindFilteredChainView) GetBlock(hash *chainhash.Hash) (
492
        *wire.MsgBlock, error) {
4✔
493

4✔
494
        return b.blockCache.GetBlock(hash, b.chainClient.GetBlock)
4✔
495
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc