• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11126734513

01 Oct 2024 01:47PM UTC coverage: 58.772% (-0.04%) from 58.814%
11126734513

Pull #9147

github

ziggie1984
docs: add release-notes.
Pull Request #9147: [Part 1|3] Introduce SQL Payment schema into LND

130208 of 221548 relevant lines covered (58.77%)

28210.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.38
/routing/chainview/bitcoind.go
1
package chainview
2

3
import (
4
        "bytes"
5
        "encoding/hex"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/chain"
14
        "github.com/btcsuite/btcwallet/wtxmgr"
15
        "github.com/lightningnetwork/lnd/blockcache"
16
        "github.com/lightningnetwork/lnd/channeldb"
17
)
18

19
// BitcoindFilteredChainView is an implementation of the FilteredChainView
20
// interface which is backed by bitcoind.
21
type BitcoindFilteredChainView struct {
22
        started int32 // To be used atomically.
23
        stopped int32 // To be used atomically.
24

25
        // bestHeight is the height of the latest block added to the
26
        // blockQueue from the onFilteredConnectedMethod. It is used to
27
        // determine up to what height we would need to rescan in case
28
        // of a filter update.
29
        bestHeightMtx sync.Mutex
30
        bestHeight    uint32
31

32
        // TODO: Factor out common logic between bitcoind and btcd into a
33
        // NodeFilteredView interface.
34
        chainClient *chain.BitcoindClient
35

36
        // blockEventQueue is the ordered queue used to keep the order
37
        // of connected and disconnected blocks sent to the reader of the
38
        // chainView.
39
        blockQueue *blockEventQueue
40

41
        // blockCache is an LRU block cache.
42
        blockCache *blockcache.BlockCache
43

44
        // filterUpdates is a channel in which updates to the utxo filter
45
        // attached to this instance are sent over.
46
        filterUpdates chan filterUpdate
47

48
        // chainFilter is the set of utox's that we're currently watching
49
        // spends for within the chain.
50
        filterMtx   sync.RWMutex
51
        chainFilter map[wire.OutPoint]struct{}
52

53
        // filterBlockReqs is a channel in which requests to filter select
54
        // blocks will be sent over.
55
        filterBlockReqs chan *filterBlockReq
56

57
        quit chan struct{}
58
        wg   sync.WaitGroup
59
}
60

61
// A compile time check to ensure BitcoindFilteredChainView implements the
62
// chainview.FilteredChainView.
63
var _ FilteredChainView = (*BitcoindFilteredChainView)(nil)
64

65
// NewBitcoindFilteredChainView creates a new instance of a FilteredChainView
66
// from RPC credentials and a ZMQ socket address for a bitcoind instance.
67
func NewBitcoindFilteredChainView(
68
        chainConn *chain.BitcoindConn,
69
        blockCache *blockcache.BlockCache) *BitcoindFilteredChainView {
5✔
70

5✔
71
        chainView := &BitcoindFilteredChainView{
5✔
72
                chainFilter:     make(map[wire.OutPoint]struct{}),
5✔
73
                filterUpdates:   make(chan filterUpdate),
5✔
74
                filterBlockReqs: make(chan *filterBlockReq),
5✔
75
                blockCache:      blockCache,
5✔
76
                quit:            make(chan struct{}),
5✔
77
        }
5✔
78

5✔
79
        chainView.chainClient = chainConn.NewBitcoindClient()
5✔
80
        chainView.blockQueue = newBlockEventQueue()
5✔
81

5✔
82
        return chainView
5✔
83
}
5✔
84

85
// Start starts all goroutines necessary for normal operation.
86
//
87
// NOTE: This is part of the FilteredChainView interface.
88
func (b *BitcoindFilteredChainView) Start() error {
5✔
89
        // Already started?
5✔
90
        if atomic.AddInt32(&b.started, 1) != 1 {
5✔
91
                return nil
×
92
        }
×
93

94
        log.Infof("FilteredChainView starting")
5✔
95

5✔
96
        err := b.chainClient.Start()
5✔
97
        if err != nil {
5✔
98
                return err
×
99
        }
×
100

101
        err = b.chainClient.NotifyBlocks()
5✔
102
        if err != nil {
5✔
103
                return err
×
104
        }
×
105

106
        _, bestHeight, err := b.chainClient.GetBestBlock()
5✔
107
        if err != nil {
5✔
108
                return err
×
109
        }
×
110

111
        b.bestHeightMtx.Lock()
5✔
112
        b.bestHeight = uint32(bestHeight)
5✔
113
        b.bestHeightMtx.Unlock()
5✔
114

5✔
115
        b.blockQueue.Start()
5✔
116

5✔
117
        b.wg.Add(1)
5✔
118
        go b.chainFilterer()
5✔
119

5✔
120
        return nil
5✔
121
}
122

123
// Stop stops all goroutines which we launched by the prior call to the Start
124
// method.
125
//
126
// NOTE: This is part of the FilteredChainView interface.
127
func (b *BitcoindFilteredChainView) Stop() error {
4✔
128
        log.Debug("BitcoindFilteredChainView stopping")
4✔
129
        defer log.Debug("BitcoindFilteredChainView stopped")
4✔
130

4✔
131
        // Already shutting down?
4✔
132
        if atomic.AddInt32(&b.stopped, 1) != 1 {
4✔
133
                return nil
×
134
        }
×
135

136
        // Shutdown the rpc client, this gracefully disconnects from bitcoind's
137
        // zmq socket, and cleans up all related resources.
138
        b.chainClient.Stop()
4✔
139

4✔
140
        b.blockQueue.Stop()
4✔
141

4✔
142
        close(b.quit)
4✔
143
        b.wg.Wait()
4✔
144

4✔
145
        return nil
4✔
146
}
147

148
// onFilteredBlockConnected is called for each block that's connected to the
149
// end of the main chain. Based on our current chain filter, the block may or
150
// may not include any relevant transactions.
151
func (b *BitcoindFilteredChainView) onFilteredBlockConnected(height int32,
152
        hash chainhash.Hash, txns []*wtxmgr.TxRecord) {
956✔
153

956✔
154
        mtxs := make([]*wire.MsgTx, len(txns))
956✔
155
        b.filterMtx.Lock()
956✔
156
        for i, tx := range txns {
961✔
157
                mtxs[i] = &tx.MsgTx
5✔
158

5✔
159
                for _, txIn := range mtxs[i].TxIn {
10✔
160
                        // We can delete this outpoint from the chainFilter, as
5✔
161
                        // we just received a block where it was spent. In case
5✔
162
                        // of a reorg, this outpoint might get "un-spent", but
5✔
163
                        // that's okay since it would never be wise to consider
5✔
164
                        // the channel open again (since a spending transaction
5✔
165
                        // exists on the network).
5✔
166
                        delete(b.chainFilter, txIn.PreviousOutPoint)
5✔
167
                }
5✔
168

169
        }
170
        b.filterMtx.Unlock()
956✔
171

956✔
172
        // We record the height of the last connected block added to the
956✔
173
        // blockQueue such that we can scan up to this height in case of
956✔
174
        // a rescan. It must be protected by a mutex since a filter update
956✔
175
        // might be trying to read it concurrently.
956✔
176
        b.bestHeightMtx.Lock()
956✔
177
        b.bestHeight = uint32(height)
956✔
178
        b.bestHeightMtx.Unlock()
956✔
179

956✔
180
        block := &FilteredBlock{
956✔
181
                Hash:         hash,
956✔
182
                Height:       uint32(height),
956✔
183
                Transactions: mtxs,
956✔
184
        }
956✔
185

956✔
186
        b.blockQueue.Add(&blockEvent{
956✔
187
                eventType: connected,
956✔
188
                block:     block,
956✔
189
        })
956✔
190
}
191

192
// onFilteredBlockDisconnected is a callback which is executed once a block is
193
// disconnected from the end of the main chain.
194
func (b *BitcoindFilteredChainView) onFilteredBlockDisconnected(height int32,
195
        hash chainhash.Hash) {
831✔
196

831✔
197
        log.Debugf("got disconnected block at height %d: %v", height,
831✔
198
                hash)
831✔
199

831✔
200
        filteredBlock := &FilteredBlock{
831✔
201
                Hash:   hash,
831✔
202
                Height: uint32(height),
831✔
203
        }
831✔
204

831✔
205
        b.blockQueue.Add(&blockEvent{
831✔
206
                eventType: disconnected,
831✔
207
                block:     filteredBlock,
831✔
208
        })
831✔
209
}
831✔
210

211
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
212
// result of applying the current registered UTXO sub-set on the block
213
// corresponding to that block hash. If any watched UTOX's are spent by the
214
// selected lock, then the internal chainFilter will also be updated.
215
//
216
// NOTE: This is part of the FilteredChainView interface.
217
func (b *BitcoindFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
3✔
218
        req := &filterBlockReq{
3✔
219
                blockHash: blockHash,
3✔
220
                resp:      make(chan *FilteredBlock, 1),
3✔
221
                err:       make(chan error, 1),
3✔
222
        }
3✔
223

3✔
224
        select {
3✔
225
        case b.filterBlockReqs <- req:
3✔
226
        case <-b.quit:
×
227
                return nil, fmt.Errorf("FilteredChainView shutting down")
×
228
        }
229

230
        return <-req.resp, <-req.err
3✔
231
}
232

233
// chainFilterer is the primary goroutine which: listens for new blocks coming
234
// and dispatches the relevant FilteredBlock notifications, updates the filter
235
// due to requests by callers, and finally is able to perform targeted block
236
// filtration.
237
//
238
// TODO(roasbeef): change to use loadfilter RPC's
239
func (b *BitcoindFilteredChainView) chainFilterer() {
5✔
240
        defer b.wg.Done()
5✔
241

5✔
242
        // filterBlock is a helper function that scans the given block, and
5✔
243
        // notes which transactions spend outputs which are currently being
5✔
244
        // watched. Additionally, the chain filter will also be updated by
5✔
245
        // removing any spent outputs.
5✔
246
        filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
8✔
247
                b.filterMtx.Lock()
3✔
248
                defer b.filterMtx.Unlock()
3✔
249

3✔
250
                var filteredTxns []*wire.MsgTx
3✔
251
                for _, tx := range blk.Transactions {
10✔
252
                        var txAlreadyFiltered bool
7✔
253
                        for _, txIn := range tx.TxIn {
14✔
254
                                prevOp := txIn.PreviousOutPoint
7✔
255
                                if _, ok := b.chainFilter[prevOp]; !ok {
10✔
256
                                        continue
3✔
257
                                }
258

259
                                delete(b.chainFilter, prevOp)
5✔
260

5✔
261
                                // Only add this txn to our list of filtered
5✔
262
                                // txns if it is the first previous outpoint to
5✔
263
                                // cause a match.
5✔
264
                                if txAlreadyFiltered {
5✔
265
                                        continue
×
266
                                }
267

268
                                filteredTxns = append(filteredTxns, tx.Copy())
5✔
269
                                txAlreadyFiltered = true
5✔
270
                        }
271
                }
272

273
                return filteredTxns
3✔
274
        }
275

276
        decodeJSONBlock := func(block *btcjson.RescannedBlock,
5✔
277
                height uint32) (*FilteredBlock, error) {
7✔
278
                hash, err := chainhash.NewHashFromStr(block.Hash)
2✔
279
                if err != nil {
2✔
280
                        return nil, err
×
281

×
282
                }
×
283
                txs := make([]*wire.MsgTx, 0, len(block.Transactions))
2✔
284
                for _, str := range block.Transactions {
4✔
285
                        b, err := hex.DecodeString(str)
2✔
286
                        if err != nil {
2✔
287
                                return nil, err
×
288
                        }
×
289
                        tx := &wire.MsgTx{}
2✔
290
                        err = tx.Deserialize(bytes.NewReader(b))
2✔
291
                        if err != nil {
2✔
292
                                return nil, err
×
293
                        }
×
294
                        txs = append(txs, tx)
2✔
295
                }
296
                return &FilteredBlock{
2✔
297
                        Hash:         *hash,
2✔
298
                        Height:       height,
2✔
299
                        Transactions: txs,
2✔
300
                }, nil
2✔
301
        }
302

303
        for {
2,774✔
304
                select {
2,769✔
305
                // The caller has just sent an update to the current chain
306
                // filter, so we'll apply the update, possibly rewinding our
307
                // state partially.
308
                case update := <-b.filterUpdates:
7✔
309
                        // First, we'll add all the new UTXO's to the set of
7✔
310
                        // watched UTXO's, eliminating any duplicates in the
7✔
311
                        // process.
7✔
312
                        log.Tracef("Updating chain filter with new UTXO's: %v",
7✔
313
                                update.newUtxos)
7✔
314

7✔
315
                        b.filterMtx.Lock()
7✔
316
                        for _, newOp := range update.newUtxos {
18✔
317
                                b.chainFilter[newOp] = struct{}{}
11✔
318
                        }
11✔
319
                        b.filterMtx.Unlock()
7✔
320

7✔
321
                        // Apply the new TX filter to the chain client, which
7✔
322
                        // will cause all following notifications from and
7✔
323
                        // calls to it return blocks filtered with the new
7✔
324
                        // filter.
7✔
325
                        err := b.chainClient.LoadTxFilter(false, update.newUtxos)
7✔
326
                        if err != nil {
7✔
327
                                log.Errorf("Unable to update filter: %v", err)
×
328
                                continue
×
329
                        }
330

331
                        // All blocks gotten after we loaded the filter will
332
                        // have the filter applied, but we will need to rescan
333
                        // the blocks up to the height of the block we last
334
                        // added to the blockQueue.
335
                        b.bestHeightMtx.Lock()
7✔
336
                        bestHeight := b.bestHeight
7✔
337
                        b.bestHeightMtx.Unlock()
7✔
338

7✔
339
                        // If the update height matches our best known height,
7✔
340
                        // then we don't need to do any rewinding.
7✔
341
                        if update.updateHeight == bestHeight {
12✔
342
                                continue
5✔
343
                        }
344

345
                        // Otherwise, we'll rewind the state to ensure the
346
                        // caller doesn't miss any relevant notifications.
347
                        // Starting from the height _after_ the update height,
348
                        // we'll walk forwards, rescanning one block at a time
349
                        // with the chain client applying the newly loaded
350
                        // filter to each block.
351
                        for i := update.updateHeight + 1; i < bestHeight+1; i++ {
4✔
352
                                blockHash, err := b.chainClient.GetBlockHash(int64(i))
2✔
353
                                if err != nil {
2✔
354
                                        log.Warnf("Unable to get block hash "+
×
355
                                                "for block at height %d: %v",
×
356
                                                i, err)
×
357
                                        continue
×
358
                                }
359

360
                                // To avoid dealing with the case where a reorg
361
                                // is happening while we rescan, we scan one
362
                                // block at a time, skipping blocks that might
363
                                // have gone missing.
364
                                rescanned, err := b.chainClient.RescanBlocks(
2✔
365
                                        []chainhash.Hash{*blockHash},
2✔
366
                                )
2✔
367
                                if err != nil {
2✔
368
                                        log.Warnf("Unable to rescan block "+
×
369
                                                "with hash %v at height %d: %v",
×
370
                                                blockHash, i, err)
×
371
                                        continue
×
372
                                }
373

374
                                // If no block was returned from the rescan, it
375
                                // means no matching transactions were found.
376
                                if len(rescanned) != 1 {
2✔
377
                                        log.Tracef("rescan of block %v at "+
×
378
                                                "height=%d yielded no "+
×
379
                                                "transactions", blockHash, i)
×
380
                                        continue
×
381
                                }
382
                                decoded, err := decodeJSONBlock(
2✔
383
                                        &rescanned[0], i,
2✔
384
                                )
2✔
385
                                if err != nil {
2✔
386
                                        log.Errorf("Unable to decode block: %v",
×
387
                                                err)
×
388
                                        continue
×
389
                                }
390
                                b.blockQueue.Add(&blockEvent{
2✔
391
                                        eventType: connected,
2✔
392
                                        block:     decoded,
2✔
393
                                })
2✔
394
                        }
395

396
                // We've received a new request to manually filter a block.
397
                case req := <-b.filterBlockReqs:
3✔
398
                        // First we'll fetch the block itself as well as some
3✔
399
                        // additional information including its height.
3✔
400
                        block, err := b.GetBlock(req.blockHash)
3✔
401
                        if err != nil {
3✔
402
                                req.err <- err
×
403
                                req.resp <- nil
×
404
                                continue
×
405
                        }
406
                        header, err := b.chainClient.GetBlockHeaderVerbose(
3✔
407
                                req.blockHash)
3✔
408
                        if err != nil {
3✔
409
                                req.err <- err
×
410
                                req.resp <- nil
×
411
                                continue
×
412
                        }
413

414
                        // Once we have this info, we can directly filter the
415
                        // block and dispatch the proper notification.
416
                        req.resp <- &FilteredBlock{
3✔
417
                                Hash:         *req.blockHash,
3✔
418
                                Height:       uint32(header.Height),
3✔
419
                                Transactions: filterBlock(block),
3✔
420
                        }
3✔
421
                        req.err <- err
3✔
422

423
                // We've received a new event from the chain client.
424
                case event := <-b.chainClient.Notifications():
2,757✔
425
                        switch e := event.(type) {
2,757✔
426

427
                        case chain.FilteredBlockConnected:
956✔
428
                                b.onFilteredBlockConnected(
956✔
429
                                        e.Block.Height, e.Block.Hash, e.RelevantTxs,
956✔
430
                                )
956✔
431

432
                        case chain.BlockDisconnected:
831✔
433
                                b.onFilteredBlockDisconnected(e.Height, e.Hash)
831✔
434
                        }
435

436
                case <-b.quit:
4✔
437
                        return
4✔
438
                }
439
        }
440
}
441

442
// UpdateFilter updates the UTXO filter which is to be consulted when creating
443
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
444
// meaning repeated calls to this method should _expand_ the size of the UTXO
445
// sub-set currently being watched.  If the set updateHeight is _lower_ than
446
// the best known height of the implementation, then the state should be
447
// rewound to ensure all relevant notifications are dispatched.
448
//
449
// NOTE: This is part of the FilteredChainView interface.
450
func (b *BitcoindFilteredChainView) UpdateFilter(ops []channeldb.EdgePoint,
451
        updateHeight uint32) error {
7✔
452

7✔
453
        newUtxos := make([]wire.OutPoint, len(ops))
7✔
454
        for i, op := range ops {
18✔
455
                newUtxos[i] = op.OutPoint
11✔
456
        }
11✔
457

458
        select {
7✔
459

460
        case b.filterUpdates <- filterUpdate{
461
                newUtxos:     newUtxos,
462
                updateHeight: updateHeight,
463
        }:
7✔
464
                return nil
7✔
465

466
        case <-b.quit:
×
467
                return fmt.Errorf("chain filter shutting down")
×
468
        }
469
}
470

471
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
472
// Each time a block is connected to the end of a main chain, and appropriate
473
// FilteredBlock which contains the transactions which mutate our watched UTXO
474
// set is to be returned.
475
//
476
// NOTE: This is part of the FilteredChainView interface.
477
func (b *BitcoindFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
9✔
478
        return b.blockQueue.newBlocks
9✔
479
}
9✔
480

481
// DisconnectedBlocks returns a receive only channel which will be sent upon
482
// with the empty filtered blocks of blocks which are disconnected from the
483
// main chain in the case of a re-org.
484
//
485
// NOTE: This is part of the FilteredChainView interface.
486
func (b *BitcoindFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
3✔
487
        return b.blockQueue.staleBlocks
3✔
488
}
3✔
489

490
// GetBlock is used to retrieve the block with the given hash. This function
491
// wraps the blockCache's GetBlock function.
492
func (b *BitcoindFilteredChainView) GetBlock(hash *chainhash.Hash) (
493
        *wire.MsgBlock, error) {
3✔
494

3✔
495
        return b.blockCache.GetBlock(hash, b.chainClient.GetBlock)
3✔
496
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc