• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13035292482

29 Jan 2025 03:59PM UTC coverage: 49.3% (-9.5%) from 58.777%
13035292482

Pull #9456

github

mohamedawnallah
docs: update release-notes-0.19.0.md

In this commit, we warn users about the removal
of RPCs `SendToRoute`, `SendToRouteSync`, `SendPayment`,
and `SendPaymentSync` in the next release 0.20.
Pull Request #9456: lnrpc+docs: deprecate warning `SendToRoute`, `SendToRouteSync`, `SendPayment`, and `SendPaymentSync` in Release 0.19

100634 of 204126 relevant lines covered (49.3%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.92
/routing/chainview/btcd.go
1
package chainview
2

3
import (
4
        "bytes"
5
        "encoding/hex"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcjson"
11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btcd/chaincfg/chainhash"
13
        "github.com/btcsuite/btcd/rpcclient"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/lightningnetwork/lnd/blockcache"
16
        graphdb "github.com/lightningnetwork/lnd/graph/db"
17
)
18

19
// BtcdFilteredChainView is an implementation of the FilteredChainView
20
// interface which is backed by an active websockets connection to btcd.
21
type BtcdFilteredChainView struct {
22
        started int32 // To be used atomically.
23
        stopped int32 // To be used atomically.
24

25
        // bestHeight is the height of the latest block added to the
26
        // blockQueue from the onFilteredConnectedMethod. It is used to
27
        // determine up to what height we would need to rescan in case
28
        // of a filter update.
29
        bestHeightMtx sync.Mutex
30
        bestHeight    uint32
31

32
        btcdConn *rpcclient.Client
33

34
        // blockEventQueue is the ordered queue used to keep the order
35
        // of connected and disconnected blocks sent to the reader of the
36
        // chainView.
37
        blockQueue *blockEventQueue
38

39
        // blockCache is an LRU block cache.
40
        blockCache *blockcache.BlockCache
41

42
        // filterUpdates is a channel in which updates to the utxo filter
43
        // attached to this instance are sent over.
44
        filterUpdates chan filterUpdate
45

46
        // chainFilter is the set of utox's that we're currently watching
47
        // spends for within the chain.
48
        filterMtx   sync.RWMutex
49
        chainFilter map[wire.OutPoint]struct{}
50

51
        // filterBlockReqs is a channel in which requests to filter select
52
        // blocks will be sent over.
53
        filterBlockReqs chan *filterBlockReq
54

55
        quit chan struct{}
56
        wg   sync.WaitGroup
57
}
58

59
// A compile time check to ensure BtcdFilteredChainView implements the
60
// chainview.FilteredChainView interface.
61
var _ FilteredChainView = (*BtcdFilteredChainView)(nil)
62

63
// NewBtcdFilteredChainView creates a new instance of a FilteredChainView from
64
// RPC credentials for an active btcd instance.
65
func NewBtcdFilteredChainView(config rpcclient.ConnConfig,
66
        blockCache *blockcache.BlockCache) (*BtcdFilteredChainView, error) {
1✔
67

1✔
68
        chainView := &BtcdFilteredChainView{
1✔
69
                chainFilter:     make(map[wire.OutPoint]struct{}),
1✔
70
                filterUpdates:   make(chan filterUpdate),
1✔
71
                filterBlockReqs: make(chan *filterBlockReq),
1✔
72
                blockCache:      blockCache,
1✔
73
                quit:            make(chan struct{}),
1✔
74
        }
1✔
75

1✔
76
        ntfnCallbacks := &rpcclient.NotificationHandlers{
1✔
77
                OnFilteredBlockConnected:    chainView.onFilteredBlockConnected,
1✔
78
                OnFilteredBlockDisconnected: chainView.onFilteredBlockDisconnected,
1✔
79
        }
1✔
80

1✔
81
        // Disable connecting to btcd within the rpcclient.New method. We
1✔
82
        // defer establishing the connection to our .Start() method.
1✔
83
        config.DisableConnectOnNew = true
1✔
84
        config.DisableAutoReconnect = false
1✔
85
        chainConn, err := rpcclient.New(&config, ntfnCallbacks)
1✔
86
        if err != nil {
1✔
87
                return nil, err
×
88
        }
×
89
        chainView.btcdConn = chainConn
1✔
90

1✔
91
        chainView.blockQueue = newBlockEventQueue()
1✔
92

1✔
93
        return chainView, nil
1✔
94
}
95

96
// Start starts all goroutines necessary for normal operation.
97
//
98
// NOTE: This is part of the FilteredChainView interface.
99
func (b *BtcdFilteredChainView) Start() error {
1✔
100
        // Already started?
1✔
101
        if atomic.AddInt32(&b.started, 1) != 1 {
1✔
102
                return nil
×
103
        }
×
104

105
        log.Infof("FilteredChainView starting")
1✔
106

1✔
107
        // Connect to btcd, and register for notifications on connected, and
1✔
108
        // disconnected blocks.
1✔
109
        if err := b.btcdConn.Connect(20); err != nil {
1✔
110
                return err
×
111
        }
×
112
        if err := b.btcdConn.NotifyBlocks(); err != nil {
1✔
113
                return err
×
114
        }
×
115

116
        _, bestHeight, err := b.btcdConn.GetBestBlock()
1✔
117
        if err != nil {
1✔
118
                return err
×
119
        }
×
120

121
        b.bestHeightMtx.Lock()
1✔
122
        b.bestHeight = uint32(bestHeight)
1✔
123
        b.bestHeightMtx.Unlock()
1✔
124

1✔
125
        b.blockQueue.Start()
1✔
126

1✔
127
        b.wg.Add(1)
1✔
128
        go b.chainFilterer()
1✔
129

1✔
130
        return nil
1✔
131
}
132

133
// Stop stops all goroutines which we launched by the prior call to the Start
134
// method.
135
//
136
// NOTE: This is part of the FilteredChainView interface.
137
func (b *BtcdFilteredChainView) Stop() error {
1✔
138
        log.Debug("BtcdFilteredChainView stopping")
1✔
139
        defer log.Debug("BtcdFilteredChainView stopped")
1✔
140

1✔
141
        // Already shutting down?
1✔
142
        if atomic.AddInt32(&b.stopped, 1) != 1 {
1✔
143
                return nil
×
144
        }
×
145

146
        // Shutdown the rpc client, this gracefully disconnects from btcd, and
147
        // cleans up all related resources.
148
        b.btcdConn.Shutdown()
1✔
149
        b.btcdConn.WaitForShutdown()
1✔
150

1✔
151
        b.blockQueue.Stop()
1✔
152

1✔
153
        close(b.quit)
1✔
154
        b.wg.Wait()
1✔
155

1✔
156
        return nil
1✔
157
}
158

159
// onFilteredBlockConnected is called for each block that's connected to the
160
// end of the main chain. Based on our current chain filter, the block may or
161
// may not include any relevant transactions.
162
func (b *BtcdFilteredChainView) onFilteredBlockConnected(height int32,
163
        header *wire.BlockHeader, txns []*btcutil.Tx) {
1✔
164

1✔
165
        mtxs := make([]*wire.MsgTx, len(txns))
1✔
166
        b.filterMtx.Lock()
1✔
167
        for i, tx := range txns {
2✔
168
                mtx := tx.MsgTx()
1✔
169
                mtxs[i] = mtx
1✔
170

1✔
171
                for _, txIn := range mtx.TxIn {
2✔
172
                        // We can delete this outpoint from the chainFilter, as
1✔
173
                        // we just received a block where it was spent. In case
1✔
174
                        // of a reorg, this outpoint might get "un-spent", but
1✔
175
                        // that's okay since it would never be wise to consider
1✔
176
                        // the channel open again (since a spending transaction
1✔
177
                        // exists on the network).
1✔
178
                        delete(b.chainFilter, txIn.PreviousOutPoint)
1✔
179
                }
1✔
180

181
        }
182
        b.filterMtx.Unlock()
1✔
183

1✔
184
        // We record the height of the last connected block added to the
1✔
185
        // blockQueue such that we can scan up to this height in case of
1✔
186
        // a rescan. It must be protected by a mutex since a filter update
1✔
187
        // might be trying to read it concurrently.
1✔
188
        b.bestHeightMtx.Lock()
1✔
189
        b.bestHeight = uint32(height)
1✔
190
        b.bestHeightMtx.Unlock()
1✔
191

1✔
192
        block := &FilteredBlock{
1✔
193
                Hash:         header.BlockHash(),
1✔
194
                Height:       uint32(height),
1✔
195
                Transactions: mtxs,
1✔
196
        }
1✔
197

1✔
198
        b.blockQueue.Add(&blockEvent{
1✔
199
                eventType: connected,
1✔
200
                block:     block,
1✔
201
        })
1✔
202
}
203

204
// onFilteredBlockDisconnected is a callback which is executed once a block is
205
// disconnected from the end of the main chain.
206
func (b *BtcdFilteredChainView) onFilteredBlockDisconnected(height int32,
207
        header *wire.BlockHeader) {
1✔
208

1✔
209
        log.Debugf("got disconnected block at height %d: %v", height,
1✔
210
                header.BlockHash())
1✔
211

1✔
212
        filteredBlock := &FilteredBlock{
1✔
213
                Hash:   header.BlockHash(),
1✔
214
                Height: uint32(height),
1✔
215
        }
1✔
216

1✔
217
        b.blockQueue.Add(&blockEvent{
1✔
218
                eventType: disconnected,
1✔
219
                block:     filteredBlock,
1✔
220
        })
1✔
221
}
1✔
222

223
// filterBlockReq houses a request to manually filter a block specified by
224
// block hash.
225
type filterBlockReq struct {
226
        blockHash *chainhash.Hash
227
        resp      chan *FilteredBlock
228
        err       chan error
229
}
230

231
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
232
// result of applying the current registered UTXO sub-set on the block
233
// corresponding to that block hash. If any watched UTOX's are spent by the
234
// selected lock, then the internal chainFilter will also be updated.
235
//
236
// NOTE: This is part of the FilteredChainView interface.
237
func (b *BtcdFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
1✔
238
        req := &filterBlockReq{
1✔
239
                blockHash: blockHash,
1✔
240
                resp:      make(chan *FilteredBlock, 1),
1✔
241
                err:       make(chan error, 1),
1✔
242
        }
1✔
243

1✔
244
        select {
1✔
245
        case b.filterBlockReqs <- req:
1✔
246
        case <-b.quit:
×
247
                return nil, fmt.Errorf("FilteredChainView shutting down")
×
248
        }
249

250
        return <-req.resp, <-req.err
1✔
251
}
252

253
// chainFilterer is the primary goroutine which: listens for new blocks coming
254
// and dispatches the relevant FilteredBlock notifications, updates the filter
255
// due to requests by callers, and finally is able to preform targeted block
256
// filtration.
257
//
258
// TODO(roasbeef): change to use loadfilter RPC's
259
func (b *BtcdFilteredChainView) chainFilterer() {
1✔
260
        defer b.wg.Done()
1✔
261

1✔
262
        // filterBlock is a helper function that scans the given block, and
1✔
263
        // notes which transactions spend outputs which are currently being
1✔
264
        // watched. Additionally, the chain filter will also be updated by
1✔
265
        // removing any spent outputs.
1✔
266
        filterBlock := func(blk *wire.MsgBlock) []*wire.MsgTx {
2✔
267
                b.filterMtx.Lock()
1✔
268
                defer b.filterMtx.Unlock()
1✔
269

1✔
270
                var filteredTxns []*wire.MsgTx
1✔
271
                for _, tx := range blk.Transactions {
2✔
272
                        var txAlreadyFiltered bool
1✔
273
                        for _, txIn := range tx.TxIn {
2✔
274
                                prevOp := txIn.PreviousOutPoint
1✔
275
                                if _, ok := b.chainFilter[prevOp]; !ok {
2✔
276
                                        continue
1✔
277
                                }
278

279
                                delete(b.chainFilter, prevOp)
1✔
280

1✔
281
                                // Only add this txn to our list of filtered
1✔
282
                                // txns if it is the first previous outpoint to
1✔
283
                                // cause a match.
1✔
284
                                if txAlreadyFiltered {
1✔
285
                                        continue
×
286
                                }
287

288
                                filteredTxns = append(filteredTxns, tx.Copy())
1✔
289
                                txAlreadyFiltered = true
1✔
290

291
                        }
292
                }
293

294
                return filteredTxns
1✔
295
        }
296

297
        decodeJSONBlock := func(block *btcjson.RescannedBlock,
1✔
298
                height uint32) (*FilteredBlock, error) {
1✔
299
                hash, err := chainhash.NewHashFromStr(block.Hash)
×
300
                if err != nil {
×
301
                        return nil, err
×
302

×
303
                }
×
304
                txs := make([]*wire.MsgTx, 0, len(block.Transactions))
×
305
                for _, str := range block.Transactions {
×
306
                        b, err := hex.DecodeString(str)
×
307
                        if err != nil {
×
308
                                return nil, err
×
309
                        }
×
310
                        tx := &wire.MsgTx{}
×
311
                        err = tx.Deserialize(bytes.NewReader(b))
×
312
                        if err != nil {
×
313
                                return nil, err
×
314
                        }
×
315
                        txs = append(txs, tx)
×
316
                }
317
                return &FilteredBlock{
×
318
                        Hash:         *hash,
×
319
                        Height:       height,
×
320
                        Transactions: txs,
×
321
                }, nil
×
322
        }
323

324
        for {
2✔
325
                select {
1✔
326
                // The caller has just sent an update to the current chain
327
                // filter, so we'll apply the update, possibly rewinding our
328
                // state partially.
329
                case update := <-b.filterUpdates:
1✔
330

1✔
331
                        // First, we'll add all the new UTXO's to the set of
1✔
332
                        // watched UTXO's, eliminating any duplicates in the
1✔
333
                        // process.
1✔
334
                        log.Tracef("Updating chain filter with new UTXO's: %v",
1✔
335
                                update.newUtxos)
1✔
336

1✔
337
                        b.filterMtx.Lock()
1✔
338
                        for _, newOp := range update.newUtxos {
2✔
339
                                b.chainFilter[newOp] = struct{}{}
1✔
340
                        }
1✔
341
                        b.filterMtx.Unlock()
1✔
342

1✔
343
                        // Apply the new TX filter to btcd, which will cause
1✔
344
                        // all following notifications from and calls to it
1✔
345
                        // return blocks filtered with the new filter.
1✔
346
                        b.btcdConn.LoadTxFilter(false, []btcutil.Address{},
1✔
347
                                update.newUtxos)
1✔
348

1✔
349
                        // All blocks gotten after we loaded the filter will
1✔
350
                        // have the filter applied, but we will need to rescan
1✔
351
                        // the blocks up to the height of the block we last
1✔
352
                        // added to the blockQueue.
1✔
353
                        b.bestHeightMtx.Lock()
1✔
354
                        bestHeight := b.bestHeight
1✔
355
                        b.bestHeightMtx.Unlock()
1✔
356

1✔
357
                        // If the update height matches our best known height,
1✔
358
                        // then we don't need to do any rewinding.
1✔
359
                        if update.updateHeight == bestHeight {
2✔
360
                                continue
1✔
361
                        }
362

363
                        // Otherwise, we'll rewind the state to ensure the
364
                        // caller doesn't miss any relevant notifications.
365
                        // Starting from the height _after_ the update height,
366
                        // we'll walk forwards, rescanning one block at a time
367
                        // with btcd applying the newly loaded filter to each
368
                        // block.
369
                        for i := update.updateHeight + 1; i < bestHeight+1; i++ {
×
370
                                blockHash, err := b.btcdConn.GetBlockHash(int64(i))
×
371
                                if err != nil {
×
372
                                        log.Warnf("Unable to get block hash "+
×
373
                                                "for block at height %d: %v",
×
374
                                                i, err)
×
375
                                        continue
×
376
                                }
377

378
                                // To avoid dealing with the case where a reorg
379
                                // is happening while we rescan, we scan one
380
                                // block at a time, skipping blocks that might
381
                                // have gone missing.
382
                                rescanned, err := b.btcdConn.RescanBlocks(
×
383
                                        []chainhash.Hash{*blockHash})
×
384
                                if err != nil {
×
385
                                        log.Warnf("Unable to rescan block "+
×
386
                                                "with hash %v at height %d: %v",
×
387
                                                blockHash, i, err)
×
388
                                        continue
×
389
                                }
390

391
                                // If no block was returned from the rescan, it
392
                                // means no matching transactions were found.
393
                                if len(rescanned) != 1 {
×
394
                                        log.Tracef("rescan of block %v at "+
×
395
                                                "height=%d yielded no "+
×
396
                                                "transactions", blockHash, i)
×
397
                                        continue
×
398
                                }
399
                                decoded, err := decodeJSONBlock(
×
400
                                        &rescanned[0], uint32(i))
×
401
                                if err != nil {
×
402
                                        log.Errorf("Unable to decode block: %v",
×
403
                                                err)
×
404
                                        continue
×
405
                                }
406
                                b.blockQueue.Add(&blockEvent{
×
407
                                        eventType: connected,
×
408
                                        block:     decoded,
×
409
                                })
×
410
                        }
411

412
                // We've received a new request to manually filter a block.
413
                case req := <-b.filterBlockReqs:
1✔
414
                        // First we'll fetch the block itself as well as some
1✔
415
                        // additional information including its height.
1✔
416
                        block, err := b.GetBlock(req.blockHash)
1✔
417
                        if err != nil {
1✔
418
                                req.err <- err
×
419
                                req.resp <- nil
×
420
                                continue
×
421
                        }
422
                        header, err := b.btcdConn.GetBlockHeaderVerbose(req.blockHash)
1✔
423
                        if err != nil {
1✔
424
                                req.err <- err
×
425
                                req.resp <- nil
×
426
                                continue
×
427
                        }
428

429
                        // Once we have this info, we can directly filter the
430
                        // block and dispatch the proper notification.
431
                        req.resp <- &FilteredBlock{
1✔
432
                                Hash:         *req.blockHash,
1✔
433
                                Height:       uint32(header.Height),
1✔
434
                                Transactions: filterBlock(block),
1✔
435
                        }
1✔
436
                        req.err <- err
1✔
437

438
                case <-b.quit:
1✔
439
                        return
1✔
440
                }
441
        }
442
}
443

444
// filterUpdate is a message sent to the chainFilterer to update the current
445
// chainFilter state.
446
type filterUpdate struct {
447
        newUtxos     []wire.OutPoint
448
        updateHeight uint32
449
}
450

451
// UpdateFilter updates the UTXO filter which is to be consulted when creating
452
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
453
// meaning repeated calls to this method should _expand_ the size of the UTXO
454
// sub-set currently being watched.  If the set updateHeight is _lower_ than
455
// the best known height of the implementation, then the state should be
456
// rewound to ensure all relevant notifications are dispatched.
457
//
458
// NOTE: This is part of the FilteredChainView interface.
459
func (b *BtcdFilteredChainView) UpdateFilter(ops []graphdb.EdgePoint,
460
        updateHeight uint32) error {
1✔
461

1✔
462
        newUtxos := make([]wire.OutPoint, len(ops))
1✔
463
        for i, op := range ops {
2✔
464
                newUtxos[i] = op.OutPoint
1✔
465
        }
1✔
466

467
        select {
1✔
468

469
        case b.filterUpdates <- filterUpdate{
470
                newUtxos:     newUtxos,
471
                updateHeight: updateHeight,
472
        }:
1✔
473
                return nil
1✔
474

475
        case <-b.quit:
×
476
                return fmt.Errorf("chain filter shutting down")
×
477
        }
478
}
479

480
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
481
// Each time a block is connected to the end of a main chain, and appropriate
482
// FilteredBlock which contains the transactions which mutate our watched UTXO
483
// set is to be returned.
484
//
485
// NOTE: This is part of the FilteredChainView interface.
486
func (b *BtcdFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
1✔
487
        return b.blockQueue.newBlocks
1✔
488
}
1✔
489

490
// DisconnectedBlocks returns a receive only channel which will be sent upon
491
// with the empty filtered blocks of blocks which are disconnected from the
492
// main chain in the case of a re-org.
493
//
494
// NOTE: This is part of the FilteredChainView interface.
495
func (b *BtcdFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
1✔
496
        return b.blockQueue.staleBlocks
1✔
497
}
1✔
498

499
// GetBlock is used to retrieve the block with the given hash. This function
500
// wraps the blockCache's GetBlock function.
501
func (b *BtcdFilteredChainView) GetBlock(hash *chainhash.Hash) (
502
        *wire.MsgBlock, error) {
1✔
503

1✔
504
        return b.blockCache.GetBlock(hash, b.btcdConn.GetBlock)
1✔
505
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc