• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%
12986279612

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%)

24884.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.62
/watchtower/lookout/lookout.go
1
package lookout
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/chainntnfs"
13
        "github.com/lightningnetwork/lnd/watchtower/blob"
14
)
15

16
// ErrLookoutExiting is an error that is returned when the lookout server is
17
// in the process of shutting down.
18
var ErrLookoutExiting = errors.New("lookout server is shutting down")
19

20
// Config houses the Lookout's required resources to properly fulfill it's duty,
21
// including block fetching, querying accepted state updates, and construction
22
// and publication of justice transactions.
23
type Config struct {
24
        // DB provides persistent access to the watchtower's accepted state
25
        // updates such that they can be queried as new blocks arrive from the
26
        // network.
27
        DB DB
28

29
        // EpochRegistrar supports the ability to register for events corresponding to
30
        // newly created blocks.
31
        EpochRegistrar EpochRegistrar
32

33
        // BlockFetcher supports the ability to fetch blocks from the backend or
34
        // network.
35
        BlockFetcher BlockFetcher
36

37
        // Punisher handles the responsibility of crafting and broadcasting
38
        // justice transaction for any breached transactions.
39
        Punisher Punisher
40

41
        // MinBackoff is the minimum amount of time to back-off before
42
        // re-attempting to fetch a block.
43
        MinBackoff time.Duration
44

45
        // MaxBackoff is the maximum amount of time to back-off before
46
        // re-attempting to fetch a block.
47
        MaxBackoff time.Duration
48

49
        // MaxNumRetries is the maximum number of times that we should
50
        // re-attempt fetching a block before moving on.
51
        MaxNumRetries int
52
}
53

54
// Lookout will check any incoming blocks against the transactions found in the
55
// database, and in case of matches send the information needed to create a
56
// penalty transaction to the punisher.
57
type Lookout struct {
58
        started  int32 // atomic
59
        shutdown int32 // atomic
60

61
        cfg *Config
62

63
        wg   sync.WaitGroup
64
        quit chan struct{}
65
}
66

67
// New constructs a new Lookout from the given LookoutConfig.
68
func New(cfg *Config) *Lookout {
1✔
69
        return &Lookout{
1✔
70
                cfg:  cfg,
1✔
71
                quit: make(chan struct{}),
1✔
72
        }
1✔
73
}
1✔
74

75
// Start safely spins up the Lookout and begins monitoring for breaches.
76
func (l *Lookout) Start() error {
1✔
77
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
1✔
78
                return nil
×
79
        }
×
80

81
        log.Infof("Starting lookout")
1✔
82

1✔
83
        startEpoch, err := l.cfg.DB.GetLookoutTip()
1✔
84
        if err != nil {
1✔
85
                return err
×
86
        }
×
87

88
        if startEpoch == nil {
2✔
89
                log.Infof("Starting lookout from chain tip")
1✔
90
        } else {
1✔
91
                log.Infof("Starting lookout from epoch(height=%d hash=%v)",
×
92
                        startEpoch.Height, startEpoch.Hash)
×
93
        }
×
94

95
        events, err := l.cfg.EpochRegistrar.RegisterBlockEpochNtfn(startEpoch)
1✔
96
        if err != nil {
1✔
97
                log.Errorf("Unable to register for block epochs: %v", err)
×
98
                return err
×
99
        }
×
100

101
        l.wg.Add(1)
1✔
102
        go l.watchBlocks(events)
1✔
103

1✔
104
        log.Infof("Lookout started successfully")
1✔
105

1✔
106
        return nil
1✔
107
}
108

109
// Stop safely shuts down the Lookout.
UNCOV
110
func (l *Lookout) Stop() error {
×
UNCOV
111
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
×
112
                return nil
×
113
        }
×
114

UNCOV
115
        log.Infof("Stopping lookout")
×
UNCOV
116

×
UNCOV
117
        close(l.quit)
×
UNCOV
118
        l.wg.Wait()
×
UNCOV
119

×
UNCOV
120
        log.Infof("Lookout stopped successfully")
×
UNCOV
121

×
UNCOV
122
        return nil
×
123
}
124

125
// fetchBlockWithRetries attempts to fetch a block from the blockchain using
126
// its hash. If it fails to fetch the block, it will back-off and retry up to
127
// MaxNumRetries times.
128
func (l *Lookout) fetchBlockWithRetries(hash *chainhash.Hash) (*wire.MsgBlock,
129
        error) {
2✔
130

2✔
131
        backoff := l.cfg.MinBackoff
2✔
132

2✔
133
        updateBackoff := func() {
2✔
134
                backoff *= 2
×
135
                if backoff > l.cfg.MaxBackoff {
×
136
                        backoff = l.cfg.MaxBackoff
×
137
                }
×
138
        }
139

140
        var attempt int
2✔
141
        for {
4✔
142
                attempt++
2✔
143

2✔
144
                block, err := l.cfg.BlockFetcher.GetBlock(hash)
2✔
145
                if err == nil {
4✔
146
                        return block, nil
2✔
147
                }
2✔
148

149
                if attempt > l.cfg.MaxNumRetries {
×
150
                        return nil, fmt.Errorf("failed to fetch block %s "+
×
151
                                "after %d attempts: %v", hash, attempt, err)
×
152
                }
×
153

154
                log.Errorf("Failed to fetch block %s (attempt %d): %v. "+
×
155
                        "Retrying in %v seconds", hash, attempt, err,
×
156
                        backoff.Seconds())
×
157

×
158
                select {
×
159
                case <-time.After(backoff):
×
160
                case <-l.quit:
×
161
                        return nil, ErrLookoutExiting
×
162
                }
163

164
                updateBackoff()
×
165
        }
166
}
167

168
// watchBlocks serially pulls incoming epochs from the epoch source and searches
169
// our accepted state updates for any breached transactions. If any are found,
170
// we will attempt to decrypt the state updates' encrypted blobs and exact
171
// justice for the victim.
172
//
173
// This method MUST be run as a goroutine.
174
func (l *Lookout) watchBlocks(epochs *chainntnfs.BlockEpochEvent) {
1✔
175
        defer l.wg.Done()
1✔
176
        defer epochs.Cancel()
1✔
177

1✔
178
        for {
4✔
179
                select {
3✔
180
                case epoch := <-epochs.Epochs:
2✔
181
                        log.Debugf("Fetching block for (height=%d, hash=%s)",
2✔
182
                                epoch.Height, epoch.Hash)
2✔
183

2✔
184
                        // Fetch the full block corresponding to the newly
2✔
185
                        // arriving epoch from the backend.
2✔
186
                        block, err := l.fetchBlockWithRetries(epoch.Hash)
2✔
187
                        if err != nil {
2✔
188
                                log.Errorf("Unable to fetch block for "+
×
189
                                        "(height=%x, hash=%s): %v",
×
190
                                        epoch.Height, epoch.Hash, err)
×
191
                                continue
×
192
                        }
193

194
                        // Process the block to see if it contains any breaches
195
                        // that we are monitoring on behalf of our clients.
196
                        err = l.processEpoch(epoch, block)
2✔
197
                        if err != nil {
2✔
198
                                log.Errorf("Unable to process %v: %v",
×
199
                                        epoch, err)
×
200
                        }
×
201

UNCOV
202
                case <-l.quit:
×
UNCOV
203
                        return
×
204
                }
205
        }
206
}
207

208
// processEpoch accepts an Epoch and queries the database for any matching state
209
// updates for the confirmed transactions. If any are found, the lookout
210
// responds by attempting to decrypt the encrypted blob and publishing the
211
// justice transaction.
212
func (l *Lookout) processEpoch(epoch *chainntnfs.BlockEpoch,
213
        block *wire.MsgBlock) error {
2✔
214

2✔
215
        numTxnsInBlock := len(block.Transactions)
2✔
216

2✔
217
        log.Debugf("Scanning %d transaction in block (height=%d, hash=%s) "+
2✔
218
                "for breaches", numTxnsInBlock, epoch.Height, epoch.Hash)
2✔
219

2✔
220
        // Iterate over the transactions contained in the block, deriving a
2✔
221
        // breach hint for each transaction and constructing an index mapping
2✔
222
        // the hint back to it's original transaction.
2✔
223
        hintToTx := make(map[blob.BreachHint]*wire.MsgTx, numTxnsInBlock)
2✔
224
        txHints := make([]blob.BreachHint, 0, numTxnsInBlock)
2✔
225
        for _, tx := range block.Transactions {
4✔
226
                hash := tx.TxHash()
2✔
227
                hint := blob.NewBreachHintFromHash(&hash)
2✔
228

2✔
229
                txHints = append(txHints, hint)
2✔
230
                hintToTx[hint] = tx.Copy()
2✔
231
        }
2✔
232

233
        // Query the database to see if any of the breach hints cause a match
234
        // with any of our accepted state updates.
235
        matches, err := l.cfg.DB.QueryMatches(txHints)
2✔
236
        if err != nil {
2✔
237
                return err
×
238
        }
×
239

240
        // No matches were found, we are done.
241
        if len(matches) == 0 {
2✔
UNCOV
242
                log.Debugf("No breaches found in (height=%d, hash=%s)",
×
UNCOV
243
                        epoch.Height, epoch.Hash)
×
UNCOV
244
                return nil
×
UNCOV
245
        }
×
246

247
        breachCountStr := "breach"
2✔
248
        if len(matches) > 1 {
2✔
249
                breachCountStr = "breaches"
×
250
        }
×
251

252
        log.Infof("Found %d %s in (height=%d, hash=%s)",
2✔
253
                len(matches), breachCountStr, epoch.Height, epoch.Hash)
2✔
254

2✔
255
        // For each match, use our index to retrieve the original transaction,
2✔
256
        // which corresponds to the breaching commitment transaction. If the
2✔
257
        // decryption succeeds, we will accumulate the assembled justice
2✔
258
        // descriptors in a single slice
2✔
259
        var successes []*JusticeDescriptor
2✔
260
        for _, match := range matches {
4✔
261
                commitTx := hintToTx[match.Hint]
2✔
262
                log.Infof("Dispatching punisher for client %s, breach-txid=%s",
2✔
263
                        match.ID, commitTx.TxHash())
2✔
264

2✔
265
                // The decryption key for the state update should be the full
2✔
266
                // txid of the breaching commitment transaction.
2✔
267
                // The decryption key for the state update should be computed as
2✔
268
                //   key = SHA256(txid || txid).
2✔
269
                breachTxID := commitTx.TxHash()
2✔
270
                breachKey := blob.NewBreachKeyFromHash(&breachTxID)
2✔
271

2✔
272
                // Now, decrypt the blob of justice that we received in the
2✔
273
                // state update. This will contain all information required to
2✔
274
                // sweep the breached commitment outputs.
2✔
275
                justiceKit, err := blob.Decrypt(
2✔
276
                        breachKey, match.EncryptedBlob,
2✔
277
                        match.SessionInfo.Policy.BlobType,
2✔
278
                )
2✔
279
                if err != nil {
2✔
280
                        // If the decryption fails, this implies either that the
×
281
                        // client sent an invalid blob, or that the breach hint
×
282
                        // caused a match on the txid, but this isn't actually
×
283
                        // the right transaction.
×
284
                        log.Debugf("Unable to decrypt blob for client %s, "+
×
285
                                "breach-txid %s: %v", match.ID,
×
286
                                commitTx.TxHash(), err)
×
287
                        continue
×
288
                }
289

290
                justiceDesc := &JusticeDescriptor{
2✔
291
                        BreachedCommitTx: commitTx,
2✔
292
                        SessionInfo:      match.SessionInfo,
2✔
293
                        JusticeKit:       justiceKit,
2✔
294
                }
2✔
295
                successes = append(successes, justiceDesc)
2✔
296
        }
297

298
        // TODO(conner): mark successfully decrypted blob so that we can
299
        // reliably rebroadcast on startup
300

301
        // Now, we'll dispatch a punishment for each successful match in
302
        // parallel. This will assemble the justice transaction for each and
303
        // watch for their confirmation on chain.
304
        for _, justiceDesc := range successes {
4✔
305
                l.wg.Add(1)
2✔
306
                go l.dispatchPunisher(justiceDesc)
2✔
307
        }
2✔
308

309
        return l.cfg.DB.SetLookoutTip(epoch)
2✔
310
}
311

312
// dispatchPunisher accepts a justice descriptor corresponding to a successfully
313
// decrypted blob.  The punisher will then construct the witness scripts and
314
// witness stacks for the breached outputs. If construction of the justice
315
// transaction is successful, it will be published to the network to retrieve
316
// the funds and claim the watchtower's reward.
317
//
318
// This method MUST be run as a goroutine.
319
func (l *Lookout) dispatchPunisher(desc *JusticeDescriptor) {
2✔
320
        defer l.wg.Done()
2✔
321

2✔
322
        // Give the justice descriptor to the punisher to construct and publish
2✔
323
        // the justice transaction. The lookout's quit channel is provided so
2✔
324
        // that long-running tasks that watch for on-chain events can be
2✔
325
        // canceled during shutdown since this method is waitgrouped.
2✔
326
        err := l.cfg.Punisher.Punish(desc, l.quit)
2✔
327
        if err != nil {
2✔
328
                log.Errorf("Unable to punish breach-txid %s for %s: %v",
×
329
                        desc.BreachedCommitTx.TxHash(), desc.SessionInfo.ID,
×
330
                        err)
×
331
                return
×
332
        }
×
333

334
        log.Infof("Punishment for client %s with breach-txid=%s dispatched",
2✔
335
                desc.SessionInfo.ID, desc.BreachedCommitTx.TxHash())
2✔
336
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc