• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13236757158

10 Feb 2025 08:39AM UTC coverage: 57.649% (-1.2%) from 58.815%
13236757158

Pull #9493

github

ziggie1984
lncli: for some cmds we don't replace the data of the response.

For some cmds it is not very practical to replace the json output
because we might pipe it into other commands. For example when
creating the route we want to pipe it into sendtoRoute.
Pull Request #9493: For some lncli cmds we should not replace the content with other data

0 of 9 new or added lines in 2 files covered. (0.0%)

19535 existing lines in 252 files now uncovered.

103517 of 179563 relevant lines covered (57.65%)

24878.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.78
/routing/chainview/neutrino.go
1
package chainview
2

3
import (
4
        "fmt"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/btcsuite/btcd/btcutil"
9
        "github.com/btcsuite/btcd/btcutil/gcs/builder"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/rpcclient"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/lightninglabs/neutrino"
14
        "github.com/lightningnetwork/lnd/blockcache"
15
        graphdb "github.com/lightningnetwork/lnd/graph/db"
16
        "github.com/lightningnetwork/lnd/lntypes"
17
)
18

19
// CfFilteredChainView is an implementation of the FilteredChainView interface
20
// which is supported by an underlying Bitcoin light client which supports
21
// client side filtering of Golomb Coded Sets. Rather than fetching all the
22
// blocks, the light client is able to query filters locally, to test if an
23
// item in a block modifies any of our watched set of UTXOs.
24
type CfFilteredChainView struct {
25
        started int32 // To be used atomically.
26
        stopped int32 // To be used atomically.
27

28
        // p2pNode is a pointer to the running GCS-filter supported Bitcoin
29
        // light clientl
30
        p2pNode *neutrino.ChainService
31

32
        // chainView is the active rescan which only watches our specified
33
        // sub-set of the UTXO set.
34
        chainView *neutrino.Rescan
35

36
        // rescanErrChan is the channel that any errors encountered during the
37
        // rescan will be sent over.
38
        rescanErrChan <-chan error
39

40
        // blockEventQueue is the ordered queue used to keep the order
41
        // of connected and disconnected blocks sent to the reader of the
42
        // chainView.
43
        blockQueue *blockEventQueue
44

45
        // blockCache is an LRU block cache.
46
        blockCache *blockcache.BlockCache
47

48
        // chainFilter is the
49
        filterMtx   sync.RWMutex
50
        chainFilter map[wire.OutPoint][]byte
51

52
        quit chan struct{}
53
        wg   sync.WaitGroup
54
}
55

56
// A compile time check to ensure CfFilteredChainView implements the
57
// chainview.FilteredChainView.
58
var _ FilteredChainView = (*CfFilteredChainView)(nil)
59

60
// NewCfFilteredChainView creates a new instance of the CfFilteredChainView
61
// which is connected to an active neutrino node.
62
//
63
// NOTE: The node should already be running and syncing before being passed into
64
// this function.
65
func NewCfFilteredChainView(node *neutrino.ChainService,
66
        blockCache *blockcache.BlockCache) (*CfFilteredChainView, error) {
2✔
67

2✔
68
        return &CfFilteredChainView{
2✔
69
                blockQueue:    newBlockEventQueue(),
2✔
70
                quit:          make(chan struct{}),
2✔
71
                rescanErrChan: make(chan error),
2✔
72
                chainFilter:   make(map[wire.OutPoint][]byte),
2✔
73
                p2pNode:       node,
2✔
74
                blockCache:    blockCache,
2✔
75
        }, nil
2✔
76
}
2✔
77

78
// Start kicks off the FilteredChainView implementation. This function must be
79
// called before any calls to UpdateFilter can be processed.
80
//
81
// NOTE: This is part of the FilteredChainView interface.
82
func (c *CfFilteredChainView) Start() error {
2✔
83
        // Already started?
2✔
84
        if atomic.AddInt32(&c.started, 1) != 1 {
2✔
85
                return nil
×
86
        }
×
87

88
        log.Infof("FilteredChainView starting")
2✔
89

2✔
90
        // First, we'll obtain the latest block height of the p2p node. We'll
2✔
91
        // start the auto-rescan from this point. Once a caller actually wishes
2✔
92
        // to register a chain view, the rescan state will be rewound
2✔
93
        // accordingly.
2✔
94
        startingPoint, err := c.p2pNode.BestBlock()
2✔
95
        if err != nil {
2✔
96
                return err
×
97
        }
×
98

99
        // Next, we'll create our set of rescan options. Currently it's
100
        // required that an user MUST set a addr/outpoint/txid when creating a
101
        // rescan. To get around this, we'll add a "zero" outpoint, that won't
102
        // actually be matched.
103
        var zeroPoint neutrino.InputWithScript
2✔
104
        rescanOptions := []neutrino.RescanOption{
2✔
105
                neutrino.StartBlock(startingPoint),
2✔
106
                neutrino.QuitChan(c.quit),
2✔
107
                neutrino.NotificationHandlers(
2✔
108
                        rpcclient.NotificationHandlers{
2✔
109
                                OnFilteredBlockConnected:    c.onFilteredBlockConnected,
2✔
110
                                OnFilteredBlockDisconnected: c.onFilteredBlockDisconnected,
2✔
111
                        },
2✔
112
                ),
2✔
113
                neutrino.WatchInputs(zeroPoint),
2✔
114
        }
2✔
115

2✔
116
        // Finally, we'll create our rescan struct, start it, and launch all
2✔
117
        // the goroutines we need to operate this FilteredChainView instance.
2✔
118
        c.chainView = neutrino.NewRescan(
2✔
119
                &neutrino.RescanChainSource{
2✔
120
                        ChainService: c.p2pNode,
2✔
121
                },
2✔
122
                rescanOptions...,
2✔
123
        )
2✔
124
        c.rescanErrChan = c.chainView.Start()
2✔
125

2✔
126
        c.blockQueue.Start()
2✔
127

2✔
128
        c.wg.Add(1)
2✔
129
        go c.chainFilterer()
2✔
130

2✔
131
        return nil
2✔
132
}
133

134
// Stop signals all active goroutines for a graceful shutdown.
135
//
136
// NOTE: This is part of the FilteredChainView interface.
137
func (c *CfFilteredChainView) Stop() error {
2✔
138
        log.Debug("CfFilteredChainView stopping")
2✔
139
        defer log.Debug("CfFilteredChainView stopped")
2✔
140

2✔
141
        // Already shutting down?
2✔
142
        if atomic.AddInt32(&c.stopped, 1) != 1 {
2✔
143
                return nil
×
144
        }
×
145

146
        close(c.quit)
2✔
147
        c.blockQueue.Stop()
2✔
148
        c.wg.Wait()
2✔
149

2✔
150
        return nil
2✔
151
}
152

153
// onFilteredBlockConnected is called for each block that's connected to the
154
// end of the main chain. Based on our current chain filter, the block may or
155
// may not include any relevant transactions.
156
func (c *CfFilteredChainView) onFilteredBlockConnected(height int32,
157
        header *wire.BlockHeader, txns []*btcutil.Tx) {
504✔
158

504✔
159
        mtxs := make([]*wire.MsgTx, len(txns))
504✔
160
        for i, tx := range txns {
507✔
161
                mtx := tx.MsgTx()
3✔
162
                mtxs[i] = mtx
3✔
163

3✔
164
                for _, txIn := range mtx.TxIn {
6✔
165
                        c.filterMtx.Lock()
3✔
166
                        delete(c.chainFilter, txIn.PreviousOutPoint)
3✔
167
                        c.filterMtx.Unlock()
3✔
168
                }
3✔
169

170
        }
171

172
        block := &FilteredBlock{
504✔
173
                Hash:         header.BlockHash(),
504✔
174
                Height:       uint32(height),
504✔
175
                Transactions: mtxs,
504✔
176
        }
504✔
177

504✔
178
        c.blockQueue.Add(&blockEvent{
504✔
179
                eventType: connected,
504✔
180
                block:     block,
504✔
181
        })
504✔
182
}
183

184
// onFilteredBlockDisconnected is a callback which is executed once a block is
185
// disconnected from the end of the main chain.
186
func (c *CfFilteredChainView) onFilteredBlockDisconnected(height int32,
187
        header *wire.BlockHeader) {
415✔
188

415✔
189
        log.Debugf("got disconnected block at height %d: %v", height,
415✔
190
                header.BlockHash())
415✔
191

415✔
192
        filteredBlock := &FilteredBlock{
415✔
193
                Hash:   header.BlockHash(),
415✔
194
                Height: uint32(height),
415✔
195
        }
415✔
196

415✔
197
        c.blockQueue.Add(&blockEvent{
415✔
198
                eventType: disconnected,
415✔
199
                block:     filteredBlock,
415✔
200
        })
415✔
201
}
415✔
202

203
// chainFilterer is the primary coordination goroutine within the
204
// CfFilteredChainView. This goroutine handles errors from the running rescan.
205
func (c *CfFilteredChainView) chainFilterer() {
2✔
206
        defer c.wg.Done()
2✔
207

2✔
208
        for {
4✔
209
                select {
2✔
210
                case err := <-c.rescanErrChan:
×
211
                        log.Errorf("Error encountered during rescan: %v", err)
×
212
                case <-c.quit:
2✔
213
                        return
2✔
214
                }
215
        }
216
}
217

218
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
219
// result of applying the current registered UTXO sub-set on the block
220
// corresponding to that block hash. If any watched UTXO's are spent by the
221
// selected lock, then the internal chainFilter will also be updated.
222
//
223
// NOTE: This is part of the FilteredChainView interface.
224
func (c *CfFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
1✔
225
        // First, we'll fetch the block header itself so we can obtain the
1✔
226
        // height which is part of our return value.
1✔
227
        blockHeight, err := c.p2pNode.GetBlockHeight(blockHash)
1✔
228
        if err != nil {
1✔
229
                return nil, err
×
230
        }
×
231

232
        filteredBlock := &FilteredBlock{
1✔
233
                Hash:   *blockHash,
1✔
234
                Height: uint32(blockHeight),
1✔
235
        }
1✔
236

1✔
237
        // If we don't have any items within our current chain filter, then we
1✔
238
        // can exit early as we don't need to fetch the filter.
1✔
239
        c.filterMtx.RLock()
1✔
240
        if len(c.chainFilter) == 0 {
1✔
UNCOV
241
                c.filterMtx.RUnlock()
×
UNCOV
242
                return filteredBlock, nil
×
UNCOV
243
        }
×
244
        c.filterMtx.RUnlock()
1✔
245

1✔
246
        // Next, using the block, hash, we'll fetch the compact filter for this
1✔
247
        // block. We only require the regular filter as we're just looking for
1✔
248
        // outpoint that have been spent.
1✔
249
        filter, err := c.p2pNode.GetCFilter(*blockHash, wire.GCSFilterRegular)
1✔
250
        if err != nil {
1✔
251
                return nil, fmt.Errorf("unable to fetch filter: %w", err)
×
252
        }
×
253

254
        // Before we can match the filter, we'll need to map each item in our
255
        // chain filter to the representation that included in the compact
256
        // filters.
257
        c.filterMtx.RLock()
1✔
258
        relevantPoints := make([][]byte, 0, len(c.chainFilter))
1✔
259
        for _, filterEntry := range c.chainFilter {
3✔
260
                relevantPoints = append(relevantPoints, filterEntry)
2✔
261
        }
2✔
262
        c.filterMtx.RUnlock()
1✔
263

1✔
264
        // With our relevant points constructed, we can finally match against
1✔
265
        // the retrieved filter.
1✔
266
        matched, err := filter.MatchAny(builder.DeriveKey(blockHash),
1✔
267
                relevantPoints)
1✔
268
        if err != nil {
1✔
269
                return nil, err
×
270
        }
×
271

272
        // If there wasn't a match, then we'll return the filtered block as is
273
        // (void of any transactions).
274
        if !matched {
1✔
UNCOV
275
                return filteredBlock, nil
×
UNCOV
276
        }
×
277

278
        // If we reach this point, then there was a match, so we'll need to
279
        // fetch the block itself so we can scan it for any actual matches (as
280
        // there's a fp rate).
281
        block, err := c.GetBlock(*blockHash)
1✔
282
        if err != nil {
1✔
283
                return nil, err
×
284
        }
×
285

286
        // Finally, we'll step through the block, input by input, to see if any
287
        // transactions spend any outputs from our watched sub-set of the UTXO
288
        // set.
289
        for _, tx := range block.Transactions() {
4✔
290
                for _, txIn := range tx.MsgTx().TxIn {
6✔
291
                        prevOp := txIn.PreviousOutPoint
3✔
292

3✔
293
                        c.filterMtx.RLock()
3✔
294
                        _, ok := c.chainFilter[prevOp]
3✔
295
                        c.filterMtx.RUnlock()
3✔
296

3✔
297
                        if ok {
5✔
298
                                filteredBlock.Transactions = append(
2✔
299
                                        filteredBlock.Transactions,
2✔
300
                                        tx.MsgTx().Copy(),
2✔
301
                                )
2✔
302

2✔
303
                                c.filterMtx.Lock()
2✔
304
                                delete(c.chainFilter, prevOp)
2✔
305
                                c.filterMtx.Unlock()
2✔
306

2✔
307
                                break
2✔
308
                        }
309
                }
310
        }
311

312
        return filteredBlock, nil
1✔
313
}
314

315
// UpdateFilter updates the UTXO filter which is to be consulted when creating
316
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
317
// meaning repeated calls to this method should _expand_ the size of the UTXO
318
// sub-set currently being watched.  If the set updateHeight is _lower_ than
319
// the best known height of the implementation, then the state should be
320
// rewound to ensure all relevant notifications are dispatched.
321
//
322
// NOTE: This is part of the FilteredChainView interface.
323
func (c *CfFilteredChainView) UpdateFilter(ops []graphdb.EdgePoint,
324
        updateHeight uint32) error {
3✔
325

3✔
326
        log.Tracef("Updating chain filter with new UTXO's: %v", ops)
3✔
327

3✔
328
        // First, we'll update the current chain view, by adding any new
3✔
329
        // UTXO's, ignoring duplicates in the process.
3✔
330
        c.filterMtx.Lock()
3✔
331
        for _, op := range ops {
8✔
332
                c.chainFilter[op.OutPoint] = op.FundingPkScript
5✔
333
        }
5✔
334
        c.filterMtx.Unlock()
3✔
335

3✔
336
        inputs := make([]neutrino.InputWithScript, len(ops))
3✔
337
        for i, op := range ops {
8✔
338
                inputs[i] = neutrino.InputWithScript{
5✔
339
                        PkScript: op.FundingPkScript,
5✔
340
                        OutPoint: op.OutPoint,
5✔
341
                }
5✔
342
        }
5✔
343

344
        // With our internal chain view update, we'll craft a new update to the
345
        // chainView which includes our new UTXO's, and current update height.
346
        rescanUpdate := []neutrino.UpdateOption{
3✔
347
                neutrino.AddInputs(inputs...),
3✔
348
                neutrino.Rewind(updateHeight),
3✔
349
                neutrino.DisableDisconnectedNtfns(true),
3✔
350
        }
3✔
351
        err := c.chainView.Update(rescanUpdate...)
3✔
352
        if err != nil {
3✔
353
                return fmt.Errorf("unable to update rescan: %w", err)
×
354
        }
×
355
        return nil
3✔
356
}
357

358
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
359
// Each time a block is connected to the end of a main chain, and appropriate
360
// FilteredBlock which contains the transactions which mutate our watched UTXO
361
// set is to be returned.
362
//
363
// NOTE: This is part of the FilteredChainView interface.
364
func (c *CfFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
4✔
365
        return c.blockQueue.newBlocks
4✔
366
}
4✔
367

368
// DisconnectedBlocks returns a receive only channel which will be sent upon
369
// with the empty filtered blocks of blocks which are disconnected from the
370
// main chain in the case of a re-org.
371
//
372
// NOTE: This is part of the FilteredChainView interface.
373
func (c *CfFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
1✔
374
        return c.blockQueue.staleBlocks
1✔
375
}
1✔
376

377
// GetBlock is used to retrieve the block with the given hash. Since the block
378
// cache used by neutrino will be the same as that used by LND (since it is
379
// passed to neutrino on initialisation), the neutrino GetBlock method can be
380
// called directly since it already uses the block cache. However, neutrino
381
// does not lock the block cache mutex for the given block hash and so that is
382
// done here.
383
func (c *CfFilteredChainView) GetBlock(hash chainhash.Hash) (
384
        *btcutil.Block, error) {
1✔
385

1✔
386
        c.blockCache.HashMutex.Lock(lntypes.Hash(hash))
1✔
387
        defer c.blockCache.HashMutex.Unlock(lntypes.Hash(hash))
1✔
388

1✔
389
        return c.p2pNode.GetBlock(hash)
1✔
390
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc