• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13586005509

28 Feb 2025 10:14AM UTC coverage: 68.629% (+9.9%) from 58.77%
13586005509

Pull #9521

github

web-flow
Merge 37d3a70a5 into 8532955b3
Pull Request #9521: unit: remove GOACC, use Go 1.20 native coverage functionality

129950 of 189351 relevant lines covered (68.63%)

23726.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.0
/routing/chainview/neutrino.go
1
package chainview
2

3
import (
4
        "fmt"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/btcsuite/btcd/btcutil"
9
        "github.com/btcsuite/btcd/btcutil/gcs/builder"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/rpcclient"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/lightninglabs/neutrino"
14
        "github.com/lightningnetwork/lnd/blockcache"
15
        graphdb "github.com/lightningnetwork/lnd/graph/db"
16
        "github.com/lightningnetwork/lnd/lntypes"
17
)
18

19
// CfFilteredChainView is an implementation of the FilteredChainView interface
20
// which is supported by an underlying Bitcoin light client which supports
21
// client side filtering of Golomb Coded Sets. Rather than fetching all the
22
// blocks, the light client is able to query filters locally, to test if an
23
// item in a block modifies any of our watched set of UTXOs.
24
type CfFilteredChainView struct {
25
        started int32 // To be used atomically.
26
        stopped int32 // To be used atomically.
27

28
        // p2pNode is a pointer to the running GCS-filter supported Bitcoin
29
        // light clientl
30
        p2pNode *neutrino.ChainService
31

32
        // chainView is the active rescan which only watches our specified
33
        // sub-set of the UTXO set.
34
        chainView *neutrino.Rescan
35

36
        // rescanErrChan is the channel that any errors encountered during the
37
        // rescan will be sent over.
38
        rescanErrChan <-chan error
39

40
        // blockEventQueue is the ordered queue used to keep the order
41
        // of connected and disconnected blocks sent to the reader of the
42
        // chainView.
43
        blockQueue *blockEventQueue
44

45
        // blockCache is an LRU block cache.
46
        blockCache *blockcache.BlockCache
47

48
        // chainFilter is the
49
        filterMtx   sync.RWMutex
50
        chainFilter map[wire.OutPoint][]byte
51

52
        quit chan struct{}
53
        wg   sync.WaitGroup
54
}
55

56
// A compile time check to ensure CfFilteredChainView implements the
57
// chainview.FilteredChainView.
58
var _ FilteredChainView = (*CfFilteredChainView)(nil)
59

60
// NewCfFilteredChainView creates a new instance of the CfFilteredChainView
61
// which is connected to an active neutrino node.
62
//
63
// NOTE: The node should already be running and syncing before being passed into
64
// this function.
65
func NewCfFilteredChainView(node *neutrino.ChainService,
66
        blockCache *blockcache.BlockCache) (*CfFilteredChainView, error) {
6✔
67

6✔
68
        return &CfFilteredChainView{
6✔
69
                blockQueue:    newBlockEventQueue(),
6✔
70
                quit:          make(chan struct{}),
6✔
71
                rescanErrChan: make(chan error),
6✔
72
                chainFilter:   make(map[wire.OutPoint][]byte),
6✔
73
                p2pNode:       node,
6✔
74
                blockCache:    blockCache,
6✔
75
        }, nil
6✔
76
}
6✔
77

78
// Start kicks off the FilteredChainView implementation. This function must be
79
// called before any calls to UpdateFilter can be processed.
80
//
81
// NOTE: This is part of the FilteredChainView interface.
82
func (c *CfFilteredChainView) Start() error {
6✔
83
        // Already started?
6✔
84
        if atomic.AddInt32(&c.started, 1) != 1 {
6✔
85
                return nil
×
86
        }
×
87

88
        log.Infof("FilteredChainView starting")
6✔
89

6✔
90
        // First, we'll obtain the latest block height of the p2p node. We'll
6✔
91
        // start the auto-rescan from this point. Once a caller actually wishes
6✔
92
        // to register a chain view, the rescan state will be rewound
6✔
93
        // accordingly.
6✔
94
        startingPoint, err := c.p2pNode.BestBlock()
6✔
95
        if err != nil {
6✔
96
                return err
×
97
        }
×
98

99
        // Next, we'll create our set of rescan options. Currently it's
100
        // required that an user MUST set a addr/outpoint/txid when creating a
101
        // rescan. To get around this, we'll add a "zero" outpoint, that won't
102
        // actually be matched.
103
        var zeroPoint neutrino.InputWithScript
6✔
104
        rescanOptions := []neutrino.RescanOption{
6✔
105
                neutrino.StartBlock(startingPoint),
6✔
106
                neutrino.QuitChan(c.quit),
6✔
107
                neutrino.NotificationHandlers(
6✔
108
                        rpcclient.NotificationHandlers{
6✔
109
                                OnFilteredBlockConnected:    c.onFilteredBlockConnected,
6✔
110
                                OnFilteredBlockDisconnected: c.onFilteredBlockDisconnected,
6✔
111
                        },
6✔
112
                ),
6✔
113
                neutrino.WatchInputs(zeroPoint),
6✔
114
        }
6✔
115

6✔
116
        // Finally, we'll create our rescan struct, start it, and launch all
6✔
117
        // the goroutines we need to operate this FilteredChainView instance.
6✔
118
        c.chainView = neutrino.NewRescan(
6✔
119
                &neutrino.RescanChainSource{
6✔
120
                        ChainService: c.p2pNode,
6✔
121
                },
6✔
122
                rescanOptions...,
6✔
123
        )
6✔
124
        c.rescanErrChan = c.chainView.Start()
6✔
125

6✔
126
        c.blockQueue.Start()
6✔
127

6✔
128
        c.wg.Add(1)
6✔
129
        go c.chainFilterer()
6✔
130

6✔
131
        return nil
6✔
132
}
133

134
// Stop signals all active goroutines for a graceful shutdown.
135
//
136
// NOTE: This is part of the FilteredChainView interface.
137
func (c *CfFilteredChainView) Stop() error {
6✔
138
        log.Debug("CfFilteredChainView stopping")
6✔
139
        defer log.Debug("CfFilteredChainView stopped")
6✔
140

6✔
141
        // Already shutting down?
6✔
142
        if atomic.AddInt32(&c.stopped, 1) != 1 {
6✔
143
                return nil
×
144
        }
×
145

146
        close(c.quit)
6✔
147
        c.blockQueue.Stop()
6✔
148
        c.wg.Wait()
6✔
149

6✔
150
        return nil
6✔
151
}
152

153
// onFilteredBlockConnected is called for each block that's connected to the
154
// end of the main chain. Based on our current chain filter, the block may or
155
// may not include any relevant transactions.
156
func (c *CfFilteredChainView) onFilteredBlockConnected(height int32,
157
        header *wire.BlockHeader, txns []*btcutil.Tx) {
464✔
158

464✔
159
        mtxs := make([]*wire.MsgTx, len(txns))
464✔
160
        for i, tx := range txns {
468✔
161
                mtx := tx.MsgTx()
4✔
162
                mtxs[i] = mtx
4✔
163

4✔
164
                for _, txIn := range mtx.TxIn {
8✔
165
                        c.filterMtx.Lock()
4✔
166
                        delete(c.chainFilter, txIn.PreviousOutPoint)
4✔
167
                        c.filterMtx.Unlock()
4✔
168
                }
4✔
169

170
        }
171

172
        block := &FilteredBlock{
464✔
173
                Hash:         header.BlockHash(),
464✔
174
                Height:       uint32(height),
464✔
175
                Transactions: mtxs,
464✔
176
        }
464✔
177

464✔
178
        c.blockQueue.Add(&blockEvent{
464✔
179
                eventType: connected,
464✔
180
                block:     block,
464✔
181
        })
464✔
182
}
183

184
// onFilteredBlockDisconnected is a callback which is executed once a block is
185
// disconnected from the end of the main chain.
186
func (c *CfFilteredChainView) onFilteredBlockDisconnected(height int32,
187
        header *wire.BlockHeader) {
415✔
188

415✔
189
        log.Debugf("got disconnected block at height %d: %v", height,
415✔
190
                header.BlockHash())
415✔
191

415✔
192
        filteredBlock := &FilteredBlock{
415✔
193
                Hash:   header.BlockHash(),
415✔
194
                Height: uint32(height),
415✔
195
        }
415✔
196

415✔
197
        c.blockQueue.Add(&blockEvent{
415✔
198
                eventType: disconnected,
415✔
199
                block:     filteredBlock,
415✔
200
        })
415✔
201
}
415✔
202

203
// chainFilterer is the primary coordination goroutine within the
204
// CfFilteredChainView. This goroutine handles errors from the running rescan.
205
func (c *CfFilteredChainView) chainFilterer() {
6✔
206
        defer c.wg.Done()
6✔
207

6✔
208
        for {
12✔
209
                select {
6✔
210
                case err := <-c.rescanErrChan:
×
211
                        log.Errorf("Error encountered during rescan: %v", err)
×
212
                case <-c.quit:
6✔
213
                        return
6✔
214
                }
215
        }
216
}
217

218
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
219
// result of applying the current registered UTXO sub-set on the block
220
// corresponding to that block hash. If any watched UTXO's are spent by the
221
// selected lock, then the internal chainFilter will also be updated.
222
//
223
// NOTE: This is part of the FilteredChainView interface.
224
func (c *CfFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
2✔
225
        // First, we'll fetch the block header itself so we can obtain the
2✔
226
        // height which is part of our return value.
2✔
227
        blockHeight, err := c.p2pNode.GetBlockHeight(blockHash)
2✔
228
        if err != nil {
2✔
229
                return nil, err
×
230
        }
×
231

232
        filteredBlock := &FilteredBlock{
2✔
233
                Hash:   *blockHash,
2✔
234
                Height: uint32(blockHeight),
2✔
235
        }
2✔
236

2✔
237
        // If we don't have any items within our current chain filter, then we
2✔
238
        // can exit early as we don't need to fetch the filter.
2✔
239
        c.filterMtx.RLock()
2✔
240
        if len(c.chainFilter) == 0 {
3✔
241
                c.filterMtx.RUnlock()
1✔
242
                return filteredBlock, nil
1✔
243
        }
1✔
244
        c.filterMtx.RUnlock()
2✔
245

2✔
246
        // Next, using the block, hash, we'll fetch the compact filter for this
2✔
247
        // block. We only require the regular filter as we're just looking for
2✔
248
        // outpoint that have been spent.
2✔
249
        filter, err := c.p2pNode.GetCFilter(*blockHash, wire.GCSFilterRegular)
2✔
250
        if err != nil {
2✔
251
                return nil, fmt.Errorf("unable to fetch filter: %w", err)
×
252
        }
×
253

254
        // Before we can match the filter, we'll need to map each item in our
255
        // chain filter to the representation that included in the compact
256
        // filters.
257
        c.filterMtx.RLock()
2✔
258
        relevantPoints := make([][]byte, 0, len(c.chainFilter))
2✔
259
        for _, filterEntry := range c.chainFilter {
5✔
260
                relevantPoints = append(relevantPoints, filterEntry)
3✔
261
        }
3✔
262
        c.filterMtx.RUnlock()
2✔
263

2✔
264
        // With our relevant points constructed, we can finally match against
2✔
265
        // the retrieved filter.
2✔
266
        matched, err := filter.MatchAny(builder.DeriveKey(blockHash),
2✔
267
                relevantPoints)
2✔
268
        if err != nil {
2✔
269
                return nil, err
×
270
        }
×
271

272
        // If there wasn't a match, then we'll return the filtered block as is
273
        // (void of any transactions).
274
        if !matched {
3✔
275
                return filteredBlock, nil
1✔
276
        }
1✔
277

278
        // If we reach this point, then there was a match, so we'll need to
279
        // fetch the block itself so we can scan it for any actual matches (as
280
        // there's a fp rate).
281
        block, err := c.GetBlock(*blockHash)
2✔
282
        if err != nil {
2✔
283
                return nil, err
×
284
        }
×
285

286
        // Finally, we'll step through the block, input by input, to see if any
287
        // transactions spend any outputs from our watched sub-set of the UTXO
288
        // set.
289
        for _, tx := range block.Transactions() {
6✔
290
                for _, txIn := range tx.MsgTx().TxIn {
8✔
291
                        prevOp := txIn.PreviousOutPoint
4✔
292

4✔
293
                        c.filterMtx.RLock()
4✔
294
                        _, ok := c.chainFilter[prevOp]
4✔
295
                        c.filterMtx.RUnlock()
4✔
296

4✔
297
                        if ok {
7✔
298
                                filteredBlock.Transactions = append(
3✔
299
                                        filteredBlock.Transactions,
3✔
300
                                        tx.MsgTx().Copy(),
3✔
301
                                )
3✔
302

3✔
303
                                c.filterMtx.Lock()
3✔
304
                                delete(c.chainFilter, prevOp)
3✔
305
                                c.filterMtx.Unlock()
3✔
306

3✔
307
                                break
3✔
308
                        }
309
                }
310
        }
311

312
        return filteredBlock, nil
2✔
313
}
314

315
// UpdateFilter updates the UTXO filter which is to be consulted when creating
316
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
317
// meaning repeated calls to this method should _expand_ the size of the UTXO
318
// sub-set currently being watched.  If the set updateHeight is _lower_ than
319
// the best known height of the implementation, then the state should be
320
// rewound to ensure all relevant notifications are dispatched.
321
//
322
// NOTE: This is part of the FilteredChainView interface.
323
func (c *CfFilteredChainView) UpdateFilter(ops []graphdb.EdgePoint,
324
        updateHeight uint32) error {
4✔
325

4✔
326
        log.Tracef("Updating chain filter with new UTXO's: %v", ops)
4✔
327

4✔
328
        // First, we'll update the current chain view, by adding any new
4✔
329
        // UTXO's, ignoring duplicates in the process.
4✔
330
        c.filterMtx.Lock()
4✔
331
        for _, op := range ops {
10✔
332
                c.chainFilter[op.OutPoint] = op.FundingPkScript
6✔
333
        }
6✔
334
        c.filterMtx.Unlock()
4✔
335

4✔
336
        inputs := make([]neutrino.InputWithScript, len(ops))
4✔
337
        for i, op := range ops {
10✔
338
                inputs[i] = neutrino.InputWithScript{
6✔
339
                        PkScript: op.FundingPkScript,
6✔
340
                        OutPoint: op.OutPoint,
6✔
341
                }
6✔
342
        }
6✔
343

344
        // With our internal chain view update, we'll craft a new update to the
345
        // chainView which includes our new UTXO's, and current update height.
346
        rescanUpdate := []neutrino.UpdateOption{
4✔
347
                neutrino.AddInputs(inputs...),
4✔
348
                neutrino.Rewind(updateHeight),
4✔
349
                neutrino.DisableDisconnectedNtfns(true),
4✔
350
        }
4✔
351
        err := c.chainView.Update(rescanUpdate...)
4✔
352
        if err != nil {
4✔
353
                return fmt.Errorf("unable to update rescan: %w", err)
×
354
        }
×
355
        return nil
4✔
356
}
357

358
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
359
// Each time a block is connected to the end of a main chain, and appropriate
360
// FilteredBlock which contains the transactions which mutate our watched UTXO
361
// set is to be returned.
362
//
363
// NOTE: This is part of the FilteredChainView interface.
364
func (c *CfFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
5✔
365
        return c.blockQueue.newBlocks
5✔
366
}
5✔
367

368
// DisconnectedBlocks returns a receive only channel which will be sent upon
369
// with the empty filtered blocks of blocks which are disconnected from the
370
// main chain in the case of a re-org.
371
//
372
// NOTE: This is part of the FilteredChainView interface.
373
func (c *CfFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
2✔
374
        return c.blockQueue.staleBlocks
2✔
375
}
2✔
376

377
// GetBlock is used to retrieve the block with the given hash. Since the block
378
// cache used by neutrino will be the same as that used by LND (since it is
379
// passed to neutrino on initialisation), the neutrino GetBlock method can be
380
// called directly since it already uses the block cache. However, neutrino
381
// does not lock the block cache mutex for the given block hash and so that is
382
// done here.
383
func (c *CfFilteredChainView) GetBlock(hash chainhash.Hash) (
384
        *btcutil.Block, error) {
2✔
385

2✔
386
        c.blockCache.HashMutex.Lock(lntypes.Hash(hash))
2✔
387
        defer c.blockCache.HashMutex.Unlock(lntypes.Hash(hash))
2✔
388

2✔
389
        return c.p2pNode.GetBlock(hash)
2✔
390
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc