• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 18016273007

25 Sep 2025 05:55PM UTC coverage: 54.653% (-12.0%) from 66.622%
18016273007

Pull #10248

github

web-flow
Merge 128443298 into b09b20c69
Pull Request #10248: Enforce TLV when creating a Route

25 of 30 new or added lines in 4 files covered. (83.33%)

23906 existing lines in 281 files now uncovered.

109536 of 200421 relevant lines covered (54.65%)

21816.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.62
/watchtower/lookout/lookout.go
1
package lookout
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/chainntnfs"
13
        "github.com/lightningnetwork/lnd/watchtower/blob"
14
)
15

16
// ErrLookoutExiting is an error that is returned when the lookout server is
17
// in the process of shutting down.
18
var ErrLookoutExiting = errors.New("lookout server is shutting down")
19

20
// Config houses the Lookout's required resources to properly fulfill its duty,
21
// including block fetching, querying accepted state updates, and construction
22
// and publication of justice transactions.
23
type Config struct {
24
        // DB provides persistent access to the watchtower's accepted state
25
        // updates such that they can be queried as new blocks arrive from the
26
        // network.
27
        DB DB
28

29
        // EpochRegistrar supports the ability to register for events corresponding to
30
        // newly created blocks.
31
        EpochRegistrar EpochRegistrar
32

33
        // BlockFetcher supports the ability to fetch blocks from the backend or
34
        // network.
35
        BlockFetcher BlockFetcher
36

37
        // Punisher handles the responsibility of crafting and broadcasting
38
        // justice transactions for any breached transactions.
39
        Punisher Punisher
40

41
        // MinBackoff is the minimum amount of time to back-off before
42
        // re-attempting to fetch a block.
43
        MinBackoff time.Duration
44

45
        // MaxBackoff is the maximum amount of time to back-off before
46
        // re-attempting to fetch a block.
47
        MaxBackoff time.Duration
48

49
        // MaxNumRetries is the maximum number of times that we should
50
        // re-attempt fetching a block before moving on.
51
        MaxNumRetries int
52
}
53

54
// Lookout will check any incoming blocks against the transactions found in the
55
// database, and in case of matches send the information needed to create a
56
// penalty transaction to the punisher.
57
type Lookout struct {
58
        started  int32 // atomic
59
        shutdown int32 // atomic
60

61
        cfg *Config
62

63
        wg   sync.WaitGroup
64
        quit chan struct{}
65
}
66

67
// New constructs a new Lookout from the given Config.
68
func New(cfg *Config) *Lookout {
1✔
69
        return &Lookout{
1✔
70
                cfg:  cfg,
1✔
71
                quit: make(chan struct{}),
1✔
72
        }
1✔
73
}
1✔
74

75
// Start safely spins up the Lookout and begins monitoring for breaches.
76
func (l *Lookout) Start() error {
1✔
77
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
1✔
78
                return nil
×
79
        }
×
80

81
        log.Infof("Starting lookout")
1✔
82

1✔
83
        startEpoch, err := l.cfg.DB.GetLookoutTip()
1✔
84
        if err != nil {
1✔
85
                return err
×
86
        }
×
87

88
        if startEpoch == nil {
2✔
89
                log.Infof("Starting lookout from chain tip")
1✔
90
        } else {
1✔
91
                log.Infof("Starting lookout from epoch(height=%d hash=%v)",
×
92
                        startEpoch.Height, startEpoch.Hash)
×
93
        }
×
94

95
        events, err := l.cfg.EpochRegistrar.RegisterBlockEpochNtfn(startEpoch)
1✔
96
        if err != nil {
1✔
97
                log.Errorf("Unable to register for block epochs: %v", err)
×
98
                return err
×
99
        }
×
100

101
        l.wg.Add(1)
1✔
102
        go l.watchBlocks(events)
1✔
103

1✔
104
        log.Infof("Lookout started successfully")
1✔
105

1✔
106
        return nil
1✔
107
}
108

109
// Stop safely shuts down the Lookout.
UNCOV
110
func (l *Lookout) Stop() error {
×
UNCOV
111
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
×
112
                return nil
×
113
        }
×
114

UNCOV
115
        log.Infof("Stopping lookout")
×
UNCOV
116

×
UNCOV
117
        close(l.quit)
×
UNCOV
118
        l.wg.Wait()
×
UNCOV
119

×
UNCOV
120
        log.Infof("Lookout stopped successfully")
×
UNCOV
121

×
UNCOV
122
        return nil
×
123
}
124

125
// fetchBlockWithRetries attempts to fetch a block from the blockchain using
126
// its hash. If it fails to fetch the block, it will back-off and retry up to
127
// MaxNumRetries times.
128
func (l *Lookout) fetchBlockWithRetries(hash *chainhash.Hash) (*wire.MsgBlock,
129
        error) {
2✔
130

2✔
131
        backoff := l.cfg.MinBackoff
2✔
132

2✔
133
        updateBackoff := func() {
2✔
134
                backoff *= 2
×
135
                if backoff > l.cfg.MaxBackoff {
×
136
                        backoff = l.cfg.MaxBackoff
×
137
                }
×
138
        }
139

140
        var attempt int
2✔
141
        for {
4✔
142
                attempt++
2✔
143

2✔
144
                block, err := l.cfg.BlockFetcher.GetBlock(hash)
2✔
145
                if err == nil {
4✔
146
                        return block, nil
2✔
147
                }
2✔
148

149
                if attempt > l.cfg.MaxNumRetries {
×
150
                        return nil, fmt.Errorf("failed to fetch block %s "+
×
151
                                "after %d attempts: %v", hash, attempt, err)
×
152
                }
×
153

154
                log.Errorf("Failed to fetch block %s (attempt %d): %v. "+
×
155
                        "Retrying in %v seconds", hash, attempt, err,
×
156
                        backoff.Seconds())
×
157

×
158
                select {
×
159
                case <-time.After(backoff):
×
160
                case <-l.quit:
×
161
                        return nil, ErrLookoutExiting
×
162
                }
163

164
                updateBackoff()
×
165
        }
166
}
167

168
// watchBlocks serially pulls incoming epochs from the epoch source and searches
169
// our accepted state updates for any breached transactions. If any are found,
170
// we will attempt to decrypt the state updates' encrypted blobs and exact
171
// justice for the victim.
172
//
173
// This method MUST be run as a goroutine.
174
func (l *Lookout) watchBlocks(epochs *chainntnfs.BlockEpochEvent) {
1✔
175
        defer l.wg.Done()
1✔
176
        defer epochs.Cancel()
1✔
177

1✔
178
        for {
4✔
179
                select {
3✔
180
                case epoch := <-epochs.Epochs:
2✔
181
                        log.Debugf("Fetching block for (height=%d, hash=%s)",
2✔
182
                                epoch.Height, epoch.Hash)
2✔
183

2✔
184
                        // Fetch the full block corresponding to the newly
2✔
185
                        // arriving epoch from the backend.
2✔
186
                        block, err := l.fetchBlockWithRetries(epoch.Hash)
2✔
187
                        if err != nil {
2✔
188
                                log.Errorf("Unable to fetch block for "+
×
189
                                        "(height=%x, hash=%s): %v",
×
190
                                        epoch.Height, epoch.Hash, err)
×
191
                                continue
×
192
                        }
193

194
                        // Process the block to see if it contains any breaches
195
                        // that we are monitoring on behalf of our clients.
196
                        err = l.processEpoch(epoch, block)
2✔
197
                        if err != nil {
2✔
198
                                log.Errorf("Unable to process %v: %v",
×
199
                                        epoch, err)
×
200
                        }
×
201

UNCOV
202
                case <-l.quit:
×
UNCOV
203
                        return
×
204
                }
205
        }
206
}
207

208
// processEpoch accepts an Epoch and queries the database for any matching state
209
// updates for the confirmed transactions. If any are found, the lookout
210
// responds by attempting to decrypt the encrypted blob and publishing the
211
// justice transaction.
212
func (l *Lookout) processEpoch(epoch *chainntnfs.BlockEpoch,
213
        block *wire.MsgBlock) error {
2✔
214

2✔
215
        numTxnsInBlock := len(block.Transactions)
2✔
216

2✔
217
        log.Debugf("Scanning %d transaction in block (height=%d, hash=%s) "+
2✔
218
                "for breaches", numTxnsInBlock, epoch.Height, epoch.Hash)
2✔
219

2✔
220
        // Iterate over the transactions contained in the block, deriving a
2✔
221
        // breach hint for each transaction and constructing an index mapping
2✔
222
        // the hint back to its original transaction.
2✔
223
        hintToTx := make(map[blob.BreachHint]*wire.MsgTx, numTxnsInBlock)
2✔
224
        txHints := make([]blob.BreachHint, 0, numTxnsInBlock)
2✔
225
        for _, tx := range block.Transactions {
4✔
226
                hash := tx.TxHash()
2✔
227
                hint := blob.NewBreachHintFromHash(&hash)
2✔
228

2✔
229
                txHints = append(txHints, hint)
2✔
230
                hintToTx[hint] = tx.Copy()
2✔
231
        }
2✔
232

233
        // Query the database to see if any of the breach hints cause a match
234
        // with any of our accepted state updates.
235
        matches, err := l.cfg.DB.QueryMatches(txHints)
2✔
236
        if err != nil {
2✔
237
                return err
×
238
        }
×
239

240
        // No matches were found, we are done.
241
        if len(matches) == 0 {
2✔
UNCOV
242
                log.Debugf("No breaches found in (height=%d, hash=%s)",
×
UNCOV
243
                        epoch.Height, epoch.Hash)
×
UNCOV
244
                return nil
×
UNCOV
245
        }
×
246

247
        breachCountStr := "breach"
2✔
248
        if len(matches) > 1 {
2✔
249
                breachCountStr = "breaches"
×
250
        }
×
251

252
        log.Infof("Found %d %s in (height=%d, hash=%s)",
2✔
253
                len(matches), breachCountStr, epoch.Height, epoch.Hash)
2✔
254

2✔
255
        // For each match, use our index to retrieve the original transaction,
2✔
256
        // which corresponds to the breaching commitment transaction. If the
2✔
257
        // decryption succeeds, we will accumulate the assembled justice
2✔
258
        // descriptors in a single slice.
2✔
259
        var successes []*JusticeDescriptor
2✔
260
        for _, match := range matches {
4✔
261
                commitTx := hintToTx[match.Hint]
2✔
262
                log.Infof("Dispatching punisher for client %s, breach-txid=%s",
2✔
263
                        match.ID, commitTx.TxHash())
2✔
264

2✔
265
                // The decryption key for the state update is derived from the
2✔
266
                // full txid of the breaching commitment transaction, and
2✔
267
                // should be computed as
2✔
268
                //   key = SHA256(txid || txid).
2✔
269
                breachTxID := commitTx.TxHash()
2✔
270
                breachKey := blob.NewBreachKeyFromHash(&breachTxID)
2✔
271

2✔
272
                // Now, decrypt the blob of justice that we received in the
2✔
273
                // state update. This will contain all information required to
2✔
274
                // sweep the breached commitment outputs.
2✔
275
                justiceKit, err := blob.Decrypt(
2✔
276
                        breachKey, match.EncryptedBlob,
2✔
277
                        match.SessionInfo.Policy.BlobType,
2✔
278
                )
2✔
279
                if err != nil {
2✔
280
                        // If the decryption fails, this implies either that the
×
281
                        // client sent an invalid blob, or that the breach hint
×
282
                        // caused a match on the txid, but this isn't actually
×
283
                        // the right transaction.
×
284
                        log.Debugf("Unable to decrypt blob for client %s, "+
×
285
                                "breach-txid %s: %v", match.ID,
×
286
                                commitTx.TxHash(), err)
×
287
                        continue
×
288
                }
289

290
                justiceDesc := &JusticeDescriptor{
2✔
291
                        BreachedCommitTx: commitTx,
2✔
292
                        SessionInfo:      match.SessionInfo,
2✔
293
                        JusticeKit:       justiceKit,
2✔
294
                }
2✔
295
                successes = append(successes, justiceDesc)
2✔
296
        }
297

298
        // TODO(conner): mark successfully decrypted blob so that we can
299
        // reliably rebroadcast on startup
300

301
        // Now, we'll dispatch a punishment for each successful match in
302
        // parallel. This will assemble the justice transaction for each and
303
        // watch for their confirmation on chain.
304
        for _, justiceDesc := range successes {
4✔
305
                l.wg.Add(1)
2✔
306
                go l.dispatchPunisher(justiceDesc)
2✔
307
        }
2✔
308

309
        return l.cfg.DB.SetLookoutTip(epoch)
2✔
310
}
311

312
// dispatchPunisher accepts a justice descriptor corresponding to a successfully
313
// decrypted blob.  The punisher will then construct the witness scripts and
314
// witness stacks for the breached outputs. If construction of the justice
315
// transaction is successful, it will be published to the network to retrieve
316
// the funds and claim the watchtower's reward.
317
//
318
// This method MUST be run as a goroutine.
319
func (l *Lookout) dispatchPunisher(desc *JusticeDescriptor) {
2✔
320
        defer l.wg.Done()
2✔
321

2✔
322
        // Give the justice descriptor to the punisher to construct and publish
2✔
323
        // the justice transaction. The lookout's quit channel is provided so
2✔
324
        // that long-running tasks that watch for on-chain events can be
2✔
325
        // canceled during shutdown since this method is waitgrouped.
2✔
326
        err := l.cfg.Punisher.Punish(desc, l.quit)
2✔
327
        if err != nil {
2✔
328
                log.Errorf("Unable to punish breach-txid %s for %s: %v",
×
329
                        desc.BreachedCommitTx.TxHash(), desc.SessionInfo.ID,
×
330
                        err)
×
331
                return
×
332
        }
×
333

334
        log.Infof("Punishment for client %s with breach-txid=%s dispatched",
2✔
335
                desc.SessionInfo.ID, desc.BreachedCommitTx.TxHash())
2✔
336
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc