• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13586005509

28 Feb 2025 10:14AM UTC coverage: 68.629% (+9.9%) from 58.77%
13586005509

Pull #9521

github

web-flow
Merge 37d3a70a5 into 8532955b3
Pull Request #9521: unit: remove GOACC, use Go 1.20 native coverage functionality

129950 of 189351 relevant lines covered (68.63%)

23726.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.24
/channeldb/migration21/legacy/legacy_decoding.go
1
package legacy
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "io"
8

9
        lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
10
        "github.com/lightningnetwork/lnd/channeldb/migration21/common"
11
        "github.com/lightningnetwork/lnd/kvdb"
12
)
13

14
func deserializeHtlcs(r io.Reader) ([]common.HTLC, error) {
1✔
15
        var numHtlcs uint16
1✔
16
        if err := ReadElement(r, &numHtlcs); err != nil {
1✔
17
                return nil, err
×
18
        }
×
19

20
        var htlcs []common.HTLC
1✔
21
        if numHtlcs == 0 {
2✔
22
                return htlcs, nil
1✔
23
        }
1✔
24

25
        htlcs = make([]common.HTLC, numHtlcs)
×
26
        for i := uint16(0); i < numHtlcs; i++ {
×
27
                if err := ReadElements(r,
×
28
                        &htlcs[i].Signature, &htlcs[i].RHash, &htlcs[i].Amt,
×
29
                        &htlcs[i].RefundTimeout, &htlcs[i].OutputIndex,
×
30
                        &htlcs[i].Incoming, &htlcs[i].OnionBlob,
×
31
                        &htlcs[i].HtlcIndex, &htlcs[i].LogIndex,
×
32
                ); err != nil {
×
33
                        return htlcs, err
×
34
                }
×
35
        }
36

37
        return htlcs, nil
×
38
}
39

40
func DeserializeLogUpdates(r io.Reader) ([]common.LogUpdate, error) {
3✔
41
        var numUpdates uint16
3✔
42
        if err := binary.Read(r, byteOrder, &numUpdates); err != nil {
3✔
43
                return nil, err
×
44
        }
×
45

46
        logUpdates := make([]common.LogUpdate, numUpdates)
3✔
47
        for i := 0; i < int(numUpdates); i++ {
9✔
48
                err := ReadElements(r,
6✔
49
                        &logUpdates[i].LogIndex, &logUpdates[i].UpdateMsg,
6✔
50
                )
6✔
51
                if err != nil {
6✔
52
                        return nil, err
×
53
                }
×
54
        }
55

56
        return logUpdates, nil
3✔
57
}
58

59
func deserializeChanCommit(r io.Reader) (common.ChannelCommitment, error) {
1✔
60
        var c common.ChannelCommitment
1✔
61

1✔
62
        err := ReadElements(r,
1✔
63
                &c.CommitHeight, &c.LocalLogIndex, &c.LocalHtlcIndex, &c.RemoteLogIndex,
1✔
64
                &c.RemoteHtlcIndex, &c.LocalBalance, &c.RemoteBalance,
1✔
65
                &c.CommitFee, &c.FeePerKw, &c.CommitTx, &c.CommitSig,
1✔
66
        )
1✔
67
        if err != nil {
1✔
68
                return c, err
×
69
        }
×
70

71
        c.Htlcs, err = deserializeHtlcs(r)
1✔
72
        if err != nil {
1✔
73
                return c, err
×
74
        }
×
75

76
        return c, nil
1✔
77
}
78

79
func DeserializeCommitDiff(r io.Reader) (*common.CommitDiff, error) {
1✔
80
        var (
1✔
81
                d   common.CommitDiff
1✔
82
                err error
1✔
83
        )
1✔
84

1✔
85
        d.Commitment, err = deserializeChanCommit(r)
1✔
86
        if err != nil {
1✔
87
                return nil, err
×
88
        }
×
89

90
        d.CommitSig = &lnwire.CommitSig{}
1✔
91
        if err := d.CommitSig.Decode(r, 0); err != nil {
1✔
92
                return nil, err
×
93
        }
×
94

95
        d.LogUpdates, err = DeserializeLogUpdates(r)
1✔
96
        if err != nil {
1✔
97
                return nil, err
×
98
        }
×
99

100
        var numOpenRefs uint16
1✔
101
        if err := binary.Read(r, byteOrder, &numOpenRefs); err != nil {
1✔
102
                return nil, err
×
103
        }
×
104

105
        d.OpenedCircuitKeys = make([]common.CircuitKey, numOpenRefs)
1✔
106
        for i := 0; i < int(numOpenRefs); i++ {
1✔
107
                err := ReadElements(r,
×
108
                        &d.OpenedCircuitKeys[i].ChanID,
×
109
                        &d.OpenedCircuitKeys[i].HtlcID)
×
110
                if err != nil {
×
111
                        return nil, err
×
112
                }
×
113
        }
114

115
        var numClosedRefs uint16
1✔
116
        if err := binary.Read(r, byteOrder, &numClosedRefs); err != nil {
1✔
117
                return nil, err
×
118
        }
×
119

120
        d.ClosedCircuitKeys = make([]common.CircuitKey, numClosedRefs)
1✔
121
        for i := 0; i < int(numClosedRefs); i++ {
1✔
122
                err := ReadElements(r,
×
123
                        &d.ClosedCircuitKeys[i].ChanID,
×
124
                        &d.ClosedCircuitKeys[i].HtlcID)
×
125
                if err != nil {
×
126
                        return nil, err
×
127
                }
×
128
        }
129

130
        return &d, nil
1✔
131
}
132

133
func serializeHtlcs(b io.Writer, htlcs ...common.HTLC) error {
1✔
134
        numHtlcs := uint16(len(htlcs))
1✔
135
        if err := WriteElement(b, numHtlcs); err != nil {
1✔
136
                return err
×
137
        }
×
138

139
        for _, htlc := range htlcs {
1✔
140
                if err := WriteElements(b,
×
141
                        htlc.Signature, htlc.RHash, htlc.Amt, htlc.RefundTimeout,
×
142
                        htlc.OutputIndex, htlc.Incoming, htlc.OnionBlob,
×
143
                        htlc.HtlcIndex, htlc.LogIndex,
×
144
                ); err != nil {
×
145
                        return err
×
146
                }
×
147
        }
148

149
        return nil
1✔
150
}
151

152
func serializeChanCommit(w io.Writer, c *common.ChannelCommitment) error {
1✔
153
        if err := WriteElements(w,
1✔
154
                c.CommitHeight, c.LocalLogIndex, c.LocalHtlcIndex,
1✔
155
                c.RemoteLogIndex, c.RemoteHtlcIndex, c.LocalBalance,
1✔
156
                c.RemoteBalance, c.CommitFee, c.FeePerKw, c.CommitTx,
1✔
157
                c.CommitSig,
1✔
158
        ); err != nil {
1✔
159
                return err
×
160
        }
×
161

162
        return serializeHtlcs(w, c.Htlcs...)
1✔
163
}
164

165
func SerializeLogUpdates(w io.Writer, logUpdates []common.LogUpdate) error {
2✔
166
        numUpdates := uint16(len(logUpdates))
2✔
167
        if err := binary.Write(w, byteOrder, numUpdates); err != nil {
2✔
168
                return err
×
169
        }
×
170

171
        for _, diff := range logUpdates {
6✔
172
                err := WriteElements(w, diff.LogIndex, diff.UpdateMsg)
4✔
173
                if err != nil {
4✔
174
                        return err
×
175
                }
×
176
        }
177

178
        return nil
2✔
179
}
180

181
func SerializeCommitDiff(w io.Writer, diff *common.CommitDiff) error { // nolint: dupl
1✔
182
        if err := serializeChanCommit(w, &diff.Commitment); err != nil {
1✔
183
                return err
×
184
        }
×
185

186
        if err := diff.CommitSig.Encode(w, 0); err != nil {
1✔
187
                return err
×
188
        }
×
189

190
        if err := SerializeLogUpdates(w, diff.LogUpdates); err != nil {
1✔
191
                return err
×
192
        }
×
193

194
        numOpenRefs := uint16(len(diff.OpenedCircuitKeys))
1✔
195
        if err := binary.Write(w, byteOrder, numOpenRefs); err != nil {
1✔
196
                return err
×
197
        }
×
198

199
        for _, openRef := range diff.OpenedCircuitKeys {
1✔
200
                err := WriteElements(w, openRef.ChanID, openRef.HtlcID)
×
201
                if err != nil {
×
202
                        return err
×
203
                }
×
204
        }
205

206
        numClosedRefs := uint16(len(diff.ClosedCircuitKeys))
1✔
207
        if err := binary.Write(w, byteOrder, numClosedRefs); err != nil {
1✔
208
                return err
×
209
        }
×
210

211
        for _, closedRef := range diff.ClosedCircuitKeys {
1✔
212
                err := WriteElements(w, closedRef.ChanID, closedRef.HtlcID)
×
213
                if err != nil {
×
214
                        return err
×
215
                }
×
216
        }
217

218
        return nil
1✔
219
}
220

221
func DeserializeNetworkResult(r io.Reader) (*common.NetworkResult, error) {
1✔
222
        var (
1✔
223
                err error
1✔
224
        )
1✔
225

1✔
226
        n := &common.NetworkResult{}
1✔
227

1✔
228
        n.Msg, err = lnwire.ReadMessage(r, 0)
1✔
229
        if err != nil {
1✔
230
                return nil, err
×
231
        }
×
232

233
        if err := ReadElements(r,
1✔
234
                &n.Unencrypted, &n.IsResolution,
1✔
235
        ); err != nil {
1✔
236
                return nil, err
×
237
        }
×
238

239
        return n, nil
1✔
240
}
241

242
func SerializeNetworkResult(w io.Writer, n *common.NetworkResult) error {
1✔
243
        if _, err := lnwire.WriteMessage(w, n.Msg, 0); err != nil {
1✔
244
                return err
×
245
        }
×
246

247
        return WriteElements(w, n.Unencrypted, n.IsResolution)
1✔
248
}
249

250
func readChanConfig(b io.Reader, c *common.ChannelConfig) error { // nolint: dupl
1✔
251
        return ReadElements(b,
1✔
252
                &c.DustLimit, &c.MaxPendingAmount, &c.ChanReserve,
1✔
253
                &c.MinHTLC, &c.MaxAcceptedHtlcs, &c.CsvDelay,
1✔
254
                &c.MultiSigKey, &c.RevocationBasePoint,
1✔
255
                &c.PaymentBasePoint, &c.DelayBasePoint,
1✔
256
                &c.HtlcBasePoint,
1✔
257
        )
1✔
258
}
1✔
259

260
func DeserializeCloseChannelSummary(
261
        r io.Reader) (*common.ChannelCloseSummary, error) { // nolint: dupl
1✔
262

1✔
263
        c := &common.ChannelCloseSummary{}
1✔
264

1✔
265
        err := ReadElements(r,
1✔
266
                &c.ChanPoint, &c.ShortChanID, &c.ChainHash, &c.ClosingTXID,
1✔
267
                &c.CloseHeight, &c.RemotePub, &c.Capacity, &c.SettledBalance,
1✔
268
                &c.TimeLockedBalance, &c.CloseType, &c.IsPending,
1✔
269
        )
1✔
270
        if err != nil {
1✔
271
                return nil, err
×
272
        }
×
273

274
        // We'll now check to see if the channel close summary was encoded with
275
        // any of the additional optional fields.
276
        var hasNewFields bool
1✔
277
        err = ReadElements(r, &hasNewFields)
1✔
278
        if err != nil {
1✔
279
                return nil, err
×
280
        }
×
281

282
        // If fields are not present, we can return.
283
        if !hasNewFields {
1✔
284
                return c, nil
×
285
        }
×
286

287
        // Otherwise read the new fields.
288
        if err := ReadElements(r, &c.RemoteCurrentRevocation); err != nil {
1✔
289
                return nil, err
×
290
        }
×
291

292
        if err := readChanConfig(r, &c.LocalChanConfig); err != nil {
1✔
293
                return nil, err
×
294
        }
×
295

296
        // Finally, we'll attempt to read the next unrevoked commitment point
297
        // for the remote party. If we closed the channel before receiving a
298
        // funding locked message then this might not be present. A boolean
299
        // indicating whether the field is present will come first.
300
        var hasRemoteNextRevocation bool
1✔
301
        err = ReadElements(r, &hasRemoteNextRevocation)
1✔
302
        if err != nil {
1✔
303
                return nil, err
×
304
        }
×
305

306
        // If this field was written, read it.
307
        if hasRemoteNextRevocation {
2✔
308
                err = ReadElements(r, &c.RemoteNextRevocation)
1✔
309
                if err != nil {
1✔
310
                        return nil, err
×
311
                }
×
312
        }
313

314
        // Check if we have a channel sync message to read.
315
        var hasChanSyncMsg bool
1✔
316
        err = ReadElements(r, &hasChanSyncMsg)
1✔
317
        if err == io.EOF {
1✔
318
                return c, nil
×
319
        } else if err != nil {
1✔
320
                return nil, err
×
321
        }
×
322

323
        // If a chan sync message is present, read it.
324
        if hasChanSyncMsg {
2✔
325
                // We must pass in reference to a lnwire.Message for the codec
1✔
326
                // to support it.
1✔
327
                msg, err := lnwire.ReadMessage(r, 0)
1✔
328
                if err != nil {
1✔
329
                        return nil, err
×
330
                }
×
331

332
                chanSync, ok := msg.(*lnwire.ChannelReestablish)
1✔
333
                if !ok {
1✔
334
                        return nil, errors.New("unable cast db Message to " +
×
335
                                "ChannelReestablish")
×
336
                }
×
337
                c.LastChanSyncMsg = chanSync
1✔
338
        }
339

340
        return c, nil
1✔
341
}
342

343
func writeChanConfig(b io.Writer, c *common.ChannelConfig) error { // nolint: dupl
1✔
344
        return WriteElements(b,
1✔
345
                c.DustLimit, c.MaxPendingAmount, c.ChanReserve, c.MinHTLC,
1✔
346
                c.MaxAcceptedHtlcs, c.CsvDelay, c.MultiSigKey,
1✔
347
                c.RevocationBasePoint, c.PaymentBasePoint, c.DelayBasePoint,
1✔
348
                c.HtlcBasePoint,
1✔
349
        )
1✔
350
}
1✔
351

352
func SerializeChannelCloseSummary(w io.Writer, cs *common.ChannelCloseSummary) error {
1✔
353
        err := WriteElements(w,
1✔
354
                cs.ChanPoint, cs.ShortChanID, cs.ChainHash, cs.ClosingTXID,
1✔
355
                cs.CloseHeight, cs.RemotePub, cs.Capacity, cs.SettledBalance,
1✔
356
                cs.TimeLockedBalance, cs.CloseType, cs.IsPending,
1✔
357
        )
1✔
358
        if err != nil {
1✔
359
                return err
×
360
        }
×
361

362
        // If this is a close channel summary created before the addition of
363
        // the new fields, then we can exit here.
364
        if cs.RemoteCurrentRevocation == nil {
1✔
365
                return WriteElements(w, false)
×
366
        }
×
367

368
        // If fields are present, write boolean to indicate this, and continue.
369
        if err := WriteElements(w, true); err != nil {
1✔
370
                return err
×
371
        }
×
372

373
        if err := WriteElements(w, cs.RemoteCurrentRevocation); err != nil {
1✔
374
                return err
×
375
        }
×
376

377
        if err := writeChanConfig(w, &cs.LocalChanConfig); err != nil {
1✔
378
                return err
×
379
        }
×
380

381
        // The RemoteNextRevocation field is optional, as it's possible for a
382
        // channel to be closed before we learn of the next unrevoked
383
        // revocation point for the remote party. Write a boolean indicating
384
        // whether this field is present or not.
385
        if err := WriteElements(w, cs.RemoteNextRevocation != nil); err != nil {
1✔
386
                return err
×
387
        }
×
388

389
        // Write the field, if present.
390
        if cs.RemoteNextRevocation != nil {
2✔
391
                if err = WriteElements(w, cs.RemoteNextRevocation); err != nil {
1✔
392
                        return err
×
393
                }
×
394
        }
395

396
        // Write whether the channel sync message is present.
397
        if err := WriteElements(w, cs.LastChanSyncMsg != nil); err != nil {
1✔
398
                return err
×
399
        }
×
400

401
        // Write the channel sync message, if present.
402
        if cs.LastChanSyncMsg != nil {
2✔
403
                _, err = lnwire.WriteMessage(w, cs.LastChanSyncMsg, 0)
1✔
404
                if err != nil {
1✔
405
                        return err
×
406
                }
×
407
        }
408

409
        return nil
1✔
410
}
411

412
// ErrCorruptedFwdPkg is returned when a forwarding package read from disk is
// missing an expected bucket or key, or contains a malformed key, which
// suggests that its on-disk structure has been mangled.
var ErrCorruptedFwdPkg = errors.New("fwding package db has been corrupted")
415

416
var (
	// fwdPackagesKey names the root-level bucket holding every forwarding
	// package. It is subdivided by the short channel ID of each channel.
	fwdPackagesKey = []byte("fwd-packages")

	// addBucketKey names the bucket that stores all Add log updates.
	addBucketKey = []byte("add-updates")

	// failSettleBucketKey names the bucket that stores all Settle/Fail
	// log updates.
	failSettleBucketKey = []byte("fail-settle-updates")

	// fwdFilterKey is the key under which the set of Adds that passed
	// validation, and are to be forwarded to the switch, is written.
	// NOTE: A forwarding package containing this key has reached
	// FwdStateProcessed.
	fwdFilterKey = []byte("fwd-filter-key")

	// ackFilterKey is the key of the PkgFilter that tracks which Adds have
	// received a Settle/Fail. Such a response can originate from several
	// places, including: exitHop settle/fails, switch failures, chain
	// arbiter interjections, and settle/fails from the next hop in the
	// route.
	ackFilterKey = []byte("ack-filter-key")

	// settleFailFilterKey is the key of the PkgFilter that tracks which
	// Settles/Fails have been received and processed by the link that
	// originally received the Add.
	settleFailFilterKey = []byte("settle-fail-filter-key")
)
447

448
func makeLogKey(updateNum uint64) [8]byte {
500✔
449
        var key [8]byte
500✔
450
        byteOrder.PutUint64(key[:], updateNum)
500✔
451
        return key
500✔
452
}
500✔
453

454
// uint16Key writes the provided 16-bit unsigned integer to a 2-byte slice.
455
func uint16Key(i uint16) []byte {
400✔
456
        key := make([]byte, 2)
400✔
457
        byteOrder.PutUint16(key, i)
400✔
458
        return key
400✔
459
}
400✔
460

461
// ChannelPackager is used by a channel to manage the lifecycle of its forwarding
462
// packages. The packager is tied to a particular source channel ID, allowing it
463
// to create and edit its own packages. Each packager also has the ability to
464
// remove fail/settle htlcs that correspond to an add contained in one of
465
// source's packages.
466
type ChannelPackager struct {
467
        source lnwire.ShortChannelID
468
}
469

470
// NewChannelPackager creates a new packager for a single channel.
471
func NewChannelPackager(source lnwire.ShortChannelID) *ChannelPackager {
200✔
472
        return &ChannelPackager{
200✔
473
                source: source,
200✔
474
        }
200✔
475
}
200✔
476

477
// AddFwdPkg writes a newly locked in forwarding package to disk.
478
func (*ChannelPackager) AddFwdPkg(tx kvdb.RwTx, fwdPkg *common.FwdPkg) error { // nolint: dupl
100✔
479
        fwdPkgBkt, err := tx.CreateTopLevelBucket(fwdPackagesKey)
100✔
480
        if err != nil {
100✔
481
                return err
×
482
        }
×
483

484
        source := makeLogKey(fwdPkg.Source.ToUint64())
100✔
485
        sourceBkt, err := fwdPkgBkt.CreateBucketIfNotExists(source[:])
100✔
486
        if err != nil {
100✔
487
                return err
×
488
        }
×
489

490
        heightKey := makeLogKey(fwdPkg.Height)
100✔
491
        heightBkt, err := sourceBkt.CreateBucketIfNotExists(heightKey[:])
100✔
492
        if err != nil {
100✔
493
                return err
×
494
        }
×
495

496
        // Write ADD updates we received at this commit height.
497
        addBkt, err := heightBkt.CreateBucketIfNotExists(addBucketKey)
100✔
498
        if err != nil {
100✔
499
                return err
×
500
        }
×
501

502
        // Write SETTLE/FAIL updates we received at this commit height.
503
        failSettleBkt, err := heightBkt.CreateBucketIfNotExists(failSettleBucketKey)
100✔
504
        if err != nil {
100✔
505
                return err
×
506
        }
×
507

508
        for i := range fwdPkg.Adds {
300✔
509
                err = putLogUpdate(addBkt, uint16(i), &fwdPkg.Adds[i])
200✔
510
                if err != nil {
200✔
511
                        return err
×
512
                }
×
513
        }
514

515
        // Persist the initialized pkg filter, which will be used to determine
516
        // when we can remove this forwarding package from disk.
517
        var ackFilterBuf bytes.Buffer
100✔
518
        if err := fwdPkg.AckFilter.Encode(&ackFilterBuf); err != nil {
100✔
519
                return err
×
520
        }
×
521

522
        if err := heightBkt.Put(ackFilterKey, ackFilterBuf.Bytes()); err != nil {
100✔
523
                return err
×
524
        }
×
525

526
        for i := range fwdPkg.SettleFails {
300✔
527
                err = putLogUpdate(failSettleBkt, uint16(i), &fwdPkg.SettleFails[i])
200✔
528
                if err != nil {
200✔
529
                        return err
×
530
                }
×
531
        }
532

533
        var settleFailFilterBuf bytes.Buffer
100✔
534
        err = fwdPkg.SettleFailFilter.Encode(&settleFailFilterBuf)
100✔
535
        if err != nil {
100✔
536
                return err
×
537
        }
×
538

539
        return heightBkt.Put(settleFailFilterKey, settleFailFilterBuf.Bytes())
100✔
540
}
541

542
// putLogUpdate writes an htlc to the provided `bkt`, using `index` as the key.
543
func putLogUpdate(bkt kvdb.RwBucket, idx uint16, htlc *common.LogUpdate) error {
400✔
544
        var b bytes.Buffer
400✔
545
        if err := serializeLogUpdate(&b, htlc); err != nil {
400✔
546
                return err
×
547
        }
×
548

549
        return bkt.Put(uint16Key(idx), b.Bytes())
400✔
550
}
551

552
// LoadFwdPkgs scans the forwarding log for any packages that haven't been
553
// processed, and returns their deserialized log updates in a map indexed by the
554
// remote commitment height at which the updates were locked in.
555
func (p *ChannelPackager) LoadFwdPkgs(tx kvdb.RTx) ([]*common.FwdPkg, error) {
100✔
556
        return loadChannelFwdPkgs(tx, p.source)
100✔
557
}
100✔
558

559
// loadChannelFwdPkgs loads all forwarding packages owned by `source`.
560
func loadChannelFwdPkgs(tx kvdb.RTx, source lnwire.ShortChannelID) ([]*common.FwdPkg, error) { // nolint: dupl
100✔
561
        fwdPkgBkt := tx.ReadBucket(fwdPackagesKey)
100✔
562
        if fwdPkgBkt == nil {
100✔
563
                return nil, nil
×
564
        }
×
565

566
        sourceKey := makeLogKey(source.ToUint64())
100✔
567
        sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
100✔
568
        if sourceBkt == nil {
100✔
569
                return nil, nil
×
570
        }
×
571

572
        var heights []uint64
100✔
573
        if err := sourceBkt.ForEach(func(k, _ []byte) error {
200✔
574
                if len(k) != 8 {
100✔
575
                        return ErrCorruptedFwdPkg
×
576
                }
×
577

578
                heights = append(heights, byteOrder.Uint64(k))
100✔
579

100✔
580
                return nil
100✔
581
        }); err != nil {
×
582
                return nil, err
×
583
        }
×
584

585
        // Load the forwarding package for each retrieved height.
586
        fwdPkgs := make([]*common.FwdPkg, 0, len(heights))
100✔
587
        for _, height := range heights {
200✔
588
                fwdPkg, err := loadFwdPkg(fwdPkgBkt, source, height)
100✔
589
                if err != nil {
100✔
590
                        return nil, err
×
591
                }
×
592

593
                fwdPkgs = append(fwdPkgs, fwdPkg)
100✔
594
        }
595

596
        return fwdPkgs, nil
100✔
597
}
598

599
// loadFwdPkg reads the packager's fwd pkg at a given height, and determines the
600
// appropriate FwdState.
601
func loadFwdPkg(fwdPkgBkt kvdb.RBucket, source lnwire.ShortChannelID,
602
        height uint64) (*common.FwdPkg, error) {
100✔
603

100✔
604
        sourceKey := makeLogKey(source.ToUint64())
100✔
605
        sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
100✔
606
        if sourceBkt == nil {
100✔
607
                return nil, ErrCorruptedFwdPkg
×
608
        }
×
609

610
        heightKey := makeLogKey(height)
100✔
611
        heightBkt := sourceBkt.NestedReadBucket(heightKey[:])
100✔
612
        if heightBkt == nil {
100✔
613
                return nil, ErrCorruptedFwdPkg
×
614
        }
×
615

616
        // Load ADDs from disk.
617
        addBkt := heightBkt.NestedReadBucket(addBucketKey)
100✔
618
        if addBkt == nil {
100✔
619
                return nil, ErrCorruptedFwdPkg
×
620
        }
×
621

622
        adds, err := loadHtlcs(addBkt)
100✔
623
        if err != nil {
100✔
624
                return nil, err
×
625
        }
×
626

627
        // Load ack filter from disk.
628
        ackFilterBytes := heightBkt.Get(ackFilterKey)
100✔
629
        if ackFilterBytes == nil {
100✔
630
                return nil, ErrCorruptedFwdPkg
×
631
        }
×
632
        ackFilterReader := bytes.NewReader(ackFilterBytes)
100✔
633

100✔
634
        ackFilter := &common.PkgFilter{}
100✔
635
        if err := ackFilter.Decode(ackFilterReader); err != nil {
100✔
636
                return nil, err
×
637
        }
×
638

639
        // Load SETTLE/FAILs from disk.
640
        failSettleBkt := heightBkt.NestedReadBucket(failSettleBucketKey)
100✔
641
        if failSettleBkt == nil {
100✔
642
                return nil, ErrCorruptedFwdPkg
×
643
        }
×
644

645
        failSettles, err := loadHtlcs(failSettleBkt)
100✔
646
        if err != nil {
100✔
647
                return nil, err
×
648
        }
×
649

650
        // Load settle fail filter from disk.
651
        settleFailFilterBytes := heightBkt.Get(settleFailFilterKey)
100✔
652
        if settleFailFilterBytes == nil {
100✔
653
                return nil, ErrCorruptedFwdPkg
×
654
        }
×
655
        settleFailFilterReader := bytes.NewReader(settleFailFilterBytes)
100✔
656

100✔
657
        settleFailFilter := &common.PkgFilter{}
100✔
658
        if err := settleFailFilter.Decode(settleFailFilterReader); err != nil {
100✔
659
                return nil, err
×
660
        }
×
661

662
        // Initialize the fwding package, which always starts in the
663
        // FwdStateLockedIn. We can determine what state the package was left in
664
        // by examining constraints on the information loaded from disk.
665
        fwdPkg := &common.FwdPkg{
100✔
666
                Source:           source,
100✔
667
                State:            common.FwdStateLockedIn,
100✔
668
                Height:           height,
100✔
669
                Adds:             adds,
100✔
670
                AckFilter:        ackFilter,
100✔
671
                SettleFails:      failSettles,
100✔
672
                SettleFailFilter: settleFailFilter,
100✔
673
        }
100✔
674

100✔
675
        // Check to see if we have written the set exported filter adds to
100✔
676
        // disk. If we haven't, processing of this package was never started, or
100✔
677
        // failed during the last attempt.
100✔
678
        fwdFilterBytes := heightBkt.Get(fwdFilterKey)
100✔
679
        if fwdFilterBytes == nil {
200✔
680
                nAdds := uint16(len(adds))
100✔
681
                fwdPkg.FwdFilter = common.NewPkgFilter(nAdds)
100✔
682
                return fwdPkg, nil
100✔
683
        }
100✔
684

685
        fwdFilterReader := bytes.NewReader(fwdFilterBytes)
×
686
        fwdPkg.FwdFilter = &common.PkgFilter{}
×
687
        if err := fwdPkg.FwdFilter.Decode(fwdFilterReader); err != nil {
×
688
                return nil, err
×
689
        }
×
690

691
        // Otherwise, a complete round of processing was completed, and we
692
        // advance the package to FwdStateProcessed.
693
        fwdPkg.State = common.FwdStateProcessed
×
694

×
695
        // If every add, settle, and fail has been fully acknowledged, we can
×
696
        // safely set the package's state to FwdStateCompleted, signalling that
×
697
        // it can be garbage collected.
×
698
        if fwdPkg.AckFilter.IsFull() && fwdPkg.SettleFailFilter.IsFull() {
×
699
                fwdPkg.State = common.FwdStateCompleted
×
700
        }
×
701

702
        return fwdPkg, nil
×
703
}
704

705
// loadHtlcs retrieves all serialized htlcs in a bucket, returning
706
// them in order of the indexes they were written under.
707
func loadHtlcs(bkt kvdb.RBucket) ([]common.LogUpdate, error) {
200✔
708
        var htlcs []common.LogUpdate
200✔
709
        if err := bkt.ForEach(func(_, v []byte) error {
600✔
710
                htlc, err := deserializeLogUpdate(bytes.NewReader(v))
400✔
711
                if err != nil {
400✔
712
                        return err
×
713
                }
×
714

715
                htlcs = append(htlcs, *htlc)
400✔
716

400✔
717
                return nil
400✔
718
        }); err != nil {
×
719
                return nil, err
×
720
        }
×
721

722
        return htlcs, nil
200✔
723
}
724

725
// serializeLogUpdate writes a log update to the provided io.Writer.
726
func serializeLogUpdate(w io.Writer, l *common.LogUpdate) error {
400✔
727
        return WriteElements(w, l.LogIndex, l.UpdateMsg)
400✔
728
}
400✔
729

730
// deserializeLogUpdate reads a log update from the provided io.Reader.
731
func deserializeLogUpdate(r io.Reader) (*common.LogUpdate, error) {
400✔
732
        l := &common.LogUpdate{}
400✔
733
        if err := ReadElements(r, &l.LogIndex, &l.UpdateMsg); err != nil {
400✔
734
                return nil, err
×
735
        }
×
736

737
        return l, nil
400✔
738
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc