• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.15
/channeldb/forwarding_package.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9

10
        "github.com/lightningnetwork/lnd/kvdb"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
)
13

14
// ErrCorruptedFwdPkg is returned when the on-disk layout of a forwarding
// package does not match what the readers in this file expect, e.g. a
// required bucket or filter key is missing, or a height key has the wrong
// length.
var ErrCorruptedFwdPkg = errors.New("fwding package db has been corrupted")
17

18
// FwdState enumerates the stages in the lifecycle of a FwdPkg.
type FwdState byte

const (
	// FwdStateLockedIn is the initial state of every forwarding package.
	// A package in this state has not yet committed to the exact set of
	// Adds that will be forwarded to the switch.
	FwdStateLockedIn FwdState = 0

	// FwdStateProcessed is the state reached once all Adds have been
	// processed locally and the forwarding decision for the switch has
	// been persisted.
	FwdStateProcessed FwdState = 1

	// FwdStateCompleted is the state reached once all Adds have been
	// acked and all settles and fails have been delivered to their
	// sources. Packages in this state can be removed permanently.
	FwdStateCompleted FwdState = 2
)
37

38
var (
	// fwdPackagesKey names the root-level bucket under which all
	// forwarding packages are stored. The bucket is subdivided first by
	// the short channel ID of the owning channel and then by height.
	//
	// Bucket hierarchy:
	//
	// fwdPackagesKey (root bucket)
	//   |
	//   |-- <shortChannelID>
	//   |     |
	//   |     |-- <height>
	//   |     |     |-- ackFilterKey: <encoded bytes of PkgFilter>
	//   |     |     |-- settleFailFilterKey: <encoded bytes of PkgFilter>
	//   |     |     |-- fwdFilterKey: <encoded bytes of PkgFilter>
	//   |     |     |
	//   |     |     |-- addBucketKey
	//   |     |     |     |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
	//   |     |     |     |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
	//   |     |     |     ...
	//   |     |     |
	//   |     |     |-- failSettleBucketKey
	//   |     |           |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
	//   |     |           |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
	//   |     |           ...
	//   |     |
	//   |     |-- <height>
	//   |     |     |
	//   |     ...   ...
	//   |
	//   |-- <shortChannelID>
	//   |     |
	//   |     ...
	//   ...
	//
	fwdPackagesKey = []byte("fwd-packages")

	// addBucketKey names the per-height bucket that holds all Add log
	// updates.
	addBucketKey = []byte("add-updates")

	// failSettleBucketKey names the per-height bucket that holds all
	// Settle/Fail log updates.
	failSettleBucketKey = []byte("fail-settle-updates")

	// fwdFilterKey is the key under which the set of Adds that passed
	// validation and are to be forwarded to the switch is written.
	// NOTE: The presence of this key within a forwarding package indicates
	// that the package has reached FwdStateProcessed.
	fwdFilterKey = []byte("fwd-filter-key")

	// ackFilterKey is the key of the PkgFilter recording which Adds have
	// received a Settle/Fail. The response may come from a number of
	// sources, including: exitHop settle/fails, switch failures, chain
	// arbiter interjections, as well as settle/fails from the next hop in
	// the route.
	ackFilterKey = []byte("ack-filter-key")

	// settleFailFilterKey is the key of the PkgFilter recording which
	// Settles/Fails have been received and processed by the link that
	// originally received the Add.
	settleFailFilterKey = []byte("settle-fail-filter-key")
)
101

102
// PkgFilter compactly represents a subset of the entries of a forwarding
// package as a statically-sized bit vector. Element i is stored in byte i/8,
// counting bits from the most significant bit of that byte, so the indices
// match the order in which the entries are written in the FwdPkg.
type PkgFilter struct {
	// count is the number of elements the filter can represent.
	count uint16

	// filter holds one bit per element, padded with zero bits to a whole
	// number of bytes.
	filter []byte
}

// pkgFilterByteLen returns the number of bytes needed to hold `count` bits.
// The arithmetic is done in int so that rounding up cannot wrap around for
// counts close to the uint16 maximum.
func pkgFilterByteLen(count uint16) int {
	return (int(count) + 7) / 8
}

// NewPkgFilter initializes an empty PkgFilter supporting `count` elements.
func NewPkgFilter(count uint16) *PkgFilter {
	return &PkgFilter{
		count:  count,
		filter: make([]byte, pkgFilterByteLen(count)),
	}
}

// Count returns the number of elements represented by this PkgFilter.
func (f *PkgFilter) Count() uint16 {
	return f.count
}

// Set marks the `i`-th element as included by this filter. An index that is
// not less than Count() is ignored, which keeps the padding bits zero and
// avoids an out-of-range panic.
func (f *PkgFilter) Set(i uint16) {
	if i >= f.count {
		return
	}

	f.filter[i/8] |= 1 << (7 - i%8)
}

// Contains queries the filter for membership of index `i`. An index that is
// not less than Count() is never a member, so false is returned for it.
func (f *PkgFilter) Contains(i uint16) bool {
	if i >= f.count {
		return false
	}

	return f.filter[i/8]&(1<<(7-i%8)) != 0
}

// Equal checks two PkgFilters for equality. Two nil filters are equal; a nil
// filter is never equal to a non-nil one.
func (f *PkgFilter) Equal(f2 *PkgFilter) bool {
	if f == f2 {
		return true
	}
	if f == nil || f2 == nil {
		return false
	}

	return f.count == f2.count && bytes.Equal(f.filter, f2.filter)
}

// IsFull returns true if every element in the filter has been Set, and false
// otherwise. A filter with zero elements is full.
func (f *PkgFilter) IsFull() bool {
	// Batch validate the bytes that are fully used.
	fullBytes := int(f.count / 8)
	for _, b := range f.filter[:fullBytes] {
		if b != 0xFF {
			return false
		}
	}

	// If the count is not a multiple of 8, the remaining elements occupy
	// the top `rem` bits of the last byte; the padding bits are ignored.
	rem := f.count % 8
	if rem == 0 {
		return true
	}
	mask := byte(0xFF) << (8 - rem)

	return f.filter[fullBytes]&mask == mask
}

// Size returns number of bytes produced when the PkgFilter is serialized: 2
// bytes for the uint16 count followed by the bit vector.
func (f *PkgFilter) Size() uint16 {
	return uint16(2 + pkgFilterByteLen(f.count))
}

// Encode writes the filter to the provided io.Writer as the big-endian count
// followed by the raw bit vector. Any error from the writer is returned.
func (f *PkgFilter) Encode(w io.Writer) error {
	var hdr [2]byte
	binary.BigEndian.PutUint16(hdr[:], f.count)
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}

	_, err := w.Write(f.filter)

	return err
}

// Decode reads the filter from the provided io.Reader, expecting the layout
// produced by Encode. The receiver is only modified once the whole filter
// has been read; on error it keeps its previous contents.
func (f *PkgFilter) Decode(r io.Reader) error {
	var hdr [2]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return err
	}
	count := binary.BigEndian.Uint16(hdr[:])

	filter := make([]byte, pkgFilterByteLen(count))
	if _, err := io.ReadFull(r, filter); err != nil {
		return err
	}

	f.count, f.filter = count, filter

	return nil
}

// String returns a human-readable string.
func (f *PkgFilter) String() string {
	return fmt.Sprintf("count=%v, filter=%v", f.count, f.filter)
}
×
218

219
// FwdPkg records all adds, settles, and fails that were locked in as a result
220
// of the remote peer sending us a revocation. Each package is identified by
221
// the short chanid and remote commitment height corresponding to the revocation
222
// that locked in the HTLCs. For everything except a locally initiated payment,
223
// settles and fails in a forwarding package must have a corresponding Add in
224
// another package, and can be removed individually once the source link has
225
// received the fail/settle.
226
//
227
// Adds cannot be removed, as we need to present the same batch of Adds to
228
// properly handle replay protection. Instead, we use a PkgFilter to mark that
229
// we have finished processing a particular Add. A FwdPkg should only be deleted
230
// after the AckFilter is full and all settles and fails have been persistently
231
// removed.
232
type FwdPkg struct {
233
        // Source identifies the channel that wrote this forwarding package.
234
        Source lnwire.ShortChannelID
235

236
        // Height is the height of the remote commitment chain that locked in
237
        // this forwarding package.
238
        Height uint64
239

240
        // State signals the persistent condition of the package and directs how
241
        // to reprocess the package in the event of failures.
242
        State FwdState
243

244
        // Adds contains all add messages which need to be processed and
245
        // forwarded to the switch. Adds does not change over the life of a
246
        // forwarding package.
247
        Adds []LogUpdate
248

249
        // FwdFilter is a filter containing the indices of all Adds that were
250
        // forwarded to the switch.
251
        FwdFilter *PkgFilter
252

253
        // AckFilter is a filter containing the indices of all Adds for which
254
        // the source has received a settle or fail and is reflected in the next
255
        // commitment txn. A package should not be removed until IsFull()
256
        // returns true.
257
        AckFilter *PkgFilter
258

259
        // SettleFails contains all settle and fail messages that should be
260
        // forwarded to the switch.
261
        SettleFails []LogUpdate
262

263
        // SettleFailFilter is a filter containing the indices of all Settle or
264
        // Fails originating in this package that have been received and locked
265
        // into the incoming link's commitment state.
266
        SettleFailFilter *PkgFilter
267
}
268

269
// NewFwdPkg initializes a new forwarding package in FwdStateLockedIn. This
270
// should be used to create a package at the time we receive a revocation.
271
func NewFwdPkg(source lnwire.ShortChannelID, height uint64,
272
        addUpdates, settleFailUpdates []LogUpdate) *FwdPkg {
3,368✔
273

3,368✔
274
        nAddUpdates := uint16(len(addUpdates))
3,368✔
275
        nSettleFailUpdates := uint16(len(settleFailUpdates))
3,368✔
276

3,368✔
277
        return &FwdPkg{
3,368✔
278
                Source:           source,
3,368✔
279
                Height:           height,
3,368✔
280
                State:            FwdStateLockedIn,
3,368✔
281
                Adds:             addUpdates,
3,368✔
282
                FwdFilter:        NewPkgFilter(nAddUpdates),
3,368✔
283
                AckFilter:        NewPkgFilter(nAddUpdates),
3,368✔
284
                SettleFails:      settleFailUpdates,
3,368✔
285
                SettleFailFilter: NewPkgFilter(nSettleFailUpdates),
3,368✔
286
        }
3,368✔
287
}
3,368✔
288

289
// SourceRef is a convenience method that returns an AddRef to this forwarding
290
// package for the index in the argument. It is the caller's responsibility
291
// to ensure that the index is in bounds.
292
func (f *FwdPkg) SourceRef(i uint16) AddRef {
1,490✔
293
        return AddRef{
1,490✔
294
                Height: f.Height,
1,490✔
295
                Index:  i,
1,490✔
296
        }
1,490✔
297
}
1,490✔
298

299
// DestRef is a convenience method that returns a SettleFailRef to this
300
// forwarding package for the index in the argument. It is the caller's
301
// responsibility to ensure that the index is in bounds.
302
func (f *FwdPkg) DestRef(i uint16) SettleFailRef {
748✔
303
        return SettleFailRef{
748✔
304
                Source: f.Source,
748✔
305
                Height: f.Height,
748✔
306
                Index:  i,
748✔
307
        }
748✔
308
}
748✔
309

310
// ID returns an unique identifier for this package, used to ensure that sphinx
311
// replay processing of this batch is idempotent.
312
func (f *FwdPkg) ID() []byte {
2,580✔
313
        var id = make([]byte, 16)
2,580✔
314
        byteOrder.PutUint64(id[:8], f.Source.ToUint64())
2,580✔
315
        byteOrder.PutUint64(id[8:], f.Height)
2,580✔
316
        return id
2,580✔
317
}
2,580✔
318

319
// String returns a human-readable description of the forwarding package.
320
func (f *FwdPkg) String() string {
×
321
        return fmt.Sprintf("%T(src=%v, height=%v, nadds=%v, nfailsettles=%v)",
×
322
                f, f.Source, f.Height, len(f.Adds), len(f.SettleFails))
×
323
}
×
324

325
// AddRef is used to identify a particular Add in a FwdPkg. The short channel ID
// is assumed to be that of the packager.
type AddRef struct {
	// Height is the remote commitment height that locked in the Add.
	Height uint64

	// Index is the index of the Add within the fwd pkg's Adds.
	//
	// NOTE: This index is static over the lifetime of a forwarding package.
	Index uint16
}

// Encode serializes the AddRef to the given io.Writer as a fixed 10-byte
// record: the big-endian Height (8 bytes) followed by the big-endian Index
// (2 bytes). The record is emitted with a single Write call, and any error
// from the writer is returned.
func (a *AddRef) Encode(w io.Writer) error {
	var buf [10]byte
	binary.BigEndian.PutUint64(buf[:8], a.Height)
	binary.BigEndian.PutUint16(buf[8:], a.Index)

	_, err := w.Write(buf[:])

	return err
}

// Decode deserializes the AddRef from the given io.Reader, expecting the
// 10-byte record produced by Encode. The receiver is only modified once the
// full record has been read, so a failed Decode leaves it untouched. Because
// the record is read with io.ReadFull, an empty input yields io.EOF and any
// partial record yields io.ErrUnexpectedEOF.
func (a *AddRef) Decode(r io.Reader) error {
	var buf [10]byte
	if _, err := io.ReadFull(r, buf[:]); err != nil {
		return err
	}

	a.Height = binary.BigEndian.Uint64(buf[:8])
	a.Index = binary.BigEndian.Uint16(buf[8:])

	return nil
}
354

355
// SettleFailRef is used to locate a Settle/Fail in another channel's FwdPkg. A
356
// channel does not remove its own Settle/Fail htlcs, so the source is provided
357
// to locate a db bucket belonging to another channel.
358
type SettleFailRef struct {
359
        // Source identifies the outgoing link that locked in the settle or
360
        // fail. This is then used by the *incoming* link to find the settle
361
        // fail in another link's forwarding packages.
362
        Source lnwire.ShortChannelID
363

364
        // Height is the remote commitment height that locked in this
365
        // Settle/Fail.
366
        Height uint64
367

368
        // Index is the index of the Add with the fwd pkg's SettleFails.
369
        //
370
        // NOTE: This index is static over the lifetime of a forwarding package.
371
        Index uint16
372
}
373

374
// SettleFailAcker is a generic interface providing the ability to acknowledge
375
// settle/fail HTLCs stored in forwarding packages.
376
type SettleFailAcker interface {
377
        // AckSettleFails atomically updates the settle-fail filters in *other*
378
        // channels' forwarding packages.
379
        AckSettleFails(tx kvdb.RwTx, settleFailRefs ...SettleFailRef) error
380
}
381

382
// GlobalFwdPkgReader is an interface used to retrieve the forwarding packages
383
// of any active channel.
384
type GlobalFwdPkgReader interface {
385
        // LoadChannelFwdPkgs loads all known forwarding packages for the given
386
        // channel.
387
        LoadChannelFwdPkgs(tx kvdb.RTx,
388
                source lnwire.ShortChannelID) ([]*FwdPkg, error)
389
}
390

391
// FwdOperator defines the interfaces for managing forwarding packages that are
392
// external to a particular channel. This interface is used by the switch to
393
// read forwarding packages from arbitrary channels, and acknowledge settles and
394
// fails for locally-sourced payments.
395
type FwdOperator interface {
396
        // GlobalFwdPkgReader provides read access to all known forwarding
397
        // packages
398
        GlobalFwdPkgReader
399

400
        // SettleFailAcker grants the ability to acknowledge settles or fails
401
        // residing in arbitrary forwarding packages.
402
        SettleFailAcker
403
}
404

405
// SwitchPackager is a concrete implementation of the FwdOperator interface.
// It carries no state, and offers the ability to read any forwarding package
// and to ack arbitrary settle and fail HTLCs.
type SwitchPackager struct{}

// NewSwitchPackager returns a pointer to a fresh SwitchPackager.
func NewSwitchPackager() *SwitchPackager {
	return new(SwitchPackager)
}
341✔
414

415
// AckSettleFails atomically updates the settle-fail filters in *other*
416
// channels' forwarding packages, to mark that the switch has received a settle
417
// or fail residing in the forwarding package of a link.
418
func (*SwitchPackager) AckSettleFails(tx kvdb.RwTx,
419
        settleFailRefs ...SettleFailRef) error {
124✔
420

124✔
421
        return ackSettleFails(tx, settleFailRefs)
124✔
422
}
124✔
423

424
// LoadChannelFwdPkgs loads all forwarding packages for a particular channel.
425
func (*SwitchPackager) LoadChannelFwdPkgs(tx kvdb.RTx,
426
        source lnwire.ShortChannelID) ([]*FwdPkg, error) {
129✔
427

129✔
428
        return loadChannelFwdPkgs(tx, source)
129✔
429
}
129✔
430

431
// FwdPackager supports all operations required to modify fwd packages, such as
432
// creation, updates, reading, and removal. The interfaces are broken down in
433
// this way to support future delegation of the subinterfaces.
434
type FwdPackager interface {
435
        // AddFwdPkg serializes and writes a FwdPkg for this channel at the
436
        // remote commitment height included in the forwarding package.
437
        AddFwdPkg(tx kvdb.RwTx, fwdPkg *FwdPkg) error
438

439
        // SetFwdFilter looks up the forwarding package at the remote `height`
440
        // and sets the `fwdFilter`, marking the Adds for which:
441
        // 1) We are not the exit node
442
        // 2) Passed all validation
443
        // 3) Should be forwarded to the switch immediately after a failure
444
        SetFwdFilter(tx kvdb.RwTx, height uint64, fwdFilter *PkgFilter) error
445

446
        // AckAddHtlcs atomically updates the add filters in this channel's
447
        // forwarding packages to mark the resolution of an Add that was
448
        // received from the remote party.
449
        AckAddHtlcs(tx kvdb.RwTx, addRefs ...AddRef) error
450

451
        // SettleFailAcker allows a link to acknowledge settle/fail HTLCs
452
        // belonging to other channels.
453
        SettleFailAcker
454

455
        // LoadFwdPkgs loads all known forwarding packages owned by this
456
        // channel.
457
        LoadFwdPkgs(tx kvdb.RTx) ([]*FwdPkg, error)
458

459
        // RemovePkg deletes a forwarding package owned by this channel at
460
        // the provided remote `height`.
461
        RemovePkg(tx kvdb.RwTx, height uint64) error
462

463
        // Wipe deletes all the forwarding packages owned by this channel.
464
        Wipe(tx kvdb.RwTx) error
465
}
466

467
// ChannelPackager is used by a channel to manage the lifecycle of its forwarding
468
// packages. The packager is tied to a particular source channel ID, allowing it
469
// to create and edit its own packages. Each packager also has the ability to
470
// remove fail/settle htlcs that correspond to an add contained in one of
471
// source's packages.
472
type ChannelPackager struct {
473
        source lnwire.ShortChannelID
474
}
475

476
// NewChannelPackager creates a new packager for a single channel.
477
func NewChannelPackager(source lnwire.ShortChannelID) *ChannelPackager {
23,639✔
478
        return &ChannelPackager{
23,639✔
479
                source: source,
23,639✔
480
        }
23,639✔
481
}
23,639✔
482

483
// AddFwdPkg writes a newly locked in forwarding package to disk.
484
func (*ChannelPackager) AddFwdPkg(tx kvdb.RwTx, fwdPkg *FwdPkg) error { // nolint: dupl
3,367✔
485
        fwdPkgBkt, err := tx.CreateTopLevelBucket(fwdPackagesKey)
3,367✔
486
        if err != nil {
3,367✔
487
                return err
×
488
        }
×
489

490
        source := makeLogKey(fwdPkg.Source.ToUint64())
3,367✔
491
        sourceBkt, err := fwdPkgBkt.CreateBucketIfNotExists(source[:])
3,367✔
492
        if err != nil {
3,367✔
493
                return err
×
494
        }
×
495

496
        heightKey := makeLogKey(fwdPkg.Height)
3,367✔
497
        heightBkt, err := sourceBkt.CreateBucketIfNotExists(heightKey[:])
3,367✔
498
        if err != nil {
3,367✔
499
                return err
×
500
        }
×
501

502
        // Write ADD updates we received at this commit height.
503
        addBkt, err := heightBkt.CreateBucketIfNotExists(addBucketKey)
3,367✔
504
        if err != nil {
3,367✔
505
                return err
×
506
        }
×
507

508
        // Write SETTLE/FAIL updates we received at this commit height.
509
        failSettleBkt, err := heightBkt.CreateBucketIfNotExists(failSettleBucketKey)
3,367✔
510
        if err != nil {
3,367✔
511
                return err
×
512
        }
×
513

514
        for i := range fwdPkg.Adds {
5,228✔
515
                err = putLogUpdate(addBkt, uint16(i), &fwdPkg.Adds[i])
1,861✔
516
                if err != nil {
1,861✔
517
                        return err
×
518
                }
×
519
        }
520

521
        // Persist the initialized pkg filter, which will be used to determine
522
        // when we can remove this forwarding package from disk.
523
        var ackFilterBuf bytes.Buffer
3,367✔
524
        if err := fwdPkg.AckFilter.Encode(&ackFilterBuf); err != nil {
3,367✔
525
                return err
×
526
        }
×
527

528
        if err := heightBkt.Put(ackFilterKey, ackFilterBuf.Bytes()); err != nil {
3,367✔
529
                return err
×
530
        }
×
531

532
        for i := range fwdPkg.SettleFails {
4,226✔
533
                err = putLogUpdate(failSettleBkt, uint16(i), &fwdPkg.SettleFails[i])
859✔
534
                if err != nil {
859✔
535
                        return err
×
536
                }
×
537
        }
538

539
        var settleFailFilterBuf bytes.Buffer
3,367✔
540
        err = fwdPkg.SettleFailFilter.Encode(&settleFailFilterBuf)
3,367✔
541
        if err != nil {
3,367✔
542
                return err
×
543
        }
×
544

545
        return heightBkt.Put(settleFailFilterKey, settleFailFilterBuf.Bytes())
3,367✔
546
}
547

548
// putLogUpdate writes an htlc to the provided `bkt`, using `index` as the key.
549
func putLogUpdate(bkt kvdb.RwBucket, idx uint16, htlc *LogUpdate) error {
2,720✔
550
        var b bytes.Buffer
2,720✔
551
        if err := serializeLogUpdate(&b, htlc); err != nil {
2,720✔
552
                return err
×
553
        }
×
554

555
        return bkt.Put(uint16Key(idx), b.Bytes())
2,720✔
556
}
557

558
// LoadFwdPkgs scans the forwarding log for any packages that haven't been
559
// processed, and returns their deserialized log updates in a map indexed by the
560
// remote commitment height at which the updates were locked in.
561
func (p *ChannelPackager) LoadFwdPkgs(tx kvdb.RTx) ([]*FwdPkg, error) {
479✔
562
        return loadChannelFwdPkgs(tx, p.source)
479✔
563
}
479✔
564

565
// loadChannelFwdPkgs loads all forwarding packages owned by `source`.
566
func loadChannelFwdPkgs(tx kvdb.RTx, source lnwire.ShortChannelID) ([]*FwdPkg, error) {
608✔
567
        fwdPkgBkt := tx.ReadBucket(fwdPackagesKey)
608✔
568
        if fwdPkgBkt == nil {
614✔
569
                return nil, nil
6✔
570
        }
6✔
571

572
        sourceKey := makeLogKey(source.ToUint64())
602✔
573
        sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
602✔
574
        if sourceBkt == nil {
1,144✔
575
                return nil, nil
542✔
576
        }
542✔
577

578
        var heights []uint64
60✔
579
        if err := sourceBkt.ForEach(func(k, _ []byte) error {
1,107✔
580
                if len(k) != 8 {
1,047✔
581
                        return ErrCorruptedFwdPkg
×
582
                }
×
583

584
                heights = append(heights, byteOrder.Uint64(k))
1,047✔
585

1,047✔
586
                return nil
1,047✔
587
        }); err != nil {
×
588
                return nil, err
×
589
        }
×
590

591
        // Load the forwarding package for each retrieved height.
592
        fwdPkgs := make([]*FwdPkg, 0, len(heights))
60✔
593
        for _, height := range heights {
1,107✔
594
                fwdPkg, err := loadFwdPkg(fwdPkgBkt, source, height)
1,047✔
595
                if err != nil {
1,047✔
596
                        return nil, err
×
597
                }
×
598

599
                fwdPkgs = append(fwdPkgs, fwdPkg)
1,047✔
600
        }
601

602
        return fwdPkgs, nil
60✔
603
}
604

605
// loadFwdPkg reads the packager's fwd pkg at a given height, and determines the
606
// appropriate FwdState.
607
func loadFwdPkg(fwdPkgBkt kvdb.RBucket, source lnwire.ShortChannelID,
608
        height uint64) (*FwdPkg, error) {
1,047✔
609

1,047✔
610
        sourceKey := makeLogKey(source.ToUint64())
1,047✔
611
        sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
1,047✔
612
        if sourceBkt == nil {
1,047✔
613
                return nil, ErrCorruptedFwdPkg
×
614
        }
×
615

616
        heightKey := makeLogKey(height)
1,047✔
617
        heightBkt := sourceBkt.NestedReadBucket(heightKey[:])
1,047✔
618
        if heightBkt == nil {
1,047✔
619
                return nil, ErrCorruptedFwdPkg
×
620
        }
×
621

622
        // Load ADDs from disk.
623
        addBkt := heightBkt.NestedReadBucket(addBucketKey)
1,047✔
624
        if addBkt == nil {
1,047✔
625
                return nil, ErrCorruptedFwdPkg
×
626
        }
×
627

628
        adds, err := loadHtlcs(addBkt)
1,047✔
629
        if err != nil {
1,047✔
630
                return nil, err
×
631
        }
×
632

633
        // Load ack filter from disk.
634
        ackFilterBytes := heightBkt.Get(ackFilterKey)
1,047✔
635
        if ackFilterBytes == nil {
1,047✔
636
                return nil, ErrCorruptedFwdPkg
×
637
        }
×
638
        ackFilterReader := bytes.NewReader(ackFilterBytes)
1,047✔
639

1,047✔
640
        ackFilter := &PkgFilter{}
1,047✔
641
        if err := ackFilter.Decode(ackFilterReader); err != nil {
1,047✔
642
                return nil, err
×
643
        }
×
644

645
        // Load SETTLE/FAILs from disk.
646
        failSettleBkt := heightBkt.NestedReadBucket(failSettleBucketKey)
1,047✔
647
        if failSettleBkt == nil {
1,047✔
648
                return nil, ErrCorruptedFwdPkg
×
649
        }
×
650

651
        failSettles, err := loadHtlcs(failSettleBkt)
1,047✔
652
        if err != nil {
1,047✔
653
                return nil, err
×
654
        }
×
655

656
        // Load settle fail filter from disk.
657
        settleFailFilterBytes := heightBkt.Get(settleFailFilterKey)
1,047✔
658
        if settleFailFilterBytes == nil {
1,047✔
659
                return nil, ErrCorruptedFwdPkg
×
660
        }
×
661
        settleFailFilterReader := bytes.NewReader(settleFailFilterBytes)
1,047✔
662

1,047✔
663
        settleFailFilter := &PkgFilter{}
1,047✔
664
        if err := settleFailFilter.Decode(settleFailFilterReader); err != nil {
1,047✔
665
                return nil, err
×
666
        }
×
667

668
        // Initialize the fwding package, which always starts in the
669
        // FwdStateLockedIn. We can determine what state the package was left in
670
        // by examining constraints on the information loaded from disk.
671
        fwdPkg := &FwdPkg{
1,047✔
672
                Source:           source,
1,047✔
673
                State:            FwdStateLockedIn,
1,047✔
674
                Height:           height,
1,047✔
675
                Adds:             adds,
1,047✔
676
                AckFilter:        ackFilter,
1,047✔
677
                SettleFails:      failSettles,
1,047✔
678
                SettleFailFilter: settleFailFilter,
1,047✔
679
        }
1,047✔
680

1,047✔
681
        // Check to see if we have written the set exported filter adds to
1,047✔
682
        // disk. If we haven't, processing of this package was never started, or
1,047✔
683
        // failed during the last attempt.
1,047✔
684
        fwdFilterBytes := heightBkt.Get(fwdFilterKey)
1,047✔
685
        if fwdFilterBytes == nil {
1,060✔
686
                nAdds := uint16(len(adds))
13✔
687
                fwdPkg.FwdFilter = NewPkgFilter(nAdds)
13✔
688
                return fwdPkg, nil
13✔
689
        }
13✔
690

691
        fwdFilterReader := bytes.NewReader(fwdFilterBytes)
1,034✔
692
        fwdPkg.FwdFilter = &PkgFilter{}
1,034✔
693
        if err := fwdPkg.FwdFilter.Decode(fwdFilterReader); err != nil {
1,034✔
694
                return nil, err
×
695
        }
×
696

697
        // Otherwise, a complete round of processing was completed, and we
698
        // advance the package to FwdStateProcessed.
699
        fwdPkg.State = FwdStateProcessed
1,034✔
700

1,034✔
701
        // If every add, settle, and fail has been fully acknowledged, we can
1,034✔
702
        // safely set the package's state to FwdStateCompleted, signalling that
1,034✔
703
        // it can be garbage collected.
1,034✔
704
        if fwdPkg.AckFilter.IsFull() && fwdPkg.SettleFailFilter.IsFull() {
1,931✔
705
                fwdPkg.State = FwdStateCompleted
897✔
706
        }
897✔
707

708
        return fwdPkg, nil
1,034✔
709
}
710

711
// loadHtlcs retrieves all serialized htlcs in a bucket, returning
712
// them in order of the indexes they were written under.
713
func loadHtlcs(bkt kvdb.RBucket) ([]LogUpdate, error) {
2,094✔
714
        var htlcs []LogUpdate
2,094✔
715
        if err := bkt.ForEach(func(_, v []byte) error {
5,348✔
716
                htlc, err := deserializeLogUpdate(bytes.NewReader(v))
3,254✔
717
                if err != nil {
3,254✔
718
                        return err
×
719
                }
×
720

721
                htlcs = append(htlcs, *htlc)
3,254✔
722

3,254✔
723
                return nil
3,254✔
724
        }); err != nil {
×
725
                return nil, err
×
726
        }
×
727

728
        return htlcs, nil
2,094✔
729
}
730

731
// SetFwdFilter writes the set of indexes corresponding to Adds at the
732
// `height` that are to be forwarded to the switch. Calling this method causes
733
// the forwarding package at `height` to be in FwdStateProcessed. We write this
734
// forwarding decision so that we always arrive at the same behavior for HTLCs
735
// leaving this channel. After a restart, we skip validation of these Adds,
736
// since they are assumed to have already been validated, and make the switch or
737
// outgoing link responsible for handling replays.
738
func (p *ChannelPackager) SetFwdFilter(tx kvdb.RwTx, height uint64,
739
        fwdFilter *PkgFilter) error {
2,582✔
740

2,582✔
741
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
2,582✔
742
        if fwdPkgBkt == nil {
2,582✔
743
                return ErrCorruptedFwdPkg
×
744
        }
×
745

746
        source := makeLogKey(p.source.ToUint64())
2,582✔
747
        sourceBkt := fwdPkgBkt.NestedReadWriteBucket(source[:])
2,582✔
748
        if sourceBkt == nil {
2,582✔
749
                return ErrCorruptedFwdPkg
×
750
        }
×
751

752
        heightKey := makeLogKey(height)
2,582✔
753
        heightBkt := sourceBkt.NestedReadWriteBucket(heightKey[:])
2,582✔
754
        if heightBkt == nil {
2,582✔
755
                return ErrCorruptedFwdPkg
×
756
        }
×
757

758
        // If the fwd filter has already been written, we return early to avoid
759
        // modifying the persistent state.
760
        forwardedAddsBytes := heightBkt.Get(fwdFilterKey)
2,582✔
761
        if forwardedAddsBytes != nil {
2,582✔
762
                return nil
×
763
        }
×
764

765
        // Otherwise we serialize and write the provided fwd filter.
766
        var b bytes.Buffer
2,582✔
767
        if err := fwdFilter.Encode(&b); err != nil {
2,582✔
768
                return err
×
769
        }
×
770

771
        return heightBkt.Put(fwdFilterKey, b.Bytes())
2,582✔
772
}
773

774
// AckAddHtlcs accepts a list of references to add htlcs, and updates the
775
// AckAddFilter of those forwarding packages to indicate that a settle or fail
776
// has been received in response to the add.
777
func (p *ChannelPackager) AckAddHtlcs(tx kvdb.RwTx, addRefs ...AddRef) error {
3,454✔
778
        if len(addRefs) == 0 {
6,136✔
779
                return nil
2,682✔
780
        }
2,682✔
781

782
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
772✔
783
        if fwdPkgBkt == nil {
772✔
784
                return ErrCorruptedFwdPkg
×
785
        }
×
786

787
        sourceKey := makeLogKey(p.source.ToUint64())
772✔
788
        sourceBkt := fwdPkgBkt.NestedReadWriteBucket(sourceKey[:])
772✔
789
        if sourceBkt == nil {
772✔
790
                return ErrCorruptedFwdPkg
×
791
        }
×
792

793
        // Organize the forward references such that we just get a single slice
794
        // of indexes for each unique height.
795
        heightDiffs := make(map[uint64][]uint16)
772✔
796
        for _, addRef := range addRefs {
1,553✔
797
                heightDiffs[addRef.Height] = append(
781✔
798
                        heightDiffs[addRef.Height],
781✔
799
                        addRef.Index,
781✔
800
                )
781✔
801
        }
781✔
802

803
        // Load each height bucket once and remove all acked htlcs at that
804
        // height.
805
        for height, indexes := range heightDiffs {
1,544✔
806
                err := ackAddHtlcsAtHeight(sourceBkt, height, indexes)
772✔
807
                if err != nil {
772✔
808
                        return err
×
809
                }
×
810
        }
811

812
        return nil
772✔
813
}
814

815
// ackAddHtlcsAtHeight updates the AddAckFilter of a single forwarding package
816
// with a list of indexes, writing the resulting filter back in its place.
817
func ackAddHtlcsAtHeight(sourceBkt kvdb.RwBucket, height uint64,
818
        indexes []uint16) error {
772✔
819

772✔
820
        heightKey := makeLogKey(height)
772✔
821
        heightBkt := sourceBkt.NestedReadWriteBucket(heightKey[:])
772✔
822
        if heightBkt == nil {
772✔
823
                // If the height bucket isn't found, this could be because the
×
824
                // forwarding package was already removed. We'll return nil to
×
825
                // signal that the operation is successful, as there is nothing
×
826
                // to ack.
×
827
                return nil
×
828
        }
×
829

830
        // Load ack filter from disk.
831
        ackFilterBytes := heightBkt.Get(ackFilterKey)
772✔
832
        if ackFilterBytes == nil {
772✔
833
                return ErrCorruptedFwdPkg
×
834
        }
×
835

836
        ackFilter := &PkgFilter{}
772✔
837
        ackFilterReader := bytes.NewReader(ackFilterBytes)
772✔
838
        if err := ackFilter.Decode(ackFilterReader); err != nil {
772✔
839
                return err
×
840
        }
×
841

842
        // Update the ack filter for this height.
843
        for _, index := range indexes {
1,553✔
844
                ackFilter.Set(index)
781✔
845
        }
781✔
846

847
        // Write the resulting filter to disk.
848
        var ackFilterBuf bytes.Buffer
772✔
849
        if err := ackFilter.Encode(&ackFilterBuf); err != nil {
772✔
850
                return err
×
851
        }
×
852

853
        return heightBkt.Put(ackFilterKey, ackFilterBuf.Bytes())
772✔
854
}
855

856
// AckSettleFails persistently acknowledges settles or fails from a remote forwarding
857
// package. This should only be called after the source of the Add has locked in
858
// the settle/fail, or it becomes otherwise safe to forgo retransmitting the
859
// settle/fail after a restart.
860
func (p *ChannelPackager) AckSettleFails(tx kvdb.RwTx, settleFailRefs ...SettleFailRef) error {
3,453✔
861
        return ackSettleFails(tx, settleFailRefs)
3,453✔
862
}
3,453✔
863

864
// ackSettleFails persistently acknowledges a batch of settle fail references.
865
func ackSettleFails(tx kvdb.RwTx, settleFailRefs []SettleFailRef) error {
3,577✔
866
        if len(settleFailRefs) == 0 {
7,021✔
867
                return nil
3,444✔
868
        }
3,444✔
869

870
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
133✔
871
        if fwdPkgBkt == nil {
133✔
872
                return ErrCorruptedFwdPkg
×
873
        }
×
874

875
        // Organize the forward references such that we just get a single slice
876
        // of indexes for each unique destination-height pair.
877
        destHeightDiffs := make(map[lnwire.ShortChannelID]map[uint64][]uint16)
133✔
878
        for _, settleFailRef := range settleFailRefs {
694✔
879
                destHeights, ok := destHeightDiffs[settleFailRef.Source]
561✔
880
                if !ok {
694✔
881
                        destHeights = make(map[uint64][]uint16)
133✔
882
                        destHeightDiffs[settleFailRef.Source] = destHeights
133✔
883
                }
133✔
884

885
                destHeights[settleFailRef.Height] = append(
561✔
886
                        destHeights[settleFailRef.Height],
561✔
887
                        settleFailRef.Index,
561✔
888
                )
561✔
889
        }
890

891
        // With the references organized by destination and height, we now load
892
        // each remote bucket, and update the settle fail filter for any
893
        // settle/fail htlcs.
894
        for dest, destHeights := range destHeightDiffs {
266✔
895
                destKey := makeLogKey(dest.ToUint64())
133✔
896
                destBkt := fwdPkgBkt.NestedReadWriteBucket(destKey[:])
133✔
897
                if destBkt == nil {
136✔
898
                        // If the destination bucket is not found, this is
3✔
899
                        // likely the result of the destination channel being
3✔
900
                        // closed and having it's forwarding packages wiped. We
3✔
901
                        // won't treat this as an error, because the response
3✔
902
                        // will no longer be retransmitted internally.
3✔
903
                        continue
3✔
904
                }
905

906
                for height, indexes := range destHeights {
687✔
907
                        err := ackSettleFailsAtHeight(destBkt, height, indexes)
557✔
908
                        if err != nil {
557✔
909
                                return err
×
910
                        }
×
911
                }
912
        }
913

914
        return nil
133✔
915
}
916

917
// ackSettleFailsAtHeight given a destination bucket, acks the provided indexes
918
// at particular a height by updating the settle fail filter.
919
func ackSettleFailsAtHeight(destBkt kvdb.RwBucket, height uint64,
920
        indexes []uint16) error {
557✔
921

557✔
922
        heightKey := makeLogKey(height)
557✔
923
        heightBkt := destBkt.NestedReadWriteBucket(heightKey[:])
557✔
924
        if heightBkt == nil {
557✔
925
                // If the height bucket isn't found, this could be because the
×
926
                // forwarding package was already removed. We'll return nil to
×
927
                // signal that the operation is as there is nothing to ack.
×
928
                return nil
×
929
        }
×
930

931
        // Load ack filter from disk.
932
        settleFailFilterBytes := heightBkt.Get(settleFailFilterKey)
557✔
933
        if settleFailFilterBytes == nil {
557✔
934
                return ErrCorruptedFwdPkg
×
935
        }
×
936

937
        settleFailFilter := &PkgFilter{}
557✔
938
        settleFailFilterReader := bytes.NewReader(settleFailFilterBytes)
557✔
939
        if err := settleFailFilter.Decode(settleFailFilterReader); err != nil {
557✔
940
                return err
×
941
        }
×
942

943
        // Update the ack filter for this height.
944
        for _, index := range indexes {
1,115✔
945
                settleFailFilter.Set(index)
558✔
946
        }
558✔
947

948
        // Write the resulting filter to disk.
949
        var settleFailFilterBuf bytes.Buffer
557✔
950
        if err := settleFailFilter.Encode(&settleFailFilterBuf); err != nil {
557✔
951
                return err
×
952
        }
×
953

954
        return heightBkt.Put(settleFailFilterKey, settleFailFilterBuf.Bytes())
557✔
955
}
956

957
// RemovePkg deletes the forwarding package at the given height from the
958
// packager's source bucket.
959
func (p *ChannelPackager) RemovePkg(tx kvdb.RwTx, height uint64) error {
888✔
960
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
888✔
961
        if fwdPkgBkt == nil {
888✔
962
                return nil
×
963
        }
×
964

965
        sourceBytes := makeLogKey(p.source.ToUint64())
888✔
966
        sourceBkt := fwdPkgBkt.NestedReadWriteBucket(sourceBytes[:])
888✔
967
        if sourceBkt == nil {
888✔
968
                return ErrCorruptedFwdPkg
×
969
        }
×
970

971
        heightKey := makeLogKey(height)
888✔
972

888✔
973
        return sourceBkt.DeleteNestedBucket(heightKey[:])
888✔
974
}
975

976
// Wipe deletes all the channel's forwarding packages, if any.
977
func (p *ChannelPackager) Wipe(tx kvdb.RwTx) error {
116✔
978
        // If the root bucket doesn't exist, there's no need to delete.
116✔
979
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
116✔
980
        if fwdPkgBkt == nil {
117✔
981
                return nil
1✔
982
        }
1✔
983

984
        sourceBytes := makeLogKey(p.source.ToUint64())
115✔
985

115✔
986
        // If the nested bucket doesn't exist, there's no need to delete.
115✔
987
        if fwdPkgBkt.NestedReadWriteBucket(sourceBytes[:]) == nil {
224✔
988
                return nil
109✔
989
        }
109✔
990

991
        return fwdPkgBkt.DeleteNestedBucket(sourceBytes[:])
6✔
992
}
993

994
// uint16Key writes the provided 16-bit unsigned integer to a 2-byte slice.
995
func uint16Key(i uint16) []byte {
2,720✔
996
        key := make([]byte, 2)
2,720✔
997
        byteOrder.PutUint16(key, i)
2,720✔
998
        return key
2,720✔
999
}
2,720✔
1000

1001
// Compile-time constraint to ensure that ChannelPackager implements the public
1002
// FwdPackager interface.
1003
var _ FwdPackager = (*ChannelPackager)(nil)
1004

1005
// Compile-time constraint to ensure that SwitchPackager implements the public
1006
// FwdOperator interface.
1007
var _ FwdOperator = (*SwitchPackager)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc