• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13536249039

26 Feb 2025 03:42AM UTC coverage: 57.462% (-1.4%) from 58.835%
13536249039

Pull #8453

github

Roasbeef
peer: update chooseDeliveryScript to gen script if needed

In this commit, we update `chooseDeliveryScript` to generate a new
script if needed. This allows us to fold in a few other lines that
always followed this function into this expanded function.

The tests have been updated accordingly.
Pull Request #8453: [4/4] - multi: integrate new rbf coop close FSM into the existing peer flow

275 of 1318 new or added lines in 22 files covered. (20.86%)

19521 existing lines in 257 files now uncovered.

103858 of 180741 relevant lines covered (57.46%)

24750.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.78
/routing/chainview/neutrino.go
1
package chainview
2

3
import (
4
        "fmt"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/btcsuite/btcd/btcutil"
9
        "github.com/btcsuite/btcd/btcutil/gcs/builder"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/rpcclient"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/lightninglabs/neutrino"
14
        "github.com/lightningnetwork/lnd/blockcache"
15
        graphdb "github.com/lightningnetwork/lnd/graph/db"
16
        "github.com/lightningnetwork/lnd/lntypes"
17
)
18

19
// CfFilteredChainView is an implementation of the FilteredChainView interface
20
// which is supported by an underlying Bitcoin light client which supports
21
// client side filtering of Golomb Coded Sets. Rather than fetching all the
22
// blocks, the light client is able to query filters locally, to test if an
23
// item in a block modifies any of our watched set of UTXOs.
24
type CfFilteredChainView struct {
25
        started int32 // To be used atomically.
26
        stopped int32 // To be used atomically.
27

28
        // p2pNode is a pointer to the running GCS-filter supported Bitcoin
29
        // light clientl
30
        p2pNode *neutrino.ChainService
31

32
        // chainView is the active rescan which only watches our specified
33
        // sub-set of the UTXO set.
34
        chainView *neutrino.Rescan
35

36
        // rescanErrChan is the channel that any errors encountered during the
37
        // rescan will be sent over.
38
        rescanErrChan <-chan error
39

40
        // blockEventQueue is the ordered queue used to keep the order
41
        // of connected and disconnected blocks sent to the reader of the
42
        // chainView.
43
        blockQueue *blockEventQueue
44

45
        // blockCache is an LRU block cache.
46
        blockCache *blockcache.BlockCache
47

48
        // chainFilter is the
49
        filterMtx   sync.RWMutex
50
        chainFilter map[wire.OutPoint][]byte
51

52
        quit chan struct{}
53
        wg   sync.WaitGroup
54
}
55

56
// A compile time check to ensure CfFilteredChainView implements the
57
// chainview.FilteredChainView.
58
var _ FilteredChainView = (*CfFilteredChainView)(nil)
59

60
// NewCfFilteredChainView creates a new instance of the CfFilteredChainView
61
// which is connected to an active neutrino node.
62
//
63
// NOTE: The node should already be running and syncing before being passed into
64
// this function.
65
func NewCfFilteredChainView(node *neutrino.ChainService,
66
        blockCache *blockcache.BlockCache) (*CfFilteredChainView, error) {
2✔
67

2✔
68
        return &CfFilteredChainView{
2✔
69
                blockQueue:    newBlockEventQueue(),
2✔
70
                quit:          make(chan struct{}),
2✔
71
                rescanErrChan: make(chan error),
2✔
72
                chainFilter:   make(map[wire.OutPoint][]byte),
2✔
73
                p2pNode:       node,
2✔
74
                blockCache:    blockCache,
2✔
75
        }, nil
2✔
76
}
2✔
77

78
// Start kicks off the FilteredChainView implementation. This function must be
79
// called before any calls to UpdateFilter can be processed.
80
//
81
// NOTE: This is part of the FilteredChainView interface.
82
func (c *CfFilteredChainView) Start() error {
2✔
83
        // Already started?
2✔
84
        if atomic.AddInt32(&c.started, 1) != 1 {
2✔
85
                return nil
×
86
        }
×
87

88
        log.Infof("FilteredChainView starting")
2✔
89

2✔
90
        // First, we'll obtain the latest block height of the p2p node. We'll
2✔
91
        // start the auto-rescan from this point. Once a caller actually wishes
2✔
92
        // to register a chain view, the rescan state will be rewound
2✔
93
        // accordingly.
2✔
94
        startingPoint, err := c.p2pNode.BestBlock()
2✔
95
        if err != nil {
2✔
96
                return err
×
97
        }
×
98

99
        // Next, we'll create our set of rescan options. Currently it's
100
        // required that an user MUST set a addr/outpoint/txid when creating a
101
        // rescan. To get around this, we'll add a "zero" outpoint, that won't
102
        // actually be matched.
103
        var zeroPoint neutrino.InputWithScript
2✔
104
        rescanOptions := []neutrino.RescanOption{
2✔
105
                neutrino.StartBlock(startingPoint),
2✔
106
                neutrino.QuitChan(c.quit),
2✔
107
                neutrino.NotificationHandlers(
2✔
108
                        rpcclient.NotificationHandlers{
2✔
109
                                OnFilteredBlockConnected:    c.onFilteredBlockConnected,
2✔
110
                                OnFilteredBlockDisconnected: c.onFilteredBlockDisconnected,
2✔
111
                        },
2✔
112
                ),
2✔
113
                neutrino.WatchInputs(zeroPoint),
2✔
114
        }
2✔
115

2✔
116
        // Finally, we'll create our rescan struct, start it, and launch all
2✔
117
        // the goroutines we need to operate this FilteredChainView instance.
2✔
118
        c.chainView = neutrino.NewRescan(
2✔
119
                &neutrino.RescanChainSource{
2✔
120
                        ChainService: c.p2pNode,
2✔
121
                },
2✔
122
                rescanOptions...,
2✔
123
        )
2✔
124
        c.rescanErrChan = c.chainView.Start()
2✔
125

2✔
126
        c.blockQueue.Start()
2✔
127

2✔
128
        c.wg.Add(1)
2✔
129
        go c.chainFilterer()
2✔
130

2✔
131
        return nil
2✔
132
}
133

134
// Stop signals all active goroutines for a graceful shutdown.
135
//
136
// NOTE: This is part of the FilteredChainView interface.
137
func (c *CfFilteredChainView) Stop() error {
2✔
138
        log.Debug("CfFilteredChainView stopping")
2✔
139
        defer log.Debug("CfFilteredChainView stopped")
2✔
140

2✔
141
        // Already shutting down?
2✔
142
        if atomic.AddInt32(&c.stopped, 1) != 1 {
2✔
143
                return nil
×
144
        }
×
145

146
        close(c.quit)
2✔
147
        c.blockQueue.Stop()
2✔
148
        c.wg.Wait()
2✔
149

2✔
150
        return nil
2✔
151
}
152

153
// onFilteredBlockConnected is called for each block that's connected to the
154
// end of the main chain. Based on our current chain filter, the block may or
155
// may not include any relevant transactions.
156
func (c *CfFilteredChainView) onFilteredBlockConnected(height int32,
157
        header *wire.BlockHeader, txns []*btcutil.Tx) {
504✔
158

504✔
159
        mtxs := make([]*wire.MsgTx, len(txns))
504✔
160
        for i, tx := range txns {
507✔
161
                mtx := tx.MsgTx()
3✔
162
                mtxs[i] = mtx
3✔
163

3✔
164
                for _, txIn := range mtx.TxIn {
6✔
165
                        c.filterMtx.Lock()
3✔
166
                        delete(c.chainFilter, txIn.PreviousOutPoint)
3✔
167
                        c.filterMtx.Unlock()
3✔
168
                }
3✔
169

170
        }
171

172
        block := &FilteredBlock{
504✔
173
                Hash:         header.BlockHash(),
504✔
174
                Height:       uint32(height),
504✔
175
                Transactions: mtxs,
504✔
176
        }
504✔
177

504✔
178
        c.blockQueue.Add(&blockEvent{
504✔
179
                eventType: connected,
504✔
180
                block:     block,
504✔
181
        })
504✔
182
}
183

184
// onFilteredBlockDisconnected is a callback which is executed once a block is
185
// disconnected from the end of the main chain.
186
func (c *CfFilteredChainView) onFilteredBlockDisconnected(height int32,
187
        header *wire.BlockHeader) {
415✔
188

415✔
189
        log.Debugf("got disconnected block at height %d: %v", height,
415✔
190
                header.BlockHash())
415✔
191

415✔
192
        filteredBlock := &FilteredBlock{
415✔
193
                Hash:   header.BlockHash(),
415✔
194
                Height: uint32(height),
415✔
195
        }
415✔
196

415✔
197
        c.blockQueue.Add(&blockEvent{
415✔
198
                eventType: disconnected,
415✔
199
                block:     filteredBlock,
415✔
200
        })
415✔
201
}
415✔
202

203
// chainFilterer is the primary coordination goroutine within the
204
// CfFilteredChainView. This goroutine handles errors from the running rescan.
205
func (c *CfFilteredChainView) chainFilterer() {
2✔
206
        defer c.wg.Done()
2✔
207

2✔
208
        for {
4✔
209
                select {
2✔
210
                case err := <-c.rescanErrChan:
×
211
                        log.Errorf("Error encountered during rescan: %v", err)
×
212
                case <-c.quit:
2✔
213
                        return
2✔
214
                }
215
        }
216
}
217

218
// FilterBlock takes a block hash, and returns a FilteredBlocks which is the
219
// result of applying the current registered UTXO sub-set on the block
220
// corresponding to that block hash. If any watched UTXO's are spent by the
221
// selected lock, then the internal chainFilter will also be updated.
222
//
223
// NOTE: This is part of the FilteredChainView interface.
224
func (c *CfFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredBlock, error) {
1✔
225
        // First, we'll fetch the block header itself so we can obtain the
1✔
226
        // height which is part of our return value.
1✔
227
        blockHeight, err := c.p2pNode.GetBlockHeight(blockHash)
1✔
228
        if err != nil {
1✔
229
                return nil, err
×
230
        }
×
231

232
        filteredBlock := &FilteredBlock{
1✔
233
                Hash:   *blockHash,
1✔
234
                Height: uint32(blockHeight),
1✔
235
        }
1✔
236

1✔
237
        // If we don't have any items within our current chain filter, then we
1✔
238
        // can exit early as we don't need to fetch the filter.
1✔
239
        c.filterMtx.RLock()
1✔
240
        if len(c.chainFilter) == 0 {
1✔
UNCOV
241
                c.filterMtx.RUnlock()
×
UNCOV
242
                return filteredBlock, nil
×
UNCOV
243
        }
×
244
        c.filterMtx.RUnlock()
1✔
245

1✔
246
        // Next, using the block, hash, we'll fetch the compact filter for this
1✔
247
        // block. We only require the regular filter as we're just looking for
1✔
248
        // outpoint that have been spent.
1✔
249
        filter, err := c.p2pNode.GetCFilter(*blockHash, wire.GCSFilterRegular)
1✔
250
        if err != nil {
1✔
251
                return nil, fmt.Errorf("unable to fetch filter: %w", err)
×
252
        }
×
253

254
        // Before we can match the filter, we'll need to map each item in our
255
        // chain filter to the representation that included in the compact
256
        // filters.
257
        c.filterMtx.RLock()
1✔
258
        relevantPoints := make([][]byte, 0, len(c.chainFilter))
1✔
259
        for _, filterEntry := range c.chainFilter {
3✔
260
                relevantPoints = append(relevantPoints, filterEntry)
2✔
261
        }
2✔
262
        c.filterMtx.RUnlock()
1✔
263

1✔
264
        // With our relevant points constructed, we can finally match against
1✔
265
        // the retrieved filter.
1✔
266
        matched, err := filter.MatchAny(builder.DeriveKey(blockHash),
1✔
267
                relevantPoints)
1✔
268
        if err != nil {
1✔
269
                return nil, err
×
270
        }
×
271

272
        // If there wasn't a match, then we'll return the filtered block as is
273
        // (void of any transactions).
274
        if !matched {
1✔
UNCOV
275
                return filteredBlock, nil
×
UNCOV
276
        }
×
277

278
        // If we reach this point, then there was a match, so we'll need to
279
        // fetch the block itself so we can scan it for any actual matches (as
280
        // there's a fp rate).
281
        block, err := c.GetBlock(*blockHash)
1✔
282
        if err != nil {
1✔
283
                return nil, err
×
284
        }
×
285

286
        // Finally, we'll step through the block, input by input, to see if any
287
        // transactions spend any outputs from our watched sub-set of the UTXO
288
        // set.
289
        for _, tx := range block.Transactions() {
4✔
290
                for _, txIn := range tx.MsgTx().TxIn {
6✔
291
                        prevOp := txIn.PreviousOutPoint
3✔
292

3✔
293
                        c.filterMtx.RLock()
3✔
294
                        _, ok := c.chainFilter[prevOp]
3✔
295
                        c.filterMtx.RUnlock()
3✔
296

3✔
297
                        if ok {
5✔
298
                                filteredBlock.Transactions = append(
2✔
299
                                        filteredBlock.Transactions,
2✔
300
                                        tx.MsgTx().Copy(),
2✔
301
                                )
2✔
302

2✔
303
                                c.filterMtx.Lock()
2✔
304
                                delete(c.chainFilter, prevOp)
2✔
305
                                c.filterMtx.Unlock()
2✔
306

2✔
307
                                break
2✔
308
                        }
309
                }
310
        }
311

312
        return filteredBlock, nil
1✔
313
}
314

315
// UpdateFilter updates the UTXO filter which is to be consulted when creating
316
// FilteredBlocks to be sent to subscribed clients. This method is cumulative
317
// meaning repeated calls to this method should _expand_ the size of the UTXO
318
// sub-set currently being watched.  If the set updateHeight is _lower_ than
319
// the best known height of the implementation, then the state should be
320
// rewound to ensure all relevant notifications are dispatched.
321
//
322
// NOTE: This is part of the FilteredChainView interface.
323
func (c *CfFilteredChainView) UpdateFilter(ops []graphdb.EdgePoint,
324
        updateHeight uint32) error {
3✔
325

3✔
326
        log.Tracef("Updating chain filter with new UTXO's: %v", ops)
3✔
327

3✔
328
        // First, we'll update the current chain view, by adding any new
3✔
329
        // UTXO's, ignoring duplicates in the process.
3✔
330
        c.filterMtx.Lock()
3✔
331
        for _, op := range ops {
8✔
332
                c.chainFilter[op.OutPoint] = op.FundingPkScript
5✔
333
        }
5✔
334
        c.filterMtx.Unlock()
3✔
335

3✔
336
        inputs := make([]neutrino.InputWithScript, len(ops))
3✔
337
        for i, op := range ops {
8✔
338
                inputs[i] = neutrino.InputWithScript{
5✔
339
                        PkScript: op.FundingPkScript,
5✔
340
                        OutPoint: op.OutPoint,
5✔
341
                }
5✔
342
        }
5✔
343

344
        // With our internal chain view update, we'll craft a new update to the
345
        // chainView which includes our new UTXO's, and current update height.
346
        rescanUpdate := []neutrino.UpdateOption{
3✔
347
                neutrino.AddInputs(inputs...),
3✔
348
                neutrino.Rewind(updateHeight),
3✔
349
                neutrino.DisableDisconnectedNtfns(true),
3✔
350
        }
3✔
351
        err := c.chainView.Update(rescanUpdate...)
3✔
352
        if err != nil {
3✔
353
                return fmt.Errorf("unable to update rescan: %w", err)
×
354
        }
×
355
        return nil
3✔
356
}
357

358
// FilteredBlocks returns the channel that filtered blocks are to be sent over.
359
// Each time a block is connected to the end of a main chain, and appropriate
360
// FilteredBlock which contains the transactions which mutate our watched UTXO
361
// set is to be returned.
362
//
363
// NOTE: This is part of the FilteredChainView interface.
364
func (c *CfFilteredChainView) FilteredBlocks() <-chan *FilteredBlock {
4✔
365
        return c.blockQueue.newBlocks
4✔
366
}
4✔
367

368
// DisconnectedBlocks returns a receive only channel which will be sent upon
369
// with the empty filtered blocks of blocks which are disconnected from the
370
// main chain in the case of a re-org.
371
//
372
// NOTE: This is part of the FilteredChainView interface.
373
func (c *CfFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock {
1✔
374
        return c.blockQueue.staleBlocks
1✔
375
}
1✔
376

377
// GetBlock is used to retrieve the block with the given hash. Since the block
378
// cache used by neutrino will be the same as that used by LND (since it is
379
// passed to neutrino on initialisation), the neutrino GetBlock method can be
380
// called directly since it already uses the block cache. However, neutrino
381
// does not lock the block cache mutex for the given block hash and so that is
382
// done here.
383
func (c *CfFilteredChainView) GetBlock(hash chainhash.Hash) (
384
        *btcutil.Block, error) {
1✔
385

1✔
386
        c.blockCache.HashMutex.Lock(lntypes.Hash(hash))
1✔
387
        defer c.blockCache.HashMutex.Unlock(lntypes.Hash(hash))
1✔
388

1✔
389
        return c.p2pNode.GetBlock(hash)
1✔
390
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc