• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.07
/watchtower/lookout/lookout.go
1
package lookout
2

3
import (
4
        "sync"
5
        "sync/atomic"
6

7
        "github.com/btcsuite/btcd/wire"
8
        "github.com/lightningnetwork/lnd/chainntnfs"
9
        "github.com/lightningnetwork/lnd/watchtower/blob"
10
)
11

12
// Config houses the Lookout's required resources to properly fulfill it's duty,
13
// including block fetching, querying accepted state updates, and construction
14
// and publication of justice transactions.
15
type Config struct {
16
        // DB provides persistent access to the watchtower's accepted state
17
        // updates such that they can be queried as new blocks arrive from the
18
        // network.
19
        DB DB
20

21
        // EpochRegistrar supports the ability to register for events corresponding to
22
        // newly created blocks.
23
        EpochRegistrar EpochRegistrar
24

25
        // BlockFetcher supports the ability to fetch blocks from the backend or
26
        // network.
27
        BlockFetcher BlockFetcher
28

29
        // Punisher handles the responsibility of crafting and broadcasting
30
        // justice transaction for any breached transactions.
31
        Punisher Punisher
32
}
33

34
// Lookout will check any incoming blocks against the transactions found in the
35
// database, and in case of matches send the information needed to create a
36
// penalty transaction to the punisher.
37
type Lookout struct {
38
        started  int32 // atomic
39
        shutdown int32 // atomic
40

41
        cfg *Config
42

43
        wg   sync.WaitGroup
44
        quit chan struct{}
45
}
46

47
// New constructs a new Lookout from the given LookoutConfig.
48
func New(cfg *Config) *Lookout {
1✔
49
        return &Lookout{
1✔
50
                cfg:  cfg,
1✔
51
                quit: make(chan struct{}),
1✔
52
        }
1✔
53
}
1✔
54

55
// Start safely spins up the Lookout and begins monitoring for breaches.
56
func (l *Lookout) Start() error {
1✔
57
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
1✔
58
                return nil
×
59
        }
×
60

61
        log.Infof("Starting lookout")
1✔
62

1✔
63
        startEpoch, err := l.cfg.DB.GetLookoutTip()
1✔
64
        if err != nil {
1✔
65
                return err
×
66
        }
×
67

68
        if startEpoch == nil {
2✔
69
                log.Infof("Starting lookout from chain tip")
1✔
70
        } else {
1✔
71
                log.Infof("Starting lookout from epoch(height=%d hash=%v)",
×
72
                        startEpoch.Height, startEpoch.Hash)
×
73
        }
×
74

75
        events, err := l.cfg.EpochRegistrar.RegisterBlockEpochNtfn(startEpoch)
1✔
76
        if err != nil {
1✔
77
                log.Errorf("Unable to register for block epochs: %v", err)
×
78
                return err
×
79
        }
×
80

81
        l.wg.Add(1)
1✔
82
        go l.watchBlocks(events)
1✔
83

1✔
84
        log.Infof("Lookout started successfully")
1✔
85

1✔
86
        return nil
1✔
87
}
88

89
// Stop safely shuts down the Lookout.
UNCOV
90
func (l *Lookout) Stop() error {
×
UNCOV
91
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
×
92
                return nil
×
93
        }
×
94

UNCOV
95
        log.Infof("Stopping lookout")
×
UNCOV
96

×
UNCOV
97
        close(l.quit)
×
UNCOV
98
        l.wg.Wait()
×
UNCOV
99

×
UNCOV
100
        log.Infof("Lookout stopped successfully")
×
UNCOV
101

×
UNCOV
102
        return nil
×
103
}
104

105
// watchBlocks serially pulls incoming epochs from the epoch source and searches
106
// our accepted state updates for any breached transactions. If any are found,
107
// we will attempt to decrypt the state updates' encrypted blobs and exact
108
// justice for the victim.
109
//
110
// This method MUST be run as a goroutine.
111
func (l *Lookout) watchBlocks(epochs *chainntnfs.BlockEpochEvent) {
1✔
112
        defer l.wg.Done()
1✔
113
        defer epochs.Cancel()
1✔
114

1✔
115
        for {
4✔
116
                select {
3✔
117
                case epoch := <-epochs.Epochs:
2✔
118
                        log.Debugf("Fetching block for (height=%d, hash=%s)",
2✔
119
                                epoch.Height, epoch.Hash)
2✔
120

2✔
121
                        // Fetch the full block from the backend corresponding
2✔
122
                        // to the newly arriving epoch.
2✔
123
                        block, err := l.cfg.BlockFetcher.GetBlock(epoch.Hash)
2✔
124
                        if err != nil {
2✔
125
                                // TODO(conner): add retry logic?
×
126
                                log.Errorf("Unable to fetch block for "+
×
127
                                        "(height=%x, hash=%s): %v",
×
128
                                        epoch.Height, epoch.Hash, err)
×
129
                                continue
×
130
                        }
131

132
                        // Process the block to see if it contains any breaches
133
                        // that we are monitoring on behalf of our clients.
134
                        err = l.processEpoch(epoch, block)
2✔
135
                        if err != nil {
2✔
136
                                log.Errorf("Unable to process %v: %v",
×
137
                                        epoch, err)
×
138
                        }
×
139

UNCOV
140
                case <-l.quit:
×
UNCOV
141
                        return
×
142
                }
143
        }
144
}
145

146
// processEpoch accepts an Epoch and queries the database for any matching state
147
// updates for the confirmed transactions. If any are found, the lookout
148
// responds by attempting to decrypt the encrypted blob and publishing the
149
// justice transaction.
150
func (l *Lookout) processEpoch(epoch *chainntnfs.BlockEpoch,
151
        block *wire.MsgBlock) error {
2✔
152

2✔
153
        numTxnsInBlock := len(block.Transactions)
2✔
154

2✔
155
        log.Debugf("Scanning %d transaction in block (height=%d, hash=%s) "+
2✔
156
                "for breaches", numTxnsInBlock, epoch.Height, epoch.Hash)
2✔
157

2✔
158
        // Iterate over the transactions contained in the block, deriving a
2✔
159
        // breach hint for each transaction and constructing an index mapping
2✔
160
        // the hint back to it's original transaction.
2✔
161
        hintToTx := make(map[blob.BreachHint]*wire.MsgTx, numTxnsInBlock)
2✔
162
        txHints := make([]blob.BreachHint, 0, numTxnsInBlock)
2✔
163
        for _, tx := range block.Transactions {
4✔
164
                hash := tx.TxHash()
2✔
165
                hint := blob.NewBreachHintFromHash(&hash)
2✔
166

2✔
167
                txHints = append(txHints, hint)
2✔
168
                hintToTx[hint] = tx.Copy()
2✔
169
        }
2✔
170

171
        // Query the database to see if any of the breach hints cause a match
172
        // with any of our accepted state updates.
173
        matches, err := l.cfg.DB.QueryMatches(txHints)
2✔
174
        if err != nil {
2✔
175
                return err
×
176
        }
×
177

178
        // No matches were found, we are done.
179
        if len(matches) == 0 {
2✔
UNCOV
180
                log.Debugf("No breaches found in (height=%d, hash=%s)",
×
UNCOV
181
                        epoch.Height, epoch.Hash)
×
UNCOV
182
                return nil
×
UNCOV
183
        }
×
184

185
        breachCountStr := "breach"
2✔
186
        if len(matches) > 1 {
2✔
187
                breachCountStr = "breaches"
×
188
        }
×
189

190
        log.Infof("Found %d %s in (height=%d, hash=%s)",
2✔
191
                len(matches), breachCountStr, epoch.Height, epoch.Hash)
2✔
192

2✔
193
        // For each match, use our index to retrieve the original transaction,
2✔
194
        // which corresponds to the breaching commitment transaction. If the
2✔
195
        // decryption succeeds, we will accumulate the assembled justice
2✔
196
        // descriptors in a single slice
2✔
197
        var successes []*JusticeDescriptor
2✔
198
        for _, match := range matches {
4✔
199
                commitTx := hintToTx[match.Hint]
2✔
200
                log.Infof("Dispatching punisher for client %s, breach-txid=%s",
2✔
201
                        match.ID, commitTx.TxHash())
2✔
202

2✔
203
                // The decryption key for the state update should be the full
2✔
204
                // txid of the breaching commitment transaction.
2✔
205
                // The decryption key for the state update should be computed as
2✔
206
                //   key = SHA256(txid).
2✔
207
                breachTxID := commitTx.TxHash()
2✔
208
                breachKey := blob.NewBreachKeyFromHash(&breachTxID)
2✔
209

2✔
210
                // Now, decrypt the blob of justice that we received in the
2✔
211
                // state update. This will contain all information required to
2✔
212
                // sweep the breached commitment outputs.
2✔
213
                justiceKit, err := blob.Decrypt(
2✔
214
                        breachKey, match.EncryptedBlob,
2✔
215
                        match.SessionInfo.Policy.BlobType,
2✔
216
                )
2✔
217
                if err != nil {
2✔
218
                        // If the decryption fails, this implies either that the
×
219
                        // client sent an invalid blob, or that the breach hint
×
220
                        // caused a match on the txid, but this isn't actually
×
221
                        // the right transaction.
×
222
                        log.Debugf("Unable to decrypt blob for client %s, "+
×
223
                                "breach-txid %s: %v", match.ID,
×
224
                                commitTx.TxHash(), err)
×
225
                        continue
×
226
                }
227

228
                justiceDesc := &JusticeDescriptor{
2✔
229
                        BreachedCommitTx: commitTx,
2✔
230
                        SessionInfo:      match.SessionInfo,
2✔
231
                        JusticeKit:       justiceKit,
2✔
232
                }
2✔
233
                successes = append(successes, justiceDesc)
2✔
234
        }
235

236
        // TODO(conner): mark successfully decrypted blob so that we can
237
        // reliably rebroadcast on startup
238

239
        // Now, we'll dispatch a punishment for each successful match in
240
        // parallel. This will assemble the justice transaction for each and
241
        // watch for their confirmation on chain.
242
        for _, justiceDesc := range successes {
4✔
243
                l.wg.Add(1)
2✔
244
                go l.dispatchPunisher(justiceDesc)
2✔
245
        }
2✔
246

247
        return l.cfg.DB.SetLookoutTip(epoch)
2✔
248
}
249

250
// dispatchPunisher accepts a justice descriptor corresponding to a successfully
251
// decrypted blob.  The punisher will then construct the witness scripts and
252
// witness stacks for the breached outputs. If construction of the justice
253
// transaction is successful, it will be published to the network to retrieve
254
// the funds and claim the watchtower's reward.
255
//
256
// This method MUST be run as a goroutine.
257
func (l *Lookout) dispatchPunisher(desc *JusticeDescriptor) {
2✔
258
        defer l.wg.Done()
2✔
259

2✔
260
        // Give the justice descriptor to the punisher to construct and publish
2✔
261
        // the justice transaction. The lookout's quit channel is provided so
2✔
262
        // that long-running tasks that watch for on-chain events can be
2✔
263
        // canceled during shutdown since this method is waitgrouped.
2✔
264
        err := l.cfg.Punisher.Punish(desc, l.quit)
2✔
265
        if err != nil {
2✔
266
                log.Errorf("Unable to punish breach-txid %s for %s: %v",
×
267
                        desc.BreachedCommitTx.TxHash(), desc.SessionInfo.ID,
×
268
                        err)
×
269
                return
×
270
        }
×
271

272
        log.Infof("Punishment for client %s with breach-txid=%s dispatched",
2✔
273
                desc.SessionInfo.ID, desc.BreachedCommitTx.TxHash())
2✔
274
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc