• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/channeldb/migration21/migration.go
1
package migration21
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7

8
        lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
9
        "github.com/lightningnetwork/lnd/channeldb/migration21/common"
10
        "github.com/lightningnetwork/lnd/channeldb/migration21/current"
11
        "github.com/lightningnetwork/lnd/channeldb/migration21/legacy"
12
        "github.com/lightningnetwork/lnd/kvdb"
13
)
14

15
var (
	// byteOrder is the endianness used when decoding fixed-width integers
	// (e.g. the short channel IDs keyed in the forwarding package bucket).
	byteOrder = binary.BigEndian

	// openChannelBucket stores all the currently open channels. This
	// bucket has a second, nested bucket which is keyed by a node's ID.
	// Within that node ID bucket, all attributes required to track,
	// update, and close a channel are stored.
	//
	// openChan -> nodeID -> chanPoint
	//
	// TODO(roasbeef): flesh out comment
	openChannelBucket = []byte("open-chan-bucket")

	// commitDiffKey stores the current pending commitment state we've
	// extended to the remote party (if any). Each time we propose a new
	// state, we store the information necessary to reconstruct this state
	// from the prior commitment. This allows us to resync the remote party
	// to their expected state in the case of message loss.
	//
	// TODO(roasbeef): rename to commit chain?
	commitDiffKey = []byte("commit-diff-key")

	// unsignedAckedUpdatesKey is an entry in the channel bucket that
	// contains the remote updates that we have acked, but not yet signed
	// for in one of our remote commits.
	unsignedAckedUpdatesKey = []byte("unsigned-acked-updates-key")

	// remoteUnsignedLocalUpdatesKey is an entry in the channel bucket that
	// contains the local updates that the remote party has acked, but
	// has not yet signed for in one of their local commits.
	remoteUnsignedLocalUpdatesKey = []byte("remote-unsigned-local-updates-key")

	// networkResultStoreBucketKey is used for the root level bucket that
	// stores the network result for each payment ID.
	networkResultStoreBucketKey = []byte("network-result-store-bucket")

	// closedChannelBucket stores summarization information concerning
	// previously open, but now closed channels.
	closedChannelBucket = []byte("closed-chan-bucket")

	// fwdPackagesKey is the root-level bucket that all forwarding packages
	// are written. This bucket is further subdivided based on the short
	// channel ID of each channel.
	fwdPackagesKey = []byte("fwd-packages")
)
60

61
// MigrateDatabaseWireMessages performs a migration in all areas that we
62
// currently store wire messages without length prefixes. This includes the
63
// CommitDiff struct, ChannelCloseSummary, LogUpdates, and also the
64
// networkResult struct as well.
UNCOV
65
func MigrateDatabaseWireMessages(tx kvdb.RwTx) error {
×
UNCOV
66
        // The migration will proceed in three phases: we'll need to update any
×
UNCOV
67
        // pending commit diffs, then any unsigned acked updates for all open
×
UNCOV
68
        // channels, then finally we'll need to update all the current
×
UNCOV
69
        // stored network results for payments in the switch.
×
UNCOV
70
        //
×
UNCOV
71
        // In this phase, we'll migrate the open channel data.
×
UNCOV
72
        if err := migrateOpenChanBucket(tx); err != nil {
×
73
                return err
×
74
        }
×
75

76
        // Next, we'll update all the present close channel summaries as well.
UNCOV
77
        if err := migrateCloseChanSummaries(tx); err != nil {
×
78
                return err
×
79
        }
×
80

81
        // We'll migrate forwarding packages, which have log updates as part of
82
        // their serialized data.
UNCOV
83
        if err := migrateForwardingPackages(tx); err != nil {
×
84
                return err
×
85
        }
×
86

87
        // Finally, we'll update the pending network results as well.
UNCOV
88
        return migrateNetworkResults(tx)
×
89
}
90

// migrateOpenChanBucket re-encodes the wire-message data stored for every
// open channel (pending commit diffs, unsigned acked updates, and remote
// unsigned local updates) using the current, length-prefixed serialization.
// It proceeds in two phases: first it collects the full bucket path of every
// channel, then it rewrites each channel's entries, so that no bucket is
// modified while it is being iterated.
func migrateOpenChanBucket(tx kvdb.RwTx) error {
	openChanBucket := tx.ReadWriteBucket(openChannelBucket)

	// If no bucket is found, we can exit early.
	if openChanBucket == nil {
		return nil
	}

	// channelPath records the three nested bucket keys
	// (node pubkey -> chain hash -> channel point) that locate one
	// channel's bucket.
	type channelPath struct {
		nodePub   []byte
		chainHash []byte
		chanPoint []byte
	}
	var channelPaths []channelPath
	err := openChanBucket.ForEach(func(nodePub, v []byte) error {
		// Ensure that this is a key the same size as a pubkey, and
		// also that it leads directly to a bucket (a nil value means
		// the key names a nested bucket).
		if len(nodePub) != 33 || v != nil {
			return nil
		}

		nodeChanBucket := openChanBucket.NestedReadBucket(nodePub)
		if nodeChanBucket == nil {
			return fmt.Errorf("no bucket for node %x", nodePub)
		}

		// The next layer down is all the chains that this node
		// has channels on with us.
		return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
			// If there's a value, it's not a bucket so
			// ignore it.
			if v != nil {
				return nil
			}

			chainBucket := nodeChanBucket.NestedReadBucket(
				chainHash,
			)
			if chainBucket == nil {
				return fmt.Errorf("unable to read "+
					"bucket for chain=%x", chainHash)
			}

			// The innermost layer is keyed by channel point; each
			// nested bucket there is one channel to migrate.
			return chainBucket.ForEach(func(chanPoint, v []byte) error {
				// If there's a value, it's not a bucket so
				// ignore it.
				if v != nil {
					return nil
				}

				channelPaths = append(channelPaths, channelPath{
					nodePub:   nodePub,
					chainHash: chainHash,
					chanPoint: chanPoint,
				})

				return nil
			})
		})
	})
	if err != nil {
		return err
	}

	// Now that we have all the paths of the channel we need to migrate,
	// we'll update all the state in a distinct step to avoid weird
	// behavior from modifying buckets in a ForEach statement.
	for _, channelPath := range channelPaths {
		// First, we'll extract it from the node's chain bucket.
		nodeChanBucket := openChanBucket.NestedReadWriteBucket(
			channelPath.nodePub,
		)
		chainBucket := nodeChanBucket.NestedReadWriteBucket(
			channelPath.chainHash,
		)
		chanBucket := chainBucket.NestedReadWriteBucket(
			channelPath.chanPoint,
		)

		// At this point, we have the channel bucket now, so we'll
		// check to see if this channel has a pending commitment or
		// not.
		commitDiffBytes := chanBucket.Get(commitDiffKey)
		if commitDiffBytes != nil {
			// Now that we have the commit diff in the _old_
			// encoding, we'll write it back to disk using the new
			// encoding which has a length prefix in front of the
			// CommitSig.
			commitDiff, err := legacy.DeserializeCommitDiff(
				bytes.NewReader(commitDiffBytes),
			)
			if err != nil {
				return err
			}

			var b bytes.Buffer
			err = current.SerializeCommitDiff(&b, commitDiff)
			if err != nil {
				return err
			}

			err = chanBucket.Put(commitDiffKey, b.Bytes())
			if err != nil {
				return err
			}
		}

		// With the commit diff migrated, we'll now check to see if
		// there're any un-acked updates we need to migrate as well.
		updateBytes := chanBucket.Get(unsignedAckedUpdatesKey)
		if updateBytes != nil {
			// We have un-acked updates we need to migrate so we'll
			// decode then re-encode them here using the new
			// format.
			legacyUnackedUpdates, err := legacy.DeserializeLogUpdates(
				bytes.NewReader(updateBytes),
			)
			if err != nil {
				return err
			}

			var b bytes.Buffer
			err = current.SerializeLogUpdates(&b, legacyUnackedUpdates)
			if err != nil {
				return err
			}

			err = chanBucket.Put(unsignedAckedUpdatesKey, b.Bytes())
			if err != nil {
				return err
			}
		}

		// Remote unsigned updates as well.
		updateBytes = chanBucket.Get(remoteUnsignedLocalUpdatesKey)
		if updateBytes != nil {
			legacyUnsignedUpdates, err := legacy.DeserializeLogUpdates(
				bytes.NewReader(updateBytes),
			)
			if err != nil {
				return err
			}

			var b bytes.Buffer
			err = current.SerializeLogUpdates(&b, legacyUnsignedUpdates)
			if err != nil {
				return err
			}

			err = chanBucket.Put(remoteUnsignedLocalUpdatesKey, b.Bytes())
			if err != nil {
				return err
			}
		}
	}

	return nil
}
249

UNCOV
250
func migrateCloseChanSummaries(tx kvdb.RwTx) error {
×
UNCOV
251
        closedChanBucket := tx.ReadWriteBucket(closedChannelBucket)
×
UNCOV
252

×
UNCOV
253
        // Exit early if bucket is not found.
×
UNCOV
254
        if closedChannelBucket == nil {
×
255
                return nil
×
256
        }
×
257

UNCOV
258
        type closedChan struct {
×
UNCOV
259
                chanKey      []byte
×
UNCOV
260
                summaryBytes []byte
×
UNCOV
261
        }
×
UNCOV
262
        var closedChans []closedChan
×
UNCOV
263
        err := closedChanBucket.ForEach(func(k, v []byte) error {
×
UNCOV
264
                closedChans = append(closedChans, closedChan{
×
UNCOV
265
                        chanKey:      k,
×
UNCOV
266
                        summaryBytes: v,
×
UNCOV
267
                })
×
UNCOV
268
                return nil
×
UNCOV
269
        })
×
UNCOV
270
        if err != nil {
×
271
                return err
×
272
        }
×
273

UNCOV
274
        for _, closedChan := range closedChans {
×
UNCOV
275
                oldSummary, err := legacy.DeserializeCloseChannelSummary(
×
UNCOV
276
                        bytes.NewReader(closedChan.summaryBytes),
×
UNCOV
277
                )
×
UNCOV
278
                if err != nil {
×
279
                        return err
×
280
                }
×
281

UNCOV
282
                var newSummaryBytes bytes.Buffer
×
UNCOV
283
                err = current.SerializeChannelCloseSummary(
×
UNCOV
284
                        &newSummaryBytes, oldSummary,
×
UNCOV
285
                )
×
UNCOV
286
                if err != nil {
×
287
                        return err
×
288
                }
×
289

UNCOV
290
                err = closedChanBucket.Put(
×
UNCOV
291
                        closedChan.chanKey, newSummaryBytes.Bytes(),
×
UNCOV
292
                )
×
UNCOV
293
                if err != nil {
×
294
                        return err
×
295
                }
×
296
        }
UNCOV
297
        return nil
×
298
}
299

UNCOV
300
func migrateForwardingPackages(tx kvdb.RwTx) error {
×
UNCOV
301
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
×
UNCOV
302

×
UNCOV
303
        // Exit early if bucket is not found.
×
UNCOV
304
        if fwdPkgBkt == nil {
×
305
                return nil
×
306
        }
×
307

308
        // We Go through the bucket and fetches all short channel IDs.
UNCOV
309
        var sources []lnwire.ShortChannelID
×
UNCOV
310
        err := fwdPkgBkt.ForEach(func(k, v []byte) error {
×
UNCOV
311
                source := lnwire.NewShortChanIDFromInt(byteOrder.Uint64(k))
×
UNCOV
312
                sources = append(sources, source)
×
UNCOV
313
                return nil
×
UNCOV
314
        })
×
UNCOV
315
        if err != nil {
×
316
                return err
×
317
        }
×
318

319
        // Now load all forwarding packages using the legacy encoding.
UNCOV
320
        var pkgsToMigrate []*common.FwdPkg
×
UNCOV
321
        for _, source := range sources {
×
UNCOV
322
                packager := legacy.NewChannelPackager(source)
×
UNCOV
323
                fwdPkgs, err := packager.LoadFwdPkgs(tx)
×
UNCOV
324
                if err != nil {
×
325
                        return err
×
326
                }
×
327

UNCOV
328
                pkgsToMigrate = append(pkgsToMigrate, fwdPkgs...)
×
329
        }
330

331
        // Add back the packages using the current encoding.
UNCOV
332
        for _, pkg := range pkgsToMigrate {
×
UNCOV
333
                packager := current.NewChannelPackager(pkg.Source)
×
UNCOV
334
                err := packager.AddFwdPkg(tx, pkg)
×
UNCOV
335
                if err != nil {
×
336
                        return err
×
337
                }
×
338
        }
339

UNCOV
340
        return nil
×
341
}
342

UNCOV
343
func migrateNetworkResults(tx kvdb.RwTx) error {
×
UNCOV
344
        networkResults := tx.ReadWriteBucket(networkResultStoreBucketKey)
×
UNCOV
345

×
UNCOV
346
        // Exit early if bucket is not found.
×
UNCOV
347
        if networkResults == nil {
×
348
                return nil
×
349
        }
×
350

351
        // Similar to the prior migrations, we'll do this one in two phases:
352
        // we'll first grab all the keys we need to migrate in one loop, then
353
        // update them all in another loop.
UNCOV
354
        var netResultsToMigrate [][2][]byte
×
UNCOV
355
        err := networkResults.ForEach(func(k, v []byte) error {
×
UNCOV
356
                netResultsToMigrate = append(netResultsToMigrate, [2][]byte{
×
UNCOV
357
                        k, v,
×
UNCOV
358
                })
×
UNCOV
359
                return nil
×
UNCOV
360
        })
×
UNCOV
361
        if err != nil {
×
362
                return err
×
363
        }
×
364

UNCOV
365
        for _, netResult := range netResultsToMigrate {
×
UNCOV
366
                resKey := netResult[0]
×
UNCOV
367
                resBytes := netResult[1]
×
UNCOV
368
                oldResult, err := legacy.DeserializeNetworkResult(
×
UNCOV
369
                        bytes.NewReader(resBytes),
×
UNCOV
370
                )
×
UNCOV
371
                if err != nil {
×
372
                        return err
×
373
                }
×
374

UNCOV
375
                var newResultBuf bytes.Buffer
×
UNCOV
376
                err = current.SerializeNetworkResult(&newResultBuf, oldResult)
×
UNCOV
377
                if err != nil {
×
378
                        return err
×
379
                }
×
380

UNCOV
381
                err = networkResults.Put(resKey, newResultBuf.Bytes())
×
UNCOV
382
                if err != nil {
×
383
                        return err
×
384
                }
×
385
        }
UNCOV
386
        return nil
×
387
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc