• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15561477203

10 Jun 2025 01:54PM UTC coverage: 58.351% (-10.1%) from 68.487%
15561477203

Pull #9356

github

web-flow
Merge 6440b25db into c6d6d4c0b
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

33 of 36 new or added lines in 2 files covered. (91.67%)

28366 existing lines in 455 files now uncovered.

97715 of 167461 relevant lines covered (58.35%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/channeldb/migration21/current/current_encoding.go
1
package current
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9

10
        lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
11
        "github.com/lightningnetwork/lnd/channeldb/migration21/common"
12
        "github.com/lightningnetwork/lnd/kvdb"
13
)
14

UNCOV
15
func serializeChanCommit(w io.Writer, c *common.ChannelCommitment) error { // nolint: dupl
×
UNCOV
16
        if err := WriteElements(w,
×
UNCOV
17
                c.CommitHeight, c.LocalLogIndex, c.LocalHtlcIndex,
×
UNCOV
18
                c.RemoteLogIndex, c.RemoteHtlcIndex, c.LocalBalance,
×
UNCOV
19
                c.RemoteBalance, c.CommitFee, c.FeePerKw, c.CommitTx,
×
UNCOV
20
                c.CommitSig,
×
UNCOV
21
        ); err != nil {
×
22
                return err
×
23
        }
×
24

UNCOV
25
        return serializeHtlcs(w, c.Htlcs...)
×
26
}
27

UNCOV
28
func SerializeLogUpdates(w io.Writer, logUpdates []common.LogUpdate) error { // nolint: dupl
×
UNCOV
29
        numUpdates := uint16(len(logUpdates))
×
UNCOV
30
        if err := binary.Write(w, byteOrder, numUpdates); err != nil {
×
31
                return err
×
32
        }
×
33

UNCOV
34
        for _, diff := range logUpdates {
×
UNCOV
35
                err := WriteElements(w, diff.LogIndex, diff.UpdateMsg)
×
UNCOV
36
                if err != nil {
×
37
                        return err
×
38
                }
×
39
        }
40

UNCOV
41
        return nil
×
42
}
43

UNCOV
44
func serializeHtlcs(b io.Writer, htlcs ...common.HTLC) error { // nolint: dupl
×
UNCOV
45
        numHtlcs := uint16(len(htlcs))
×
UNCOV
46
        if err := WriteElement(b, numHtlcs); err != nil {
×
47
                return err
×
48
        }
×
49

UNCOV
50
        for _, htlc := range htlcs {
×
51
                if err := WriteElements(b,
×
52
                        htlc.Signature, htlc.RHash, htlc.Amt, htlc.RefundTimeout,
×
53
                        htlc.OutputIndex, htlc.Incoming, htlc.OnionBlob,
×
54
                        htlc.HtlcIndex, htlc.LogIndex,
×
55
                ); err != nil {
×
56
                        return err
×
57
                }
×
58
        }
59

UNCOV
60
        return nil
×
61
}
62

UNCOV
63
func SerializeCommitDiff(w io.Writer, diff *common.CommitDiff) error { // nolint: dupl
×
UNCOV
64
        if err := serializeChanCommit(w, &diff.Commitment); err != nil {
×
65
                return err
×
66
        }
×
67

UNCOV
68
        if err := WriteElements(w, diff.CommitSig); err != nil {
×
69
                return err
×
70
        }
×
71

UNCOV
72
        if err := SerializeLogUpdates(w, diff.LogUpdates); err != nil {
×
73
                return err
×
74
        }
×
75

UNCOV
76
        numOpenRefs := uint16(len(diff.OpenedCircuitKeys))
×
UNCOV
77
        if err := binary.Write(w, byteOrder, numOpenRefs); err != nil {
×
78
                return err
×
79
        }
×
80

UNCOV
81
        for _, openRef := range diff.OpenedCircuitKeys {
×
82
                err := WriteElements(w, openRef.ChanID, openRef.HtlcID)
×
83
                if err != nil {
×
84
                        return err
×
85
                }
×
86
        }
87

UNCOV
88
        numClosedRefs := uint16(len(diff.ClosedCircuitKeys))
×
UNCOV
89
        if err := binary.Write(w, byteOrder, numClosedRefs); err != nil {
×
90
                return err
×
91
        }
×
92

UNCOV
93
        for _, closedRef := range diff.ClosedCircuitKeys {
×
94
                err := WriteElements(w, closedRef.ChanID, closedRef.HtlcID)
×
95
                if err != nil {
×
96
                        return err
×
97
                }
×
98
        }
99

UNCOV
100
        return nil
×
101
}
102

UNCOV
103
func deserializeHtlcs(r io.Reader) ([]common.HTLC, error) { // nolint: dupl
×
UNCOV
104
        var numHtlcs uint16
×
UNCOV
105
        if err := ReadElement(r, &numHtlcs); err != nil {
×
106
                return nil, err
×
107
        }
×
108

UNCOV
109
        var htlcs []common.HTLC
×
UNCOV
110
        if numHtlcs == 0 {
×
UNCOV
111
                return htlcs, nil
×
UNCOV
112
        }
×
113

114
        htlcs = make([]common.HTLC, numHtlcs)
×
115
        for i := uint16(0); i < numHtlcs; i++ {
×
116
                if err := ReadElements(r,
×
117
                        &htlcs[i].Signature, &htlcs[i].RHash, &htlcs[i].Amt,
×
118
                        &htlcs[i].RefundTimeout, &htlcs[i].OutputIndex,
×
119
                        &htlcs[i].Incoming, &htlcs[i].OnionBlob,
×
120
                        &htlcs[i].HtlcIndex, &htlcs[i].LogIndex,
×
121
                ); err != nil {
×
122
                        return htlcs, err
×
123
                }
×
124
        }
125

126
        return htlcs, nil
×
127
}
128

UNCOV
129
func deserializeChanCommit(r io.Reader) (common.ChannelCommitment, error) { // nolint: dupl
×
UNCOV
130
        var c common.ChannelCommitment
×
UNCOV
131

×
UNCOV
132
        err := ReadElements(r,
×
UNCOV
133
                &c.CommitHeight, &c.LocalLogIndex, &c.LocalHtlcIndex, &c.RemoteLogIndex,
×
UNCOV
134
                &c.RemoteHtlcIndex, &c.LocalBalance, &c.RemoteBalance,
×
UNCOV
135
                &c.CommitFee, &c.FeePerKw, &c.CommitTx, &c.CommitSig,
×
UNCOV
136
        )
×
UNCOV
137
        if err != nil {
×
138
                return c, err
×
139
        }
×
140

UNCOV
141
        c.Htlcs, err = deserializeHtlcs(r)
×
UNCOV
142
        if err != nil {
×
143
                return c, err
×
144
        }
×
145

UNCOV
146
        return c, nil
×
147
}
148

UNCOV
149
func DeserializeLogUpdates(r io.Reader) ([]common.LogUpdate, error) { // nolint: dupl
×
UNCOV
150
        var numUpdates uint16
×
UNCOV
151
        if err := binary.Read(r, byteOrder, &numUpdates); err != nil {
×
152
                return nil, err
×
153
        }
×
154

UNCOV
155
        logUpdates := make([]common.LogUpdate, numUpdates)
×
UNCOV
156
        for i := 0; i < int(numUpdates); i++ {
×
UNCOV
157
                err := ReadElements(r,
×
UNCOV
158
                        &logUpdates[i].LogIndex, &logUpdates[i].UpdateMsg,
×
UNCOV
159
                )
×
UNCOV
160
                if err != nil {
×
161
                        return nil, err
×
162
                }
×
163
        }
UNCOV
164
        return logUpdates, nil
×
165
}
166

UNCOV
167
func DeserializeCommitDiff(r io.Reader) (*common.CommitDiff, error) { // nolint: dupl
×
UNCOV
168
        var (
×
UNCOV
169
                d   common.CommitDiff
×
UNCOV
170
                err error
×
UNCOV
171
        )
×
UNCOV
172

×
UNCOV
173
        d.Commitment, err = deserializeChanCommit(r)
×
UNCOV
174
        if err != nil {
×
175
                return nil, err
×
176
        }
×
177

UNCOV
178
        var msg lnwire.Message
×
UNCOV
179
        if err := ReadElements(r, &msg); err != nil {
×
180
                return nil, err
×
181
        }
×
UNCOV
182
        commitSig, ok := msg.(*lnwire.CommitSig)
×
UNCOV
183
        if !ok {
×
184
                return nil, fmt.Errorf("expected lnwire.CommitSig, instead "+
×
185
                        "read: %T", msg)
×
186
        }
×
UNCOV
187
        d.CommitSig = commitSig
×
UNCOV
188

×
UNCOV
189
        d.LogUpdates, err = DeserializeLogUpdates(r)
×
UNCOV
190
        if err != nil {
×
191
                return nil, err
×
192
        }
×
193

UNCOV
194
        var numOpenRefs uint16
×
UNCOV
195
        if err := binary.Read(r, byteOrder, &numOpenRefs); err != nil {
×
196
                return nil, err
×
197
        }
×
198

UNCOV
199
        d.OpenedCircuitKeys = make([]common.CircuitKey, numOpenRefs)
×
UNCOV
200
        for i := 0; i < int(numOpenRefs); i++ {
×
201
                err := ReadElements(r,
×
202
                        &d.OpenedCircuitKeys[i].ChanID,
×
203
                        &d.OpenedCircuitKeys[i].HtlcID)
×
204
                if err != nil {
×
205
                        return nil, err
×
206
                }
×
207
        }
208

UNCOV
209
        var numClosedRefs uint16
×
UNCOV
210
        if err := binary.Read(r, byteOrder, &numClosedRefs); err != nil {
×
211
                return nil, err
×
212
        }
×
213

UNCOV
214
        d.ClosedCircuitKeys = make([]common.CircuitKey, numClosedRefs)
×
UNCOV
215
        for i := 0; i < int(numClosedRefs); i++ {
×
216
                err := ReadElements(r,
×
217
                        &d.ClosedCircuitKeys[i].ChanID,
×
218
                        &d.ClosedCircuitKeys[i].HtlcID)
×
219
                if err != nil {
×
220
                        return nil, err
×
221
                }
×
222
        }
223

UNCOV
224
        return &d, nil
×
225
}
226

UNCOV
227
func SerializeNetworkResult(w io.Writer, n *common.NetworkResult) error { // nolint: dupl
×
UNCOV
228
        return WriteElements(w, n.Msg, n.Unencrypted, n.IsResolution)
×
UNCOV
229
}
×
230

UNCOV
231
func DeserializeNetworkResult(r io.Reader) (*common.NetworkResult, error) { // nolint: dupl
×
UNCOV
232
        n := &common.NetworkResult{}
×
UNCOV
233

×
UNCOV
234
        if err := ReadElements(r,
×
UNCOV
235
                &n.Msg, &n.Unencrypted, &n.IsResolution,
×
UNCOV
236
        ); err != nil {
×
237
                return nil, err
×
238
        }
×
239

UNCOV
240
        return n, nil
×
241
}
242

UNCOV
243
func writeChanConfig(b io.Writer, c *common.ChannelConfig) error { // nolint: dupl
×
UNCOV
244
        return WriteElements(b,
×
UNCOV
245
                c.DustLimit, c.MaxPendingAmount, c.ChanReserve, c.MinHTLC,
×
UNCOV
246
                c.MaxAcceptedHtlcs, c.CsvDelay, c.MultiSigKey,
×
UNCOV
247
                c.RevocationBasePoint, c.PaymentBasePoint, c.DelayBasePoint,
×
UNCOV
248
                c.HtlcBasePoint,
×
UNCOV
249
        )
×
UNCOV
250
}
×
251

UNCOV
252
func SerializeChannelCloseSummary(w io.Writer, cs *common.ChannelCloseSummary) error { // nolint: dupl
×
UNCOV
253
        err := WriteElements(w,
×
UNCOV
254
                cs.ChanPoint, cs.ShortChanID, cs.ChainHash, cs.ClosingTXID,
×
UNCOV
255
                cs.CloseHeight, cs.RemotePub, cs.Capacity, cs.SettledBalance,
×
UNCOV
256
                cs.TimeLockedBalance, cs.CloseType, cs.IsPending,
×
UNCOV
257
        )
×
UNCOV
258
        if err != nil {
×
259
                return err
×
260
        }
×
261

262
        // If this is a close channel summary created before the addition of
263
        // the new fields, then we can exit here.
UNCOV
264
        if cs.RemoteCurrentRevocation == nil {
×
265
                return WriteElements(w, false)
×
266
        }
×
267

268
        // If fields are present, write boolean to indicate this, and continue.
UNCOV
269
        if err := WriteElements(w, true); err != nil {
×
270
                return err
×
271
        }
×
272

UNCOV
273
        if err := WriteElements(w, cs.RemoteCurrentRevocation); err != nil {
×
274
                return err
×
275
        }
×
276

UNCOV
277
        if err := writeChanConfig(w, &cs.LocalChanConfig); err != nil {
×
278
                return err
×
279
        }
×
280

281
        // The RemoteNextRevocation field is optional, as it's possible for a
282
        // channel to be closed before we learn of the next unrevoked
283
        // revocation point for the remote party. Write a boolean indicating
284
        // whether this field is present or not.
UNCOV
285
        if err := WriteElements(w, cs.RemoteNextRevocation != nil); err != nil {
×
286
                return err
×
287
        }
×
288

289
        // Write the field, if present.
UNCOV
290
        if cs.RemoteNextRevocation != nil {
×
UNCOV
291
                if err = WriteElements(w, cs.RemoteNextRevocation); err != nil {
×
292
                        return err
×
293
                }
×
294
        }
295

296
        // Write whether the channel sync message is present.
UNCOV
297
        if err := WriteElements(w, cs.LastChanSyncMsg != nil); err != nil {
×
298
                return err
×
299
        }
×
300

301
        // Write the channel sync message, if present.
UNCOV
302
        if cs.LastChanSyncMsg != nil {
×
UNCOV
303
                if err := WriteElements(w, cs.LastChanSyncMsg); err != nil {
×
304
                        return err
×
305
                }
×
306
        }
307

UNCOV
308
        return nil
×
309
}
310

UNCOV
311
func readChanConfig(b io.Reader, c *common.ChannelConfig) error {
×
UNCOV
312
        return ReadElements(b,
×
UNCOV
313
                &c.DustLimit, &c.MaxPendingAmount, &c.ChanReserve,
×
UNCOV
314
                &c.MinHTLC, &c.MaxAcceptedHtlcs, &c.CsvDelay,
×
UNCOV
315
                &c.MultiSigKey, &c.RevocationBasePoint,
×
UNCOV
316
                &c.PaymentBasePoint, &c.DelayBasePoint,
×
UNCOV
317
                &c.HtlcBasePoint,
×
UNCOV
318
        )
×
UNCOV
319
}
×
320

UNCOV
321
func DeserializeCloseChannelSummary(r io.Reader) (*common.ChannelCloseSummary, error) { // nolint: dupl
×
UNCOV
322
        c := &common.ChannelCloseSummary{}
×
UNCOV
323

×
UNCOV
324
        err := ReadElements(r,
×
UNCOV
325
                &c.ChanPoint, &c.ShortChanID, &c.ChainHash, &c.ClosingTXID,
×
UNCOV
326
                &c.CloseHeight, &c.RemotePub, &c.Capacity, &c.SettledBalance,
×
UNCOV
327
                &c.TimeLockedBalance, &c.CloseType, &c.IsPending,
×
UNCOV
328
        )
×
UNCOV
329
        if err != nil {
×
330
                return nil, err
×
331
        }
×
332

333
        // We'll now check to see if the channel close summary was encoded with
334
        // any of the additional optional fields.
UNCOV
335
        var hasNewFields bool
×
UNCOV
336
        err = ReadElements(r, &hasNewFields)
×
UNCOV
337
        if err != nil {
×
338
                return nil, err
×
339
        }
×
340

341
        // If fields are not present, we can return.
UNCOV
342
        if !hasNewFields {
×
343
                return c, nil
×
344
        }
×
345

346
        // Otherwise read the new fields.
UNCOV
347
        if err := ReadElements(r, &c.RemoteCurrentRevocation); err != nil {
×
348
                return nil, err
×
349
        }
×
350

UNCOV
351
        if err := readChanConfig(r, &c.LocalChanConfig); err != nil {
×
352
                return nil, err
×
353
        }
×
354

355
        // Finally, we'll attempt to read the next unrevoked commitment point
356
        // for the remote party. If we closed the channel before receiving a
357
        // funding locked message then this might not be present. A boolean
358
        // indicating whether the field is present will come first.
UNCOV
359
        var hasRemoteNextRevocation bool
×
UNCOV
360
        err = ReadElements(r, &hasRemoteNextRevocation)
×
UNCOV
361
        if err != nil {
×
362
                return nil, err
×
363
        }
×
364

365
        // If this field was written, read it.
UNCOV
366
        if hasRemoteNextRevocation {
×
UNCOV
367
                err = ReadElements(r, &c.RemoteNextRevocation)
×
UNCOV
368
                if err != nil {
×
369
                        return nil, err
×
370
                }
×
371
        }
372

373
        // Check if we have a channel sync message to read.
UNCOV
374
        var hasChanSyncMsg bool
×
UNCOV
375
        err = ReadElements(r, &hasChanSyncMsg)
×
UNCOV
376
        if err == io.EOF {
×
377
                return c, nil
×
UNCOV
378
        } else if err != nil {
×
379
                return nil, err
×
380
        }
×
381

382
        // If a chan sync message is present, read it.
UNCOV
383
        if hasChanSyncMsg {
×
UNCOV
384
                // We must pass in reference to a lnwire.Message for the codec
×
UNCOV
385
                // to support it.
×
UNCOV
386
                var msg lnwire.Message
×
UNCOV
387
                if err := ReadElements(r, &msg); err != nil {
×
388
                        return nil, err
×
389
                }
×
390

UNCOV
391
                chanSync, ok := msg.(*lnwire.ChannelReestablish)
×
UNCOV
392
                if !ok {
×
393
                        return nil, errors.New("unable cast db Message to " +
×
394
                                "ChannelReestablish")
×
395
                }
×
UNCOV
396
                c.LastChanSyncMsg = chanSync
×
397
        }
398

UNCOV
399
        return c, nil
×
400
}
401

402
// ErrCorruptedFwdPkg signals that the on-disk structure of the forwarding
403
// package has potentially been mangled.
404
var ErrCorruptedFwdPkg = errors.New("fwding package db has been corrupted")
405

406
var (
407
        // fwdPackagesKey is the root-level bucket that all forwarding packages
408
        // are written. This bucket is further subdivided based on the short
409
        // channel ID of each channel.
410
        fwdPackagesKey = []byte("fwd-packages")
411

412
        // addBucketKey is the bucket to which all Add log updates are written.
413
        addBucketKey = []byte("add-updates")
414

415
        // failSettleBucketKey is the bucket to which all Settle/Fail log
416
        // updates are written.
417
        failSettleBucketKey = []byte("fail-settle-updates")
418

419
        // fwdFilterKey is a key used to write the set of Adds that passed
420
        // validation and are to be forwarded to the switch.
421
        // NOTE: The presence of this key within a forwarding package indicates
422
        // that the package has reached FwdStateProcessed.
423
        fwdFilterKey = []byte("fwd-filter-key")
424

425
        // ackFilterKey is a key used to access the PkgFilter indicating which
426
        // Adds have received a Settle/Fail. This response may come from a
427
        // number of sources, including: exitHop settle/fails, switch failures,
428
        // chain arbiter interjections, as well as settle/fails from the
429
        // next hop in the route.
430
        ackFilterKey = []byte("ack-filter-key")
431

432
        // settleFailFilterKey is a key used to access the PkgFilter indicating
433
        // which Settles/Fails in have been received and processed by the link
434
        // that originally received the Add.
435
        settleFailFilterKey = []byte("settle-fail-filter-key")
436
)
437

UNCOV
438
func makeLogKey(updateNum uint64) [8]byte {
×
UNCOV
439
        var key [8]byte
×
UNCOV
440
        byteOrder.PutUint64(key[:], updateNum)
×
UNCOV
441
        return key
×
UNCOV
442
}
×
443

444
// uint16Key writes the provided 16-bit unsigned integer to a 2-byte slice.
UNCOV
445
func uint16Key(i uint16) []byte {
×
UNCOV
446
        key := make([]byte, 2)
×
UNCOV
447
        byteOrder.PutUint16(key, i)
×
UNCOV
448
        return key
×
UNCOV
449
}
×
450

451
// ChannelPackager is used by a channel to manage the lifecycle of its forwarding
452
// packages. The packager is tied to a particular source channel ID, allowing it
453
// to create and edit its own packages. Each packager also has the ability to
454
// remove fail/settle htlcs that correspond to an add contained in one of
455
// source's packages.
456
type ChannelPackager struct {
457
        source lnwire.ShortChannelID
458
}
459

460
// NewChannelPackager creates a new packager for a single channel.
UNCOV
461
func NewChannelPackager(source lnwire.ShortChannelID) *ChannelPackager {
×
UNCOV
462
        return &ChannelPackager{
×
UNCOV
463
                source: source,
×
UNCOV
464
        }
×
UNCOV
465
}
×
466

467
// AddFwdPkg writes a newly locked in forwarding package to disk.
UNCOV
468
func (*ChannelPackager) AddFwdPkg(tx kvdb.RwTx, fwdPkg *common.FwdPkg) error { // nolint: dupl
×
UNCOV
469
        fwdPkgBkt, err := tx.CreateTopLevelBucket(fwdPackagesKey)
×
UNCOV
470
        if err != nil {
×
471
                return err
×
472
        }
×
473

UNCOV
474
        source := makeLogKey(fwdPkg.Source.ToUint64())
×
UNCOV
475
        sourceBkt, err := fwdPkgBkt.CreateBucketIfNotExists(source[:])
×
UNCOV
476
        if err != nil {
×
477
                return err
×
478
        }
×
479

UNCOV
480
        heightKey := makeLogKey(fwdPkg.Height)
×
UNCOV
481
        heightBkt, err := sourceBkt.CreateBucketIfNotExists(heightKey[:])
×
UNCOV
482
        if err != nil {
×
483
                return err
×
484
        }
×
485

486
        // Write ADD updates we received at this commit height.
UNCOV
487
        addBkt, err := heightBkt.CreateBucketIfNotExists(addBucketKey)
×
UNCOV
488
        if err != nil {
×
489
                return err
×
490
        }
×
491

492
        // Write SETTLE/FAIL updates we received at this commit height.
UNCOV
493
        failSettleBkt, err := heightBkt.CreateBucketIfNotExists(failSettleBucketKey)
×
UNCOV
494
        if err != nil {
×
495
                return err
×
496
        }
×
497

UNCOV
498
        for i := range fwdPkg.Adds {
×
UNCOV
499
                err = putLogUpdate(addBkt, uint16(i), &fwdPkg.Adds[i])
×
UNCOV
500
                if err != nil {
×
501
                        return err
×
502
                }
×
503
        }
504

505
        // Persist the initialized pkg filter, which will be used to determine
506
        // when we can remove this forwarding package from disk.
UNCOV
507
        var ackFilterBuf bytes.Buffer
×
UNCOV
508
        if err := fwdPkg.AckFilter.Encode(&ackFilterBuf); err != nil {
×
509
                return err
×
510
        }
×
511

UNCOV
512
        if err := heightBkt.Put(ackFilterKey, ackFilterBuf.Bytes()); err != nil {
×
513
                return err
×
514
        }
×
515

UNCOV
516
        for i := range fwdPkg.SettleFails {
×
UNCOV
517
                err = putLogUpdate(failSettleBkt, uint16(i), &fwdPkg.SettleFails[i])
×
UNCOV
518
                if err != nil {
×
519
                        return err
×
520
                }
×
521
        }
522

UNCOV
523
        var settleFailFilterBuf bytes.Buffer
×
UNCOV
524
        err = fwdPkg.SettleFailFilter.Encode(&settleFailFilterBuf)
×
UNCOV
525
        if err != nil {
×
526
                return err
×
527
        }
×
528

UNCOV
529
        return heightBkt.Put(settleFailFilterKey, settleFailFilterBuf.Bytes())
×
530
}
531

532
// putLogUpdate writes an htlc to the provided `bkt`, using `index` as the key.
UNCOV
533
func putLogUpdate(bkt kvdb.RwBucket, idx uint16, htlc *common.LogUpdate) error {
×
UNCOV
534
        var b bytes.Buffer
×
UNCOV
535
        if err := serializeLogUpdate(&b, htlc); err != nil {
×
536
                return err
×
537
        }
×
538

UNCOV
539
        return bkt.Put(uint16Key(idx), b.Bytes())
×
540
}
541

542
// LoadFwdPkgs scans the forwarding log for any packages that haven't been
543
// processed, and returns their deserialized log updates in a map indexed by the
544
// remote commitment height at which the updates were locked in.
UNCOV
545
func (p *ChannelPackager) LoadFwdPkgs(tx kvdb.RTx) ([]*common.FwdPkg, error) {
×
UNCOV
546
        return loadChannelFwdPkgs(tx, p.source)
×
UNCOV
547
}
×
548

549
// loadChannelFwdPkgs loads all forwarding packages owned by `source`.
UNCOV
550
func loadChannelFwdPkgs(tx kvdb.RTx, source lnwire.ShortChannelID) ([]*common.FwdPkg, error) { // nolint: dupl
×
UNCOV
551
        fwdPkgBkt := tx.ReadBucket(fwdPackagesKey)
×
UNCOV
552
        if fwdPkgBkt == nil {
×
553
                return nil, nil
×
554
        }
×
555

UNCOV
556
        sourceKey := makeLogKey(source.ToUint64())
×
UNCOV
557
        sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
×
UNCOV
558
        if sourceBkt == nil {
×
559
                return nil, nil
×
560
        }
×
561

UNCOV
562
        var heights []uint64
×
UNCOV
563
        if err := sourceBkt.ForEach(func(k, _ []byte) error {
×
UNCOV
564
                if len(k) != 8 {
×
565
                        return ErrCorruptedFwdPkg
×
566
                }
×
567

UNCOV
568
                heights = append(heights, byteOrder.Uint64(k))
×
UNCOV
569

×
UNCOV
570
                return nil
×
571
        }); err != nil {
×
572
                return nil, err
×
573
        }
×
574

575
        // Load the forwarding package for each retrieved height.
UNCOV
576
        fwdPkgs := make([]*common.FwdPkg, 0, len(heights))
×
UNCOV
577
        for _, height := range heights {
×
UNCOV
578
                fwdPkg, err := loadFwdPkg(fwdPkgBkt, source, height)
×
UNCOV
579
                if err != nil {
×
580
                        return nil, err
×
581
                }
×
582

UNCOV
583
                fwdPkgs = append(fwdPkgs, fwdPkg)
×
584
        }
585

UNCOV
586
        return fwdPkgs, nil
×
587
}
588

589
// loadFwdPkg reads the packager's fwd pkg at a given height, and determines the
590
// appropriate FwdState.
591
func loadFwdPkg(fwdPkgBkt kvdb.RBucket, source lnwire.ShortChannelID,
UNCOV
592
        height uint64) (*common.FwdPkg, error) {
×
UNCOV
593

×
UNCOV
594
        sourceKey := makeLogKey(source.ToUint64())
×
UNCOV
595
        sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
×
UNCOV
596
        if sourceBkt == nil {
×
597
                return nil, ErrCorruptedFwdPkg
×
598
        }
×
599

UNCOV
600
        heightKey := makeLogKey(height)
×
UNCOV
601
        heightBkt := sourceBkt.NestedReadBucket(heightKey[:])
×
UNCOV
602
        if heightBkt == nil {
×
603
                return nil, ErrCorruptedFwdPkg
×
604
        }
×
605

606
        // Load ADDs from disk.
UNCOV
607
        addBkt := heightBkt.NestedReadBucket(addBucketKey)
×
UNCOV
608
        if addBkt == nil {
×
609
                return nil, ErrCorruptedFwdPkg
×
610
        }
×
611

UNCOV
612
        adds, err := loadHtlcs(addBkt)
×
UNCOV
613
        if err != nil {
×
614
                return nil, err
×
615
        }
×
616

617
        // Load ack filter from disk.
UNCOV
618
        ackFilterBytes := heightBkt.Get(ackFilterKey)
×
UNCOV
619
        if ackFilterBytes == nil {
×
620
                return nil, ErrCorruptedFwdPkg
×
621
        }
×
UNCOV
622
        ackFilterReader := bytes.NewReader(ackFilterBytes)
×
UNCOV
623

×
UNCOV
624
        ackFilter := &common.PkgFilter{}
×
UNCOV
625
        if err := ackFilter.Decode(ackFilterReader); err != nil {
×
626
                return nil, err
×
627
        }
×
628

629
        // Load SETTLE/FAILs from disk.
UNCOV
630
        failSettleBkt := heightBkt.NestedReadBucket(failSettleBucketKey)
×
UNCOV
631
        if failSettleBkt == nil {
×
632
                return nil, ErrCorruptedFwdPkg
×
633
        }
×
634

UNCOV
635
        failSettles, err := loadHtlcs(failSettleBkt)
×
UNCOV
636
        if err != nil {
×
637
                return nil, err
×
638
        }
×
639

640
        // Load settle fail filter from disk.
UNCOV
641
        settleFailFilterBytes := heightBkt.Get(settleFailFilterKey)
×
UNCOV
642
        if settleFailFilterBytes == nil {
×
643
                return nil, ErrCorruptedFwdPkg
×
644
        }
×
UNCOV
645
        settleFailFilterReader := bytes.NewReader(settleFailFilterBytes)
×
UNCOV
646

×
UNCOV
647
        settleFailFilter := &common.PkgFilter{}
×
UNCOV
648
        if err := settleFailFilter.Decode(settleFailFilterReader); err != nil {
×
649
                return nil, err
×
650
        }
×
651

652
        // Initialize the fwding package, which always starts in the
653
        // FwdStateLockedIn. We can determine what state the package was left in
654
        // by examining constraints on the information loaded from disk.
UNCOV
655
        fwdPkg := &common.FwdPkg{
×
UNCOV
656
                Source:           source,
×
UNCOV
657
                State:            common.FwdStateLockedIn,
×
UNCOV
658
                Height:           height,
×
UNCOV
659
                Adds:             adds,
×
UNCOV
660
                AckFilter:        ackFilter,
×
UNCOV
661
                SettleFails:      failSettles,
×
UNCOV
662
                SettleFailFilter: settleFailFilter,
×
UNCOV
663
        }
×
UNCOV
664

×
UNCOV
665
        // Check to see if we have written the set exported filter adds to
×
UNCOV
666
        // disk. If we haven't, processing of this package was never started, or
×
UNCOV
667
        // failed during the last attempt.
×
UNCOV
668
        fwdFilterBytes := heightBkt.Get(fwdFilterKey)
×
UNCOV
669
        if fwdFilterBytes == nil {
×
UNCOV
670
                nAdds := uint16(len(adds))
×
UNCOV
671
                fwdPkg.FwdFilter = common.NewPkgFilter(nAdds)
×
UNCOV
672
                return fwdPkg, nil
×
UNCOV
673
        }
×
674

675
        fwdFilterReader := bytes.NewReader(fwdFilterBytes)
×
676
        fwdPkg.FwdFilter = &common.PkgFilter{}
×
677
        if err := fwdPkg.FwdFilter.Decode(fwdFilterReader); err != nil {
×
678
                return nil, err
×
679
        }
×
680

681
        // Otherwise, a complete round of processing was completed, and we
682
        // advance the package to FwdStateProcessed.
683
        fwdPkg.State = common.FwdStateProcessed
×
684

×
685
        // If every add, settle, and fail has been fully acknowledged, we can
×
686
        // safely set the package's state to FwdStateCompleted, signalling that
×
687
        // it can be garbage collected.
×
688
        if fwdPkg.AckFilter.IsFull() && fwdPkg.SettleFailFilter.IsFull() {
×
689
                fwdPkg.State = common.FwdStateCompleted
×
690
        }
×
691

692
        return fwdPkg, nil
×
693
}
694

695
// loadHtlcs retrieves all serialized htlcs in a bucket, returning
696
// them in order of the indexes they were written under.
UNCOV
697
func loadHtlcs(bkt kvdb.RBucket) ([]common.LogUpdate, error) {
×
UNCOV
698
        var htlcs []common.LogUpdate
×
UNCOV
699
        if err := bkt.ForEach(func(_, v []byte) error {
×
UNCOV
700
                htlc, err := deserializeLogUpdate(bytes.NewReader(v))
×
UNCOV
701
                if err != nil {
×
702
                        return err
×
703
                }
×
704

UNCOV
705
                htlcs = append(htlcs, *htlc)
×
UNCOV
706

×
UNCOV
707
                return nil
×
708
        }); err != nil {
×
709
                return nil, err
×
710
        }
×
711

UNCOV
712
        return htlcs, nil
×
713
}
714

715
// serializeLogUpdate writes a log update to the provided io.Writer.
UNCOV
716
func serializeLogUpdate(w io.Writer, l *common.LogUpdate) error {
×
UNCOV
717
        return WriteElements(w, l.LogIndex, l.UpdateMsg)
×
UNCOV
718
}
×
719

720
// deserializeLogUpdate reads a log update from the provided io.Reader.
UNCOV
721
func deserializeLogUpdate(r io.Reader) (*common.LogUpdate, error) {
×
UNCOV
722
        l := &common.LogUpdate{}
×
UNCOV
723
        if err := ReadElements(r, &l.LogIndex, &l.UpdateMsg); err != nil {
×
724
                return nil, err
×
725
        }
×
726

UNCOV
727
        return l, nil
×
728
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc