• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12280461253

11 Dec 2024 04:15PM UTC coverage: 48.9% (-0.6%) from 49.54%
12280461253

Pull #9344

github

ellemouton
htlcswitch+go.mod: use updated fn.ContextGuard

This commit updates the fn dep to the version containing the updates to
the ContextGuard implementation. Only the htlcswitch/link uses the guard
at the moment so this is updated to make use of the new implementation.
Pull Request #9344: htlcswitch+go.mod: use updated fn.ContextGuard

40 of 54 new or added lines in 2 files covered. (74.07%)

1322 existing lines in 35 files now uncovered.

99081 of 202620 relevant lines covered (48.9%)

1.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/routing/chainview/neutrino.go
1
package chainview
2

3
import (
4
        "fmt"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/btcsuite/btcd/btcutil"
9
        "github.com/btcsuite/btcd/btcutil/gcs/builder"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/rpcclient"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/lightninglabs/neutrino"
14
        "github.com/lightningnetwork/lnd/blockcache"
15
        graphdb "github.com/lightningnetwork/lnd/graph/db"
16
        "github.com/lightningnetwork/lnd/lntypes"
17
)
18

19
// CfFilteredChainView is an implementation of the FilteredChainView interface
20
// which is supported by an underlying Bitcoin light client which supports
21
// client side filtering of Golomb Coded Sets. Rather than fetching all the
22
// blocks, the light client is able to query filters locally, to test if an
23
// item in a block modifies any of our watched set of UTXOs.
24
type CfFilteredChainView struct {
25
        started int32 // To be used atomically.
26
        stopped int32 // To be used atomically.
27

28
        // p2pNode is a pointer to the running GCS-filter supported Bitcoin
29
        // light clientl
30
        p2pNode *neutrino.ChainService
31

32
        // chainView is the active rescan which only watches our specified
33
        // sub-set of the UTXO set.
34
        chainView *neutrino.Rescan
35

36
        // rescanErrChan is the channel that any errors encountered during the
37
        // rescan will be sent over.
38
        rescanErrChan <-chan error
39

40
        // blockEventQueue is the ordered queue used to keep the order
41
        // of connected and disconnected blocks sent to the reader of the
42
        // chainView.
43
        blockQueue *blockEventQueue
44

45
        // blockCache is an LRU block cache.
46
        blockCache *blockcache.BlockCache
47

48
        // chainFilter is the
49
        filterMtx   sync.RWMutex
50
        chainFilter map[wire.OutPoint][]byte
51

52
        quit chan struct{}
53
        wg   sync.WaitGroup
54
}
55

56
// A compile time check to ensure CfFilteredChainView implements the
57
// chainview.FilteredChainView.
58
var _ FilteredChainView = (*CfFilteredChainView)(nil)
59

60
// NewCfFilteredChainView creates a new instance of the CfFilteredChainView
61
// which is connected to an active neutrino node.
62
//
63
// NOTE: The node should already be running and syncing before being passed into
64
// this function.
65
func NewCfFilteredChainView(node *neutrino.ChainService,
UNCOV
66
        blockCache *blockcache.BlockCache) (*CfFilteredChainView, error) {
×
UNCOV
67

×
UNCOV
68
        return &CfFilteredChainView{
×
UNCOV
69
                blockQueue:    newBlockEventQueue(),
×
UNCOV
70
                quit:          make(chan struct{}),
×
UNCOV
71
                rescanErrChan: make(chan error),
×
UNCOV
72
                chainFilter:   make(map[wire.OutPoint][]byte),
×
UNCOV
73
                p2pNode:       node,
×
UNCOV
74
                blockCache:    blockCache,
×
UNCOV
75
        }, nil
×
UNCOV
76
}
×
77

78
// Start kicks off the FilteredChainView implementation. This function must be
79
// called before any calls to UpdateFilter can be processed.
80
//
81
// NOTE: This is part of the FilteredChainView interface.
UNCOV
82
func (c *CfFilteredChainView) Start() error {
×
UNCOV
83
        // Already started?
×
UNCOV
84
        if atomic.AddInt32(&c.started, 1) != 1 {
×
85
                return nil
×
86
        }
×
87

UNCOV
88
        log.Infof("FilteredChainView starting")
×
UNCOV
89

×
UNCOV
90
        // First, we'll obtain the latest block height of the p2p node. We'll
×
UNCOV
91
        // start the auto-rescan from this point. Once a caller actually wishes
×
UNCOV
92
        // to register a chain view, the rescan state will be rewound
×
UNCOV
93
        // accordingly.
×
UNCOV
94
        startingPoint, err := c.p2pNode.BestBlock()
×
UNCOV
95
        if err != nil {
×
96
                return err
×
97
        }
×
98

99
        // Next, we'll create our set of rescan options. Currently it's
100
        // required that an user MUST set a addr/outpoint/txid when creating a
101
        // rescan. To get around this, we'll add a "zero" outpoint, that won't
102
        // actually be matched.
UNCOV
103
        var zeroPoint neutrino.InputWithScript
×
UNCOV
104
        rescanOptions := []neutrino.RescanOption{
×
UNCOV
105
                neutrino.StartBlock(startingPoint),
×
UNCOV
106
                neutrino.QuitChan(c.quit),
×
UNCOV
107
                neutrino.NotificationHandlers(
×
UNCOV
108
                        rpcclient.NotificationHandlers{
×
UNCOV
109
                                OnFilteredBlockConnected:    c.onFilteredBlockConnected,
×
UNCOV
110
                                OnFilteredBlockDisconnected: c.onFilteredBlockDisconnected,
×
UNCOV
111
                        },
×
UNCOV
112
                ),
×
UNCOV
113
                neutrino.WatchInputs(zeroPoint),
×
UNCOV
114
        }
×
UNCOV
115

×
UNCOV
116
        // Finally, we'll create our rescan struct, start it, and launch all
×
UNCOV
117
        // the goroutines we need to operate this FilteredChainView instance.
×
UNCOV
118
        c.chainView = neutrino.NewRescan(
×
UNCOV
119
                &neutrino.RescanChainSource{
×
UNCOV
120
                        ChainService: c.p2pNode,
×
UNCOV
121
                },
×
UNCOV
122
                rescanOptions...,
×
UNCOV
123
        )
×
UNCOV
124
        c.rescanErrChan = c.chainView.Start()
×
UNCOV
125

×
UNCOV
126
        c.blockQueue.Start()
×
UNCOV
127

×
UNCOV
128
        c.wg.Add(1)
×
UNCOV
129
        go c.chainFilterer()
×
UNCOV
130

×
UNCOV
131
        return nil
×
132
}
133

134
// Stop signals all active goroutines for a graceful shutdown.
135
//
136
// NOTE: This is part of the FilteredChainView interface.
UNCOV
137
func (c *CfFilteredChainView) Stop() error {
×
UNCOV
138
        log.Debug("CfFilteredChainView stopping")
×
UNCOV
139
        defer log.Debug("CfFilteredChainView stopped")
×
UNCOV
140

×
UNCOV
141
        // Already shutting down?
×
UNCOV
142
        if atomic.AddInt32(&c.stopped, 1) != 1 {
×
143
                return nil
×
144
        }
×
145

UNCOV
146
        close(c.quit)
×
UNCOV
147
        c.blockQueue.Stop()
×
UNCOV
148
        c.wg.Wait()
×
UNCOV
149

×
UNCOV
150
        return nil
×
151
}
152

153
// onFilteredBlockConnected is called for each block that's connected to the
154
// end of the main chain. Based on our current chain filter, the block may or
155
// may not include any relevant transactions.
156
func (c *CfFilteredChainView) onFilteredBlockConnected(height int32,
UNCOV
157
        header *wire.BlockHeader, txns []*btcutil.Tx) {
×
UNCOV
158

×
UNCOV
159
        mtxs := make([]*wire.MsgTx, len(txns))
×
UNCOV
160
        for i, tx := range txns {
×
UNCOV
161
                mtx := tx.MsgTx()
×
UNCOV
162
                mtxs[i] = mtx
×
UNCOV
163

×
UNCOV
164
                for _, txIn := range mtx.TxIn {
×
UNCOV
165
                        c.filterMtx.Lock()
×
UNCOV
166
                        delete(c.chainFilter, txIn.PreviousOutPoint)
×
UNCOV
167
                        c.filterMtx.Unlock()
×
UNCOV
168
                }
×
169

170
        }
171

UNCOV
172
        block := &FilteredBlock{
×
UNCOV
173
                Hash:         header.BlockHash(),
×
UNCOV
174
                Height:       uint32(height),
×
UNCOV
175
                Transactions: mtxs,
×
UNCOV
176
        }
×
UNCOV
177

×
UNCOV
178
        c.blockQueue.Add(&blockEvent{
×
UNCOV
179
                eventType: connected,
×
UNCOV
180
                block:     block,
×
UNCOV
181
        })
×
182
}
183

184
// onFilteredBlockDisconnected is a callback which is executed once a block is
185
// disconnected from the end of the main chain.
186
func (c *CfFilteredChainView) onFilteredBlockDisconnected(height int32,
187
        header *wire.BlockHeader) {
×
188

×
189
        log.Debugf("got disconnected block at height %d: %v", height,
×
190
                header.BlockHash())
×
191

×
192
        filteredBlock := &FilteredBlock{
×
193
                Hash:   header.BlockHash(),
×
194
                Height: uint32(height),
×
195
        }
×
196

×
197
        c.blockQueue.Add(&blockEvent{
×
198
                eventType: disconnected,
×
199
                block:     filteredBlock,
×
200
        })
×
201
}
×
202

203
// chainFilterer is the primary coordination goroutine within the
204
// CfFilteredChainView. This goroutine handles errors from the running rescan.
UNCOV
205
func (c *CfFilteredChainView) chainFilterer() {
×
UNCOV
206
        defer c.wg.Done()
×
UNCOV
207

×
UNCOV
208
        for {
×
UNCOV
209
                select {
×
210
                case err := <-c.rescanErrChan:
×
211
                        log.Errorf("Error encountered during rescan: %v", err)
×
UNCOV
212
                case <-c.quit:
×
UNCOV
213
                        return
×
214
                }
215
        }
216
}
217

218
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
219
// result of applying the current registered UTXO sub-set on the block
220
// corresponding to that block hash. If any watched UTXO's are spent by the
221
// selected lock, then the internal chainFilter will also be updated.
222
//
223
// NOTE: This is part of the FilteredChainView interface.
UNCOV
224
func (c *CfFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
×
UNCOV
225
        // First, we'll fetch the block header itself so we can obtain the
×
UNCOV
226
        // height which is part of our return value.
×
UNCOV
227
        blockHeight, err := c.p2pNode.GetBlockHeight(blockHash)
×
UNCOV
228
        if err != nil {
×
229
                return nil, err
×
230
        }
×
231

UNCOV
232
        filteredBlock := &FilteredBlock{
×
UNCOV
233
                Hash:   *blockHash,
×
UNCOV
234
                Height: uint32(blockHeight),
×
UNCOV
235
        }
×
UNCOV
236

×
UNCOV
237
        // If we don't have any items within our current chain filter, then we
×
UNCOV
238
        // can exit early as we don't need to fetch the filter.
×
UNCOV
239
        c.filterMtx.RLock()
×
UNCOV
240
        if len(c.chainFilter) == 0 {
×
UNCOV
241
                c.filterMtx.RUnlock()
×
UNCOV
242
                return filteredBlock, nil
×
UNCOV
243
        }
×
UNCOV
244
        c.filterMtx.RUnlock()
×
UNCOV
245

×
UNCOV
246
        // Next, using the block, hash, we'll fetch the compact filter for this
×
UNCOV
247
        // block. We only require the regular filter as we're just looking for
×
UNCOV
248
        // outpoint that have been spent.
×
UNCOV
249
        filter, err := c.p2pNode.GetCFilter(*blockHash, wire.GCSFilterRegular)
×
UNCOV
250
        if err != nil {
×
251
                return nil, fmt.Errorf("unable to fetch filter: %w", err)
×
252
        }
×
253

254
        // Before we can match the filter, we'll need to map each item in our
255
        // chain filter to the representation that included in the compact
256
        // filters.
UNCOV
257
        c.filterMtx.RLock()
×
UNCOV
258
        relevantPoints := make([][]byte, 0, len(c.chainFilter))
×
UNCOV
259
        for _, filterEntry := range c.chainFilter {
×
UNCOV
260
                relevantPoints = append(relevantPoints, filterEntry)
×
UNCOV
261
        }
×
UNCOV
262
        c.filterMtx.RUnlock()
×
UNCOV
263

×
UNCOV
264
        // With our relevant points constructed, we can finally match against
×
UNCOV
265
        // the retrieved filter.
×
UNCOV
266
        matched, err := filter.MatchAny(builder.DeriveKey(blockHash),
×
UNCOV
267
                relevantPoints)
×
UNCOV
268
        if err != nil {
×
269
                return nil, err
×
270
        }
×
271

272
        // If there wasn't a match, then we'll return the filtered block as is
273
        // (void of any transactions).
UNCOV
274
        if !matched {
×
UNCOV
275
                return filteredBlock, nil
×
UNCOV
276
        }
×
277

278
        // If we reach this point, then there was a match, so we'll need to
279
        // fetch the block itself so we can scan it for any actual matches (as
280
        // there's a fp rate).
UNCOV
281
        block, err := c.GetBlock(*blockHash)
×
UNCOV
282
        if err != nil {
×
283
                return nil, err
×
284
        }
×
285

286
        // Finally, we'll step through the block, input by input, to see if any
287
        // transactions spend any outputs from our watched sub-set of the UTXO
288
        // set.
UNCOV
289
        for _, tx := range block.Transactions() {
×
UNCOV
290
                for _, txIn := range tx.MsgTx().TxIn {
×
UNCOV
291
                        prevOp := txIn.PreviousOutPoint
×
UNCOV
292

×
UNCOV
293
                        c.filterMtx.RLock()
×
UNCOV
294
                        _, ok := c.chainFilter[prevOp]
×
UNCOV
295
                        c.filterMtx.RUnlock()
×
UNCOV
296

×
UNCOV
297
                        if ok {
×
UNCOV
298
                                filteredBlock.Transactions = append(
×
UNCOV
299
                                        filteredBlock.Transactions,
×
UNCOV
300
                                        tx.MsgTx().Copy(),
×
UNCOV
301
                                )
×
UNCOV
302

×
UNCOV
303
                                c.filterMtx.Lock()
×
UNCOV
304
                                delete(c.chainFilter, prevOp)
×
UNCOV
305
                                c.filterMtx.Unlock()
×
UNCOV
306

×
UNCOV
307
                                break
×
308
                        }
309
                }
310
        }
311

UNCOV
312
        return filteredBlock, nil
×
313
}
314

315
// UpdateFilter updates the UTXO filter which is to be consulted when creating
316
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
317
// meaning repeated calls to this method should _expand_ the size of the UTXO
318
// sub-set currently being watched.  If the set updateHeight is _lower_ than
319
// the best known height of the implementation, then the state should be
320
// rewound to ensure all relevant notifications are dispatched.
321
//
322
// NOTE: This is part of the FilteredChainView interface.
323
func (c *CfFilteredChainView) UpdateFilter(ops []graphdb.EdgePoint,
UNCOV
324
        updateHeight uint32) error {
×
UNCOV
325

×
UNCOV
326
        log.Tracef("Updating chain filter with new UTXO's: %v", ops)
×
UNCOV
327

×
UNCOV
328
        // First, we'll update the current chain view, by adding any new
×
UNCOV
329
        // UTXO's, ignoring duplicates in the process.
×
UNCOV
330
        c.filterMtx.Lock()
×
UNCOV
331
        for _, op := range ops {
×
UNCOV
332
                c.chainFilter[op.OutPoint] = op.FundingPkScript
×
UNCOV
333
        }
×
UNCOV
334
        c.filterMtx.Unlock()
×
UNCOV
335

×
UNCOV
336
        inputs := make([]neutrino.InputWithScript, len(ops))
×
UNCOV
337
        for i, op := range ops {
×
UNCOV
338
                inputs[i] = neutrino.InputWithScript{
×
UNCOV
339
                        PkScript: op.FundingPkScript,
×
UNCOV
340
                        OutPoint: op.OutPoint,
×
UNCOV
341
                }
×
UNCOV
342
        }
×
343

344
        // With our internal chain view update, we'll craft a new update to the
345
        // chainView which includes our new UTXO's, and current update height.
UNCOV
346
        rescanUpdate := []neutrino.UpdateOption{
×
UNCOV
347
                neutrino.AddInputs(inputs...),
×
UNCOV
348
                neutrino.Rewind(updateHeight),
×
UNCOV
349
                neutrino.DisableDisconnectedNtfns(true),
×
UNCOV
350
        }
×
UNCOV
351
        err := c.chainView.Update(rescanUpdate...)
×
UNCOV
352
        if err != nil {
×
353
                return fmt.Errorf("unable to update rescan: %w", err)
×
354
        }
×
UNCOV
355
        return nil
×
356
}
357

358
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
359
// Each time a block is connected to the end of a main chain, and appropriate
360
// FilteredBlock which contains the transactions which mutate our watched UTXO
361
// set is to be returned.
362
//
363
// NOTE: This is part of the FilteredChainView interface.
UNCOV
364
func (c *CfFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
×
UNCOV
365
        return c.blockQueue.newBlocks
×
UNCOV
366
}
×
367

368
// DisconnectedBlocks returns a receive only channel which will be sent upon
369
// with the empty filtered blocks of blocks which are disconnected from the
370
// main chain in the case of a re-org.
371
//
372
// NOTE: This is part of the FilteredChainView interface.
UNCOV
373
func (c *CfFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
×
UNCOV
374
        return c.blockQueue.staleBlocks
×
UNCOV
375
}
×
376

377
// GetBlock is used to retrieve the block with the given hash. Since the block
378
// cache used by neutrino will be the same as that used by LND (since it is
379
// passed to neutrino on initialisation), the neutrino GetBlock method can be
380
// called directly since it already uses the block cache. However, neutrino
381
// does not lock the block cache mutex for the given block hash and so that is
382
// done here.
383
func (c *CfFilteredChainView) GetBlock(hash chainhash.Hash) (
UNCOV
384
        *btcutil.Block, error) {
×
UNCOV
385

×
UNCOV
386
        c.blockCache.HashMutex.Lock(lntypes.Hash(hash))
×
UNCOV
387
        defer c.blockCache.HashMutex.Unlock(lntypes.Hash(hash))
×
UNCOV
388

×
UNCOV
389
        return c.p2pNode.GetBlock(hash)
×
UNCOV
390
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc