• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17132206455

21 Aug 2025 03:56PM UTC coverage: 54.685% (-2.6%) from 57.321%
17132206455

Pull #10167

github

web-flow
Merge 5dd2ed093 into 0c2f045f5
Pull Request #10167: multi: bump Go to 1.24.6

4 of 31 new or added lines in 10 files covered. (12.9%)

23854 existing lines in 284 files now uncovered.

108937 of 199210 relevant lines covered (54.68%)

22026.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.5
/channeldb/forwarding_package.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9

10
        "github.com/lightningnetwork/lnd/kvdb"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
)
13

14
// ErrCorruptedFwdPkg signals that the on-disk structure of the forwarding
15
// package has potentially been mangled.
16
var ErrCorruptedFwdPkg = errors.New("fwding package db has been corrupted")
17

18
// FwdState is an enum used to describe the lifecycle of a FwdPkg.
19
type FwdState byte
20

21
const (
22
        // FwdStateLockedIn is the starting state for all forwarding packages.
23
        // Packages in this state have not yet committed to the exact set of
24
        // Adds to forward to the switch.
25
        FwdStateLockedIn FwdState = iota
26

27
        // FwdStateProcessed marks the state in which all Adds have been
28
        // locally processed and the forwarding decision to the switch has been
29
        // persisted.
30
        FwdStateProcessed
31

32
        // FwdStateCompleted signals that all Adds have been acked, and that all
33
        // settles and fails have been delivered to their sources. Packages in
34
        // this state can be removed permanently.
35
        FwdStateCompleted
36
)
37

38
var (
39
        // fwdPackagesKey is the root-level bucket to which all forwarding
40
        // packages are written. This bucket is further subdivided based on the
41
        // short channel ID of each channel.
42
        //
43
        // Bucket hierarchy:
44
        //
45
        // fwdPackagesKey(root-bucket)
46
        //             |
47
        //             |-- <shortChannelID>
48
        //             |       |
49
        //             |       |-- <height>
50
        //             |       |       |-- ackFilterKey: <encoded bytes of PkgFilter>
51
        //             |       |       |-- settleFailFilterKey: <encoded bytes of PkgFilter>
52
        //             |       |       |-- fwdFilterKey: <encoded bytes of PkgFilter>
53
        //             |       |       |
54
        //             |       |       |-- addBucketKey
55
        //             |       |       |        |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
56
        //             |       |       |        |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
57
        //             |       |       |        ...
58
        //             |       |       |
59
        //             |       |       |-- failSettleBucketKey
60
        //             |       |                |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
61
        //             |       |                |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
62
        //             |       |                ...
63
        //             |       |
64
        //             |       |-- <height>
65
        //             |       |       |
66
        //             |       ...     ...
67
        //             |
68
        //             |
69
        //             |-- <shortChannelID>
70
        //             |       |
71
        //             |       ...
72
        //             ...
73
        //
74
        fwdPackagesKey = []byte("fwd-packages")
75

76
        // addBucketKey is the bucket to which all Add log updates are written.
77
        addBucketKey = []byte("add-updates")
78

79
        // failSettleBucketKey is the bucket to which all Settle/Fail log
80
        // updates are written.
81
        failSettleBucketKey = []byte("fail-settle-updates")
82

83
        // fwdFilterKey is a key used to write the set of Adds that passed
84
        // validation and are to be forwarded to the switch.
85
        // NOTE: The presence of this key within a forwarding package indicates
86
        // that the package has reached FwdStateProcessed.
87
        fwdFilterKey = []byte("fwd-filter-key")
88

89
        // ackFilterKey is a key used to access the PkgFilter indicating which
90
        // Adds have received a Settle/Fail. This response may come from a
91
        // number of sources, including: exitHop settle/fails, switch failures,
92
        // chain arbiter interjections, as well as settle/fails from the
93
        // next hop in the route.
94
        ackFilterKey = []byte("ack-filter-key")
95

96
        // settleFailFilterKey is a key used to access the PkgFilter indicating
97
        // which Settles/Fails have been received and processed by the link
98
        // that originally received the Add.
99
        settleFailFilterKey = []byte("settle-fail-filter-key")
100
)
101

102
// PkgFilter is used to compactly represent a particular subset of the Adds in a
// forwarding package. Each filter is represented as a simple, statically-sized
// bitvector, where the elements are intended to be the indices of the Adds as
// they are written in the FwdPkg.
type PkgFilter struct {
	// count is the number of elements the filter can represent.
	count uint16

	// filter is the backing bitvector. Element i is stored in byte i/8,
	// with the lowest index of each byte occupying the most significant
	// bit.
	filter []byte
}

// NewPkgFilter initializes an empty PkgFilter supporting `count` elements.
func NewPkgFilter(count uint16) *PkgFilter {
	// We add 7 to ensure that the integer division yields properly rounded
	// values. The arithmetic is carried out on an int so that counts above
	// 65528 cannot wrap around the uint16 range and produce an undersized
	// filter.
	filterLen := (int(count) + 7) / 8

	return &PkgFilter{
		count:  count,
		filter: make([]byte, filterLen),
	}
}

// Count returns the number of elements represented by this PkgFilter.
func (f *PkgFilter) Count() uint16 {
	return f.count
}

// Set marks the `i`-th element as included by this filter.
// NOTE: It is assumed that i is always less than count.
func (f *PkgFilter) Set(i uint16) {
	byt := i / 8
	bit := i % 8

	// Set the i-th bit in the filter.
	// TODO(conner): ignore if > count to prevent panic?
	f.filter[byt] |= byte(1 << (7 - bit))
}

// Contains queries the filter for membership of index `i`.
// NOTE: It is assumed that i is always less than count.
func (f *PkgFilter) Contains(i uint16) bool {
	byt := i / 8
	bit := i % 8

	// Read the i-th bit in the filter.
	// TODO(conner): ignore if > count to prevent panic?
	return f.filter[byt]&(1<<(7-bit)) != 0
}

// Equal checks two PkgFilters for equality. Two filters are equal when they
// are the same pointer, or when both are non-nil and have the same count and
// the same filter bytes. A nil filter is only equal to another nil filter.
func (f *PkgFilter) Equal(f2 *PkgFilter) bool {
	if f == f2 {
		return true
	}

	// Exactly one side is nil at this point, so the filters cannot match
	// and must not be dereferenced.
	if f == nil || f2 == nil {
		return false
	}

	if f.count != f2.count {
		return false
	}

	return bytes.Equal(f.filter, f2.filter)
}

// IsFull returns true if every element in the filter has been Set, and false
// otherwise.
func (f *PkgFilter) IsFull() bool {
	// Batch validate bytes that are fully used.
	for i := uint16(0); i < f.count/8; i++ {
		if f.filter[i] != 0xFF {
			return false
		}
	}

	// If the count is not a multiple of 8, check that the filter contains
	// all remaining bits.
	rem := f.count % 8
	for idx := f.count - rem; idx < f.count; idx++ {
		if !f.Contains(idx) {
			return false
		}
	}

	return true
}

// Size returns number of bytes produced when the PkgFilter is serialized.
func (f *PkgFilter) Size() uint16 {
	// 2 bytes for uint16 `count`, then round up number of bytes required to
	// represent `count` bits. The rounding is done on an int to avoid
	// wrapping for counts near the top of the uint16 range; the largest
	// possible result is 2 + 8192, which fits in a uint16.
	return uint16(2 + (int(f.count)+7)/8)
}

// Encode writes the filter to the provided io.Writer as a big-endian uint16
// count followed by the raw filter bytes.
func (f *PkgFilter) Encode(w io.Writer) error {
	if err := binary.Write(w, binary.BigEndian, f.count); err != nil {
		return err
	}

	_, err := w.Write(f.filter)

	return err
}

// Decode reads the filter from the provided io.Reader. It expects the layout
// produced by Encode: a big-endian uint16 count followed by exactly enough
// bytes to hold `count` bits.
func (f *PkgFilter) Decode(r io.Reader) error {
	if err := binary.Read(r, binary.BigEndian, &f.count); err != nil {
		return err
	}

	// Size accounts for the 2-byte count prefix, which has already been
	// consumed above.
	f.filter = make([]byte, f.Size()-2)
	_, err := io.ReadFull(r, f.filter)

	return err
}

// String returns a human-readable string.
func (f *PkgFilter) String() string {
	return fmt.Sprintf("count=%v, filter=%v", f.count, f.filter)
}
218

219
// FwdPkg records all adds, settles, and fails that were locked in as a result
220
// of the remote peer sending us a revocation. Each package is identified by
221
// the short chanid and remote commitment height corresponding to the revocation
222
// that locked in the HTLCs. For everything except a locally initiated payment,
223
// settles and fails in a forwarding package must have a corresponding Add in
224
// another package, and can be removed individually once the source link has
225
// received the fail/settle.
226
//
227
// Adds cannot be removed, as we need to present the same batch of Adds to
228
// properly handle replay protection. Instead, we use a PkgFilter to mark that
229
// we have finished processing a particular Add. A FwdPkg should only be deleted
230
// after the AckFilter is full and all settles and fails have been persistently
231
// removed.
232
type FwdPkg struct {
233
        // Source identifies the channel that wrote this forwarding package.
        // Its uint64 form is the key of the per-channel bucket the package
        // is stored under.
234
        Source lnwire.ShortChannelID
235

236
        // Height is the height of the remote commitment chain that locked in
237
        // this forwarding package. It is the key of the package's bucket
        // within the source channel's bucket.
238
        Height uint64
239

240
        // State signals the persistent condition of the package and directs how
241
        // to reprocess the package in the event of failures. When a package is
        // loaded from disk, the state is derived from the persisted filters
        // rather than read as a stored value.
242
        State FwdState
243

244
        // Adds contains all add messages which need to be processed and
245
        // forwarded to the switch. Adds does not change over the life of a
246
        // forwarding package.
247
        Adds []LogUpdate
248

249
        // FwdFilter is a filter containing the indices of all Adds that were
250
        // forwarded to the switch.
251
        //
252
        // NOTE: This value signals when persisted to disk that the fwd package
253
        // has been processed and garbage collection can happen. So it also
254
        // has to be set for packages with no adds (empty packages or only
255
        // settle/fail packages) so that they can be garbage collected as well.
256
        FwdFilter *PkgFilter
257

258
        // AckFilter is a filter containing the indices of all Adds for which
259
        // the source has received a settle or fail and is reflected in the next
260
        // commitment txn. A package should not be removed until IsFull()
261
        // returns true.
262
        AckFilter *PkgFilter
263

264
        // SettleFails contains all settle and fail messages that should be
265
        // forwarded to the switch.
266
        SettleFails []LogUpdate
267

268
        // SettleFailFilter is a filter containing the indices of all Settle or
269
        // Fails originating in this package that have been received and locked
270
        // into the incoming link's commitment state.
271
        SettleFailFilter *PkgFilter
272
}
273

274
// NewFwdPkg initializes a new forwarding package in FwdStateLockedIn. This
275
// should be used to create a package at the time we receive a revocation.
276
func NewFwdPkg(source lnwire.ShortChannelID, height uint64,
277
        addUpdates, settleFailUpdates []LogUpdate) *FwdPkg {
2,078✔
278

2,078✔
279
        nAddUpdates := uint16(len(addUpdates))
2,078✔
280
        nSettleFailUpdates := uint16(len(settleFailUpdates))
2,078✔
281

2,078✔
282
        return &FwdPkg{
2,078✔
283
                Source:           source,
2,078✔
284
                Height:           height,
2,078✔
285
                State:            FwdStateLockedIn,
2,078✔
286
                Adds:             addUpdates,
2,078✔
287
                FwdFilter:        NewPkgFilter(nAddUpdates),
2,078✔
288
                AckFilter:        NewPkgFilter(nAddUpdates),
2,078✔
289
                SettleFails:      settleFailUpdates,
2,078✔
290
                SettleFailFilter: NewPkgFilter(nSettleFailUpdates),
2,078✔
291
        }
2,078✔
292
}
2,078✔
293

294
// SourceRef is a convenience method that returns an AddRef to this forwarding
295
// package for the index in the argument. It is the caller's responsibility
296
// to ensure that the index is in bounds.
297
func (f *FwdPkg) SourceRef(i uint16) AddRef {
449✔
298
        return AddRef{
449✔
299
                Height: f.Height,
449✔
300
                Index:  i,
449✔
301
        }
449✔
302
}
449✔
303

304
// DestRef is a convenience method that returns a SettleFailRef to this
305
// forwarding package for the index in the argument. It is the caller's
306
// responsibility to ensure that the index is in bounds.
307
func (f *FwdPkg) DestRef(i uint16) SettleFailRef {
316✔
308
        return SettleFailRef{
316✔
309
                Source: f.Source,
316✔
310
                Height: f.Height,
316✔
311
                Index:  i,
316✔
312
        }
316✔
313
}
316✔
314

315
// ID returns an unique identifier for this package, used to ensure that sphinx
316
// replay processing of this batch is idempotent.
317
func (f *FwdPkg) ID() []byte {
1,201✔
318
        var id = make([]byte, 16)
1,201✔
319
        byteOrder.PutUint64(id[:8], f.Source.ToUint64())
1,201✔
320
        byteOrder.PutUint64(id[8:], f.Height)
1,201✔
321
        return id
1,201✔
322
}
1,201✔
323

324
// String returns a human-readable description of the forwarding package.
325
func (f *FwdPkg) String() string {
×
326
        return fmt.Sprintf("%T(src=%v, height=%v, nadds=%v, nfailsettles=%v)",
×
327
                f, f.Source, f.Height, len(f.Adds), len(f.SettleFails))
×
328
}
×
329

330
// AddRef identifies a single Add inside a FwdPkg. It carries no channel ID of
// its own; the short channel ID is assumed to be that of the packager.
type AddRef struct {
	// Height is the remote commitment height that locked in the Add.
	Height uint64

	// Index is the position of the Add within the fwd pkg's Adds.
	//
	// NOTE: This index is static over the lifetime of a forwarding package.
	Index uint16
}

// Encode writes the AddRef to `w` as a big-endian uint64 height followed by a
// big-endian uint16 index, stopping at the first write error.
func (a *AddRef) Encode(w io.Writer) error {
	for _, field := range []interface{}{a.Height, a.Index} {
		err := binary.Write(w, binary.BigEndian, field)
		if err != nil {
			return err
		}
	}

	return nil
}

// Decode populates the AddRef from `r`, reading a big-endian uint64 height
// followed by a big-endian uint16 index, stopping at the first read error.
func (a *AddRef) Decode(r io.Reader) error {
	for _, field := range []interface{}{&a.Height, &a.Index} {
		err := binary.Read(r, binary.BigEndian, field)
		if err != nil {
			return err
		}
	}

	return nil
}
359

360
// SettleFailRef is used to locate a Settle/Fail in another channel's FwdPkg. A
361
// channel does not remove its own Settle/Fail htlcs, so the source is provided
362
// to locate a db bucket belonging to another channel.
363
type SettleFailRef struct {
364
        // Source identifies the outgoing link that locked in the settle or
365
        // fail. This is then used by the *incoming* link to find the settle
366
        // fail in another link's forwarding packages.
367
        Source lnwire.ShortChannelID
368

369
        // Height is the remote commitment height that locked in this
370
        // Settle/Fail.
371
        Height uint64
372

373
        // Index is the index of the Settle/Fail within the fwd pkg's
        // SettleFails.
374
        //
375
        // NOTE: This index is static over the lifetime of a forwarding package.
376
        Index uint16
377
}
378

379
// SettleFailAcker is a generic interface providing the ability to acknowledge
380
// settle/fail HTLCs stored in forwarding packages.
381
type SettleFailAcker interface {
382
        // AckSettleFails atomically updates the settle-fail filters in *other*
383
        // channels' forwarding packages. Each SettleFailRef names the source
        // channel, height and index of one settle/fail to acknowledge, and
        // the update is made through the supplied read-write transaction.
384
        AckSettleFails(tx kvdb.RwTx, settleFailRefs ...SettleFailRef) error
385
}
386

387
// GlobalFwdPkgReader is an interface used to retrieve the forwarding packages
388
// of any active channel.
389
type GlobalFwdPkgReader interface {
390
        // LoadChannelFwdPkgs loads all known forwarding packages for the given
391
        // channel, identified by its short channel ID `source`, using the
        // supplied read transaction.
392
        LoadChannelFwdPkgs(tx kvdb.RTx,
393
                source lnwire.ShortChannelID) ([]*FwdPkg, error)
394
}
395

396
// FwdOperator defines the interfaces for managing forwarding packages that are
397
// external to a particular channel. This interface is used by the switch to
398
// read forwarding packages from arbitrary channels, and acknowledge settles and
399
// fails for locally-sourced payments.
400
type FwdOperator interface {
401
        // GlobalFwdPkgReader provides read access to all known forwarding
402
        // packages.
403
        GlobalFwdPkgReader
404

405
        // SettleFailAcker grants the ability to acknowledge settles or fails
406
        // residing in arbitrary forwarding packages.
407
        SettleFailAcker
408
}
409

410
// SwitchPackager is a stateless, concrete implementation of the FwdOperator
// interface. It can read the forwarding packages of any channel and
// acknowledge arbitrary settle and fail HTLCs.
type SwitchPackager struct{}

// NewSwitchPackager returns a ready-to-use SwitchPackager.
func NewSwitchPackager() *SwitchPackager {
	return new(SwitchPackager)
}
419

420
// AckSettleFails atomically updates the settle-fail filters in *other*
421
// channels' forwarding packages, to mark that the switch has received a settle
422
// or fail residing in the forwarding package of a link.
423
func (*SwitchPackager) AckSettleFails(tx kvdb.RwTx,
424
        settleFailRefs ...SettleFailRef) error {
118✔
425

118✔
426
        return ackSettleFails(tx, settleFailRefs)
118✔
427
}
118✔
428

429
// LoadChannelFwdPkgs loads all forwarding packages for a particular channel.
430
func (*SwitchPackager) LoadChannelFwdPkgs(tx kvdb.RTx,
431
        source lnwire.ShortChannelID) ([]*FwdPkg, error) {
129✔
432

129✔
433
        return loadChannelFwdPkgs(tx, source)
129✔
434
}
129✔
435

436
// FwdPackager supports all operations required to modify fwd packages, such as
437
// creation, updates, reading, and removal. The interfaces are broken down in
438
// this way to support future delegation of the subinterfaces.
439
type FwdPackager interface {
440
        // AddFwdPkg serializes and writes a FwdPkg for this channel at the
441
        // remote commitment height included in the forwarding package.
442
        AddFwdPkg(tx kvdb.RwTx, fwdPkg *FwdPkg) error
443

444
        // SetFwdFilter looks up the forwarding package at the remote `height`
445
        // and sets the `fwdFilter`, marking the Adds for which:
446
        // 1) We are not the exit node
447
        // 2) Passed all validation
448
        // 3) Should be forwarded to the switch immediately after a failure
449
        SetFwdFilter(tx kvdb.RwTx, height uint64, fwdFilter *PkgFilter) error
450

451
        // AckAddHtlcs atomically updates the add filters in this channel's
452
        // forwarding packages to mark the resolution of an Add that was
453
        // received from the remote party. Each AddRef names the height and
        // index of one Add to acknowledge.
454
        AckAddHtlcs(tx kvdb.RwTx, addRefs ...AddRef) error
455

456
        // SettleFailAcker allows a link to acknowledge settle/fail HTLCs
457
        // belonging to other channels.
458
        SettleFailAcker
459

460
        // LoadFwdPkgs loads all known forwarding packages owned by this
461
        // channel.
462
        LoadFwdPkgs(tx kvdb.RTx) ([]*FwdPkg, error)
463

464
        // RemovePkg deletes a forwarding package owned by this channel at
465
        // the provided remote `height`.
466
        RemovePkg(tx kvdb.RwTx, height uint64) error
467

468
        // Wipe deletes all the forwarding packages owned by this channel.
469
        Wipe(tx kvdb.RwTx) error
470
}
471

472
// ChannelPackager is used by a channel to manage the lifecycle of its forwarding
473
// packages. The packager is tied to a particular source channel ID, allowing it
474
// to create and edit its own packages. Each packager also has the ability to
475
// remove fail/settle htlcs that correspond to an add contained in one of
476
// source's packages.
477
type ChannelPackager struct {
478
        source lnwire.ShortChannelID
479
}
480

481
// NewChannelPackager creates a new packager for a single channel.
482
func NewChannelPackager(source lnwire.ShortChannelID) *ChannelPackager {
15,922✔
483
        return &ChannelPackager{
15,922✔
484
                source: source,
15,922✔
485
        }
15,922✔
486
}
15,922✔
487

488
// AddFwdPkg writes a newly locked in forwarding package to disk.
489
func (*ChannelPackager) AddFwdPkg(tx kvdb.RwTx, fwdPkg *FwdPkg) error { // nolint: dupl
2,077✔
490
        fwdPkgBkt, err := tx.CreateTopLevelBucket(fwdPackagesKey)
2,077✔
491
        if err != nil {
2,077✔
492
                return err
×
493
        }
×
494

495
        source := makeLogKey(fwdPkg.Source.ToUint64())
2,077✔
496
        sourceBkt, err := fwdPkgBkt.CreateBucketIfNotExists(source[:])
2,077✔
497
        if err != nil {
2,077✔
498
                return err
×
499
        }
×
500

501
        heightKey := makeLogKey(fwdPkg.Height)
2,077✔
502
        heightBkt, err := sourceBkt.CreateBucketIfNotExists(heightKey[:])
2,077✔
503
        if err != nil {
2,077✔
504
                return err
×
505
        }
×
506

507
        // Write ADD updates we received at this commit height.
508
        addBkt, err := heightBkt.CreateBucketIfNotExists(addBucketKey)
2,077✔
509
        if err != nil {
2,077✔
510
                return err
×
511
        }
×
512

513
        // Write SETTLE/FAIL updates we received at this commit height.
514
        failSettleBkt, err := heightBkt.CreateBucketIfNotExists(failSettleBucketKey)
2,077✔
515
        if err != nil {
2,077✔
516
                return err
×
517
        }
×
518

519
        for i := range fwdPkg.Adds {
2,922✔
520
                err = putLogUpdate(addBkt, uint16(i), &fwdPkg.Adds[i])
845✔
521
                if err != nil {
845✔
522
                        return err
×
523
                }
×
524
        }
525

526
        // Persist the initialized pkg filter, which will be used to determine
527
        // when we can remove this forwarding package from disk.
528
        var ackFilterBuf bytes.Buffer
2,077✔
529
        if err := fwdPkg.AckFilter.Encode(&ackFilterBuf); err != nil {
2,077✔
530
                return err
×
531
        }
×
532

533
        if err := heightBkt.Put(ackFilterKey, ackFilterBuf.Bytes()); err != nil {
2,077✔
534
                return err
×
535
        }
×
536

537
        for i := range fwdPkg.SettleFails {
2,526✔
538
                err = putLogUpdate(failSettleBkt, uint16(i), &fwdPkg.SettleFails[i])
449✔
539
                if err != nil {
449✔
540
                        return err
×
541
                }
×
542
        }
543

544
        var settleFailFilterBuf bytes.Buffer
2,077✔
545
        err = fwdPkg.SettleFailFilter.Encode(&settleFailFilterBuf)
2,077✔
546
        if err != nil {
2,077✔
547
                return err
×
548
        }
×
549

550
        return heightBkt.Put(settleFailFilterKey, settleFailFilterBuf.Bytes())
2,077✔
551
}
552

553
// putLogUpdate writes an htlc to the provided `bkt`, using `index` as the key.
554
func putLogUpdate(bkt kvdb.RwBucket, idx uint16, htlc *LogUpdate) error {
1,294✔
555
        var b bytes.Buffer
1,294✔
556
        if err := serializeLogUpdate(&b, htlc); err != nil {
1,294✔
557
                return err
×
558
        }
×
559

560
        return bkt.Put(uint16Key(idx), b.Bytes())
1,294✔
561
}
562

563
// LoadFwdPkgs loads the forwarding packages stored for this packager's source
564
// channel by delegating to loadChannelFwdPkgs. The packages are returned as a
565
// slice of deserialized FwdPkgs.
566
func (p *ChannelPackager) LoadFwdPkgs(tx kvdb.RTx) ([]*FwdPkg, error) {
467✔
567
        return loadChannelFwdPkgs(tx, p.source)
467✔
568
}
467✔
569

570
// loadChannelFwdPkgs loads all forwarding packages owned by `source`.
571
func loadChannelFwdPkgs(tx kvdb.RTx, source lnwire.ShortChannelID) ([]*FwdPkg, error) {
596✔
572
        fwdPkgBkt := tx.ReadBucket(fwdPackagesKey)
596✔
573
        if fwdPkgBkt == nil {
602✔
574
                return nil, nil
6✔
575
        }
6✔
576

577
        sourceKey := makeLogKey(source.ToUint64())
590✔
578
        sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
590✔
579
        if sourceBkt == nil {
1,131✔
580
                return nil, nil
541✔
581
        }
541✔
582

583
        var heights []uint64
49✔
584
        if err := sourceBkt.ForEach(func(k, _ []byte) error {
100✔
585
                if len(k) != 8 {
51✔
586
                        return ErrCorruptedFwdPkg
×
587
                }
×
588

589
                heights = append(heights, byteOrder.Uint64(k))
51✔
590

51✔
591
                return nil
51✔
592
        }); err != nil {
×
593
                return nil, err
×
594
        }
×
595

596
        // Load the forwarding package for each retrieved height.
597
        fwdPkgs := make([]*FwdPkg, 0, len(heights))
49✔
598
        for _, height := range heights {
100✔
599
                fwdPkg, err := loadFwdPkg(fwdPkgBkt, source, height)
51✔
600
                if err != nil {
51✔
601
                        return nil, err
×
602
                }
×
603

604
                fwdPkgs = append(fwdPkgs, fwdPkg)
51✔
605
        }
606

607
        return fwdPkgs, nil
49✔
608
}
609

610
// loadFwdPkg reads the packager's fwd pkg at a given height, and determines the
611
// appropriate FwdState.
612
func loadFwdPkg(fwdPkgBkt kvdb.RBucket, source lnwire.ShortChannelID,
613
        height uint64) (*FwdPkg, error) {
51✔
614

51✔
615
        sourceKey := makeLogKey(source.ToUint64())
51✔
616
        sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
51✔
617
        if sourceBkt == nil {
51✔
618
                return nil, ErrCorruptedFwdPkg
×
619
        }
×
620

621
        heightKey := makeLogKey(height)
51✔
622
        heightBkt := sourceBkt.NestedReadBucket(heightKey[:])
51✔
623
        if heightBkt == nil {
51✔
624
                return nil, ErrCorruptedFwdPkg
×
625
        }
×
626

627
        // Load ADDs from disk.
628
        addBkt := heightBkt.NestedReadBucket(addBucketKey)
51✔
629
        if addBkt == nil {
51✔
630
                return nil, ErrCorruptedFwdPkg
×
631
        }
×
632

633
        adds, err := loadHtlcs(addBkt)
51✔
634
        if err != nil {
51✔
635
                return nil, err
×
636
        }
×
637

638
        // Load ack filter from disk.
639
        ackFilterBytes := heightBkt.Get(ackFilterKey)
51✔
640
        if ackFilterBytes == nil {
51✔
641
                return nil, ErrCorruptedFwdPkg
×
642
        }
×
643
        ackFilterReader := bytes.NewReader(ackFilterBytes)
51✔
644

51✔
645
        ackFilter := &PkgFilter{}
51✔
646
        if err := ackFilter.Decode(ackFilterReader); err != nil {
51✔
647
                return nil, err
×
648
        }
×
649

650
        // Load SETTLE/FAILs from disk.
651
        failSettleBkt := heightBkt.NestedReadBucket(failSettleBucketKey)
51✔
652
        if failSettleBkt == nil {
51✔
653
                return nil, ErrCorruptedFwdPkg
×
654
        }
×
655

656
        failSettles, err := loadHtlcs(failSettleBkt)
51✔
657
        if err != nil {
51✔
658
                return nil, err
×
659
        }
×
660

661
        // Load settle fail filter from disk.
662
        settleFailFilterBytes := heightBkt.Get(settleFailFilterKey)
51✔
663
        if settleFailFilterBytes == nil {
51✔
664
                return nil, ErrCorruptedFwdPkg
×
665
        }
×
666
        settleFailFilterReader := bytes.NewReader(settleFailFilterBytes)
51✔
667

51✔
668
        settleFailFilter := &PkgFilter{}
51✔
669
        if err := settleFailFilter.Decode(settleFailFilterReader); err != nil {
51✔
670
                return nil, err
×
671
        }
×
672

673
        // Initialize the fwding package, which always starts in the
674
        // FwdStateLockedIn. We can determine what state the package was left in
675
        // by examining constraints on the information loaded from disk.
676
        fwdPkg := &FwdPkg{
51✔
677
                Source:           source,
51✔
678
                State:            FwdStateLockedIn,
51✔
679
                Height:           height,
51✔
680
                Adds:             adds,
51✔
681
                AckFilter:        ackFilter,
51✔
682
                SettleFails:      failSettles,
51✔
683
                SettleFailFilter: settleFailFilter,
51✔
684
        }
51✔
685

51✔
686
        // Check if the forward filter has been persisted to disk.
51✔
687
        // This indicates whether the Adds in this package have been processed.
51✔
688
        //
51✔
689
        // NOTE: We also expect packages with no Adds (settle/fail only packages
51✔
690
        // or empty packages) to have the fwd filter set to signal that the
51✔
691
        // packages have been processed.
51✔
692
        fwdFilterBytes := heightBkt.Get(fwdFilterKey)
51✔
693

51✔
694
        // Handle packages with Adds that haven't been processed yet.
51✔
695
        if fwdFilterBytes == nil {
65✔
696
                // Create a new forward filter for the unprocessed Adds.
14✔
697
                nAdds := uint16(len(adds))
14✔
698
                fwdPkg.FwdFilter = NewPkgFilter(nAdds)
14✔
699

14✔
700
                return fwdPkg, nil
14✔
701
        }
14✔
702

703
        // Load the existing forward filter from disk.
704
        fwdFilterReader := bytes.NewReader(fwdFilterBytes)
37✔
705
        fwdPkg.FwdFilter = &PkgFilter{}
37✔
706
        if err := fwdPkg.FwdFilter.Decode(fwdFilterReader); err != nil {
37✔
707
                return nil, err
×
708
        }
×
709

710
        // Mark the package as processed since the forward filter exists.
711
        fwdPkg.State = FwdStateProcessed
37✔
712

37✔
713
        // If every add, settle, and fail has been fully acknowledged, we can
37✔
714
        // safely set the package's state to FwdStateCompleted, signalling that
37✔
715
        // it can be garbage collected.
37✔
716
        if fwdPkg.AckFilter.IsFull() && fwdPkg.SettleFailFilter.IsFull() {
52✔
717
                fwdPkg.State = FwdStateCompleted
15✔
718
        }
15✔
719

720
        return fwdPkg, nil
37✔
721
}
722

723
// loadHtlcs retrieves all serialized htlcs in a bucket, returning
724
// them in order of the indexes they were written under.
725
func loadHtlcs(bkt kvdb.RBucket) ([]LogUpdate, error) {
102✔
726
        var htlcs []LogUpdate
102✔
727
        if err := bkt.ForEach(func(_, v []byte) error {
186✔
728
                htlc, err := deserializeLogUpdate(bytes.NewReader(v))
84✔
729
                if err != nil {
84✔
730
                        return err
×
731
                }
×
732

733
                htlcs = append(htlcs, *htlc)
84✔
734

84✔
735
                return nil
84✔
736
        }); err != nil {
×
737
                return nil, err
×
738
        }
×
739

740
        return htlcs, nil
102✔
741
}
742

743
// SetFwdFilter writes the set of indexes corresponding to Adds at the
744
// `height` that are to be forwarded to the switch. Calling this method causes
745
// the forwarding package at `height` to be in FwdStateProcessed. We write this
746
// forwarding decision so that we always arrive at the same behavior for HTLCs
747
// leaving this channel. After a restart, we skip validation of these Adds,
748
// since they are assumed to have already been validated, and make the switch or
749
// outgoing link responsible for handling replays.
750
func (p *ChannelPackager) SetFwdFilter(tx kvdb.RwTx, height uint64,
751
        fwdFilter *PkgFilter) error {
1,203✔
752

1,203✔
753
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
1,203✔
754
        if fwdPkgBkt == nil {
1,203✔
755
                return ErrCorruptedFwdPkg
×
756
        }
×
757

758
        source := makeLogKey(p.source.ToUint64())
1,203✔
759
        sourceBkt := fwdPkgBkt.NestedReadWriteBucket(source[:])
1,203✔
760
        if sourceBkt == nil {
1,203✔
761
                return ErrCorruptedFwdPkg
×
762
        }
×
763

764
        heightKey := makeLogKey(height)
1,203✔
765
        heightBkt := sourceBkt.NestedReadWriteBucket(heightKey[:])
1,203✔
766
        if heightBkt == nil {
1,203✔
767
                return ErrCorruptedFwdPkg
×
768
        }
×
769

770
        // If the fwd filter has already been written, we return early to avoid
771
        // modifying the persistent state.
772
        forwardedAddsBytes := heightBkt.Get(fwdFilterKey)
1,203✔
773
        if forwardedAddsBytes != nil {
1,203✔
774
                return nil
×
775
        }
×
776

777
        // Otherwise we serialize and write the provided fwd filter.
778
        var b bytes.Buffer
1,203✔
779
        if err := fwdFilter.Encode(&b); err != nil {
1,203✔
780
                return err
×
781
        }
×
782

783
        return heightBkt.Put(fwdFilterKey, b.Bytes())
1,203✔
784
}
785

786
// AckAddHtlcs accepts a list of references to add htlcs, and updates the
787
// AckAddFilter of those forwarding packages to indicate that a settle or fail
788
// has been received in response to the add.
789
func (p *ChannelPackager) AckAddHtlcs(tx kvdb.RwTx, addRefs ...AddRef) error {
2,165✔
790
        if len(addRefs) == 0 {
3,990✔
791
                return nil
1,825✔
792
        }
1,825✔
793

794
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
340✔
795
        if fwdPkgBkt == nil {
340✔
796
                return ErrCorruptedFwdPkg
×
797
        }
×
798

799
        sourceKey := makeLogKey(p.source.ToUint64())
340✔
800
        sourceBkt := fwdPkgBkt.NestedReadWriteBucket(sourceKey[:])
340✔
801
        if sourceBkt == nil {
340✔
802
                return ErrCorruptedFwdPkg
×
803
        }
×
804

805
        // Organize the forward references such that we just get a single slice
806
        // of indexes for each unique height.
807
        heightDiffs := make(map[uint64][]uint16)
340✔
808
        for _, addRef := range addRefs {
688✔
809
                heightDiffs[addRef.Height] = append(
348✔
810
                        heightDiffs[addRef.Height],
348✔
811
                        addRef.Index,
348✔
812
                )
348✔
813
        }
348✔
814

815
        // Load each height bucket once and remove all acked htlcs at that
816
        // height.
817
        for height, indexes := range heightDiffs {
680✔
818
                err := ackAddHtlcsAtHeight(sourceBkt, height, indexes)
340✔
819
                if err != nil {
340✔
820
                        return err
×
821
                }
×
822
        }
823

824
        return nil
340✔
825
}
826

827
// ackAddHtlcsAtHeight updates the AddAckFilter of a single forwarding package
828
// with a list of indexes, writing the resulting filter back in its place.
829
func ackAddHtlcsAtHeight(sourceBkt kvdb.RwBucket, height uint64,
830
        indexes []uint16) error {
340✔
831

340✔
832
        heightKey := makeLogKey(height)
340✔
833
        heightBkt := sourceBkt.NestedReadWriteBucket(heightKey[:])
340✔
834
        if heightBkt == nil {
340✔
835
                // If the height bucket isn't found, this could be because the
×
836
                // forwarding package was already removed. We'll return nil to
×
837
                // signal that the operation is successful, as there is nothing
×
838
                // to ack.
×
839
                return nil
×
840
        }
×
841

842
        // Load ack filter from disk.
843
        ackFilterBytes := heightBkt.Get(ackFilterKey)
340✔
844
        if ackFilterBytes == nil {
340✔
845
                return ErrCorruptedFwdPkg
×
846
        }
×
847

848
        ackFilter := &PkgFilter{}
340✔
849
        ackFilterReader := bytes.NewReader(ackFilterBytes)
340✔
850
        if err := ackFilter.Decode(ackFilterReader); err != nil {
340✔
851
                return err
×
852
        }
×
853

854
        // Update the ack filter for this height.
855
        for _, index := range indexes {
688✔
856
                ackFilter.Set(index)
348✔
857
        }
348✔
858

859
        // Write the resulting filter to disk.
860
        var ackFilterBuf bytes.Buffer
340✔
861
        if err := ackFilter.Encode(&ackFilterBuf); err != nil {
340✔
862
                return err
×
863
        }
×
864

865
        return heightBkt.Put(ackFilterKey, ackFilterBuf.Bytes())
340✔
866
}
867

868
// AckSettleFails persistently acknowledges settles or fails from a remote forwarding
869
// package. This should only be called after the source of the Add has locked in
870
// the settle/fail, or it becomes otherwise safe to forgo retransmitting the
871
// settle/fail after a restart.
872
func (p *ChannelPackager) AckSettleFails(tx kvdb.RwTx, settleFailRefs ...SettleFailRef) error {
2,164✔
873
        return ackSettleFails(tx, settleFailRefs)
2,164✔
874
}
2,164✔
875

876
// ackSettleFails persistently acknowledges a batch of settle fail references.
877
func ackSettleFails(tx kvdb.RwTx, settleFailRefs []SettleFailRef) error {
2,282✔
878
        if len(settleFailRefs) == 0 {
4,437✔
879
                return nil
2,155✔
880
        }
2,155✔
881

882
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
127✔
883
        if fwdPkgBkt == nil {
127✔
884
                return ErrCorruptedFwdPkg
×
885
        }
×
886

887
        // Organize the forward references such that we just get a single slice
888
        // of indexes for each unique destination-height pair.
889
        destHeightDiffs := make(map[lnwire.ShortChannelID]map[uint64][]uint16)
127✔
890
        for _, settleFailRef := range settleFailRefs {
254✔
891
                destHeights, ok := destHeightDiffs[settleFailRef.Source]
127✔
892
                if !ok {
254✔
893
                        destHeights = make(map[uint64][]uint16)
127✔
894
                        destHeightDiffs[settleFailRef.Source] = destHeights
127✔
895
                }
127✔
896

897
                destHeights[settleFailRef.Height] = append(
127✔
898
                        destHeights[settleFailRef.Height],
127✔
899
                        settleFailRef.Index,
127✔
900
                )
127✔
901
        }
902

903
        // With the references organized by destination and height, we now load
904
        // each remote bucket, and update the settle fail filter for any
905
        // settle/fail htlcs.
906
        for dest, destHeights := range destHeightDiffs {
254✔
907
                destKey := makeLogKey(dest.ToUint64())
127✔
908
                destBkt := fwdPkgBkt.NestedReadWriteBucket(destKey[:])
127✔
909
                if destBkt == nil {
130✔
910
                        // If the destination bucket is not found, this is
3✔
911
                        // likely the result of the destination channel being
3✔
912
                        // closed and having it's forwarding packages wiped. We
3✔
913
                        // won't treat this as an error, because the response
3✔
914
                        // will no longer be retransmitted internally.
3✔
915
                        continue
3✔
916
                }
917

918
                for height, indexes := range destHeights {
248✔
919
                        err := ackSettleFailsAtHeight(destBkt, height, indexes)
124✔
920
                        if err != nil {
124✔
921
                                return err
×
922
                        }
×
923
                }
924
        }
925

926
        return nil
127✔
927
}
928

929
// ackSettleFailsAtHeight given a destination bucket, acks the provided indexes
930
// at particular a height by updating the settle fail filter.
931
func ackSettleFailsAtHeight(destBkt kvdb.RwBucket, height uint64,
932
        indexes []uint16) error {
124✔
933

124✔
934
        heightKey := makeLogKey(height)
124✔
935
        heightBkt := destBkt.NestedReadWriteBucket(heightKey[:])
124✔
936
        if heightBkt == nil {
124✔
937
                // If the height bucket isn't found, this could be because the
×
938
                // forwarding package was already removed. We'll return nil to
×
939
                // signal that the operation is as there is nothing to ack.
×
940
                return nil
×
941
        }
×
942

943
        // Load ack filter from disk.
944
        settleFailFilterBytes := heightBkt.Get(settleFailFilterKey)
124✔
945
        if settleFailFilterBytes == nil {
124✔
946
                return ErrCorruptedFwdPkg
×
947
        }
×
948

949
        settleFailFilter := &PkgFilter{}
124✔
950
        settleFailFilterReader := bytes.NewReader(settleFailFilterBytes)
124✔
951
        if err := settleFailFilter.Decode(settleFailFilterReader); err != nil {
124✔
952
                return err
×
953
        }
×
954

955
        // Update the ack filter for this height.
956
        for _, index := range indexes {
248✔
957
                settleFailFilter.Set(index)
124✔
958
        }
124✔
959

960
        // Write the resulting filter to disk.
961
        var settleFailFilterBuf bytes.Buffer
124✔
962
        if err := settleFailFilter.Encode(&settleFailFilterBuf); err != nil {
124✔
963
                return err
×
964
        }
×
965

966
        return heightBkt.Put(settleFailFilterKey, settleFailFilterBuf.Bytes())
124✔
967
}
968

969
// RemovePkg deletes the forwarding package at the given height from the
970
// packager's source bucket.
971
func (p *ChannelPackager) RemovePkg(tx kvdb.RwTx, height uint64) error {
6✔
972
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
6✔
973
        if fwdPkgBkt == nil {
6✔
974
                return nil
×
975
        }
×
976

977
        sourceBytes := makeLogKey(p.source.ToUint64())
6✔
978
        sourceBkt := fwdPkgBkt.NestedReadWriteBucket(sourceBytes[:])
6✔
979
        if sourceBkt == nil {
6✔
980
                return ErrCorruptedFwdPkg
×
981
        }
×
982

983
        heightKey := makeLogKey(height)
6✔
984

6✔
985
        return sourceBkt.DeleteNestedBucket(heightKey[:])
6✔
986
}
987

988
// Wipe deletes all the channel's forwarding packages, if any.
989
func (p *ChannelPackager) Wipe(tx kvdb.RwTx) error {
117✔
990
        // If the root bucket doesn't exist, there's no need to delete.
117✔
991
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
117✔
992
        if fwdPkgBkt == nil {
118✔
993
                return nil
1✔
994
        }
1✔
995

996
        sourceBytes := makeLogKey(p.source.ToUint64())
116✔
997

116✔
998
        // If the nested bucket doesn't exist, there's no need to delete.
116✔
999
        if fwdPkgBkt.NestedReadWriteBucket(sourceBytes[:]) == nil {
226✔
1000
                return nil
110✔
1001
        }
110✔
1002

1003
        return fwdPkgBkt.DeleteNestedBucket(sourceBytes[:])
6✔
1004
}
1005

1006
// uint16Key writes the provided 16-bit unsigned integer to a 2-byte slice.
1007
func uint16Key(i uint16) []byte {
1,294✔
1008
        key := make([]byte, 2)
1,294✔
1009
        byteOrder.PutUint16(key, i)
1,294✔
1010
        return key
1,294✔
1011
}
1,294✔
1012

1013
// Compile-time constraint to ensure that ChannelPackager implements the public
1014
// FwdPackager interface.
1015
var _ FwdPackager = (*ChannelPackager)(nil)
1016

1017
// Compile-time constraint to ensure that SwitchPackager implements the public
1018
// FwdOperator interface.
1019
var _ FwdOperator = (*SwitchPackager)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc