• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%
12312390362

Pull #9343

github

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context that its new main WithCtx method provides is cancelled in sync
with a parent context being cancelled or with its quit channel being
closed. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived context, the quit channel _must_ be contained internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method which will then take care to first cancel any
derived contexts that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.15
/channeldb/forwarding_package.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9

10
        "github.com/lightningnetwork/lnd/kvdb"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
)
13

14
// ErrCorruptedFwdPkg signals that the on-disk structure of the forwarding
// package has potentially been mangled.
var ErrCorruptedFwdPkg = errors.New("fwding package db has been corrupted")
17

18
// FwdState is an enum used to describe the lifecycle of a FwdPkg.
type FwdState byte

const (
	// FwdStateLockedIn is the starting state for all forwarding packages.
	// Packages in this state have not yet committed to the exact set of
	// Adds to forward to the switch.
	FwdStateLockedIn FwdState = iota

	// FwdStateProcessed marks the state in which all Adds have been
	// locally processed and the forwarding decision to the switch has been
	// persisted.
	FwdStateProcessed

	// FwdStateCompleted signals that all Adds have been acked, and that all
	// settles and fails have been delivered to their sources. Packages in
	// this state can be removed permanently.
	FwdStateCompleted
)
37

38
var (
	// fwdPackagesKey is the root-level bucket to which all forwarding
	// packages are written. This bucket is further subdivided based on the
	// short channel ID of each channel.
	//
	// Bucket hierarchy:
	//
	// fwdPackagesKey(root-bucket)
	//             |
	//             |-- <shortChannelID>
	//             |       |
	//             |       |-- <height>
	//             |       |       |-- ackFilterKey: <encoded bytes of PkgFilter>
	//             |       |       |-- settleFailFilterKey: <encoded bytes of PkgFilter>
	//             |       |       |-- fwdFilterKey: <encoded bytes of PkgFilter>
	//             |       |       |
	//             |       |       |-- addBucketKey
	//             |       |       |        |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
	//             |       |       |        |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
	//             |       |       |        ...
	//             |       |       |
	//             |       |       |-- failSettleBucketKey
	//             |       |                |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
	//             |       |                |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
	//             |       |                ...
	//             |       |
	//             |       |-- <height>
	//             |       |       |
	//             |       ...     ...
	//             |
	//             |
	//             |-- <shortChannelID>
	//             |       |
	//             |       ...
	//             ...
	//
	fwdPackagesKey = []byte("fwd-packages")

	// addBucketKey is the bucket to which all Add log updates are written.
	addBucketKey = []byte("add-updates")

	// failSettleBucketKey is the bucket to which all Settle/Fail log
	// updates are written.
	failSettleBucketKey = []byte("fail-settle-updates")

	// fwdFilterKey is a key used to write the set of Adds that passed
	// validation and are to be forwarded to the switch.
	// NOTE: The presence of this key within a forwarding package indicates
	// that the package has reached FwdStateProcessed.
	fwdFilterKey = []byte("fwd-filter-key")

	// ackFilterKey is a key used to access the PkgFilter indicating which
	// Adds have received a Settle/Fail. This response may come from a
	// number of sources, including: exitHop settle/fails, switch failures,
	// chain arbiter interjections, as well as settle/fails from the
	// next hop in the route.
	ackFilterKey = []byte("ack-filter-key")

	// settleFailFilterKey is a key used to access the PkgFilter indicating
	// which Settles/Fails have been received and processed by the link
	// that originally received the Add.
	settleFailFilterKey = []byte("settle-fail-filter-key")
)
101

102
// PkgFilter is used to compactly represent a particular subset of the Adds in a
// forwarding package. Each filter is represented as a simple, statically-sized
// bitvector, where the elements are intended to be the indices of the Adds as
// they are written in the FwdPkg.
type PkgFilter struct {
	// count is the number of elements (bits) tracked by the filter.
	count uint16

	// filter is the bitvector itself. Element i is stored in byte i/8, with
	// lower indices occupying the more significant bits of each byte.
	filter []byte
}

// pkgFilterLen returns the number of bytes needed to hold `count` bits. The
// rounding is performed in int so that adding 7 cannot wrap around for counts
// close to the uint16 maximum.
func pkgFilterLen(count uint16) int {
	return (int(count) + 7) / 8
}

// NewPkgFilter initializes an empty PkgFilter supporting `count` elements.
func NewPkgFilter(count uint16) *PkgFilter {
	return &PkgFilter{
		count:  count,
		filter: make([]byte, pkgFilterLen(count)),
	}
}

// Count returns the number of elements represented by this PkgFilter.
func (f *PkgFilter) Count() uint16 {
	return f.count
}

// Set marks the `i`-th element as included by this filter.
// NOTE: It is assumed that i is always less than count. An index beyond the
// backing bitvector causes an out-of-range panic.
func (f *PkgFilter) Set(i uint16) {
	byt := i / 8
	bit := i % 8

	// Set the i-th bit in the filter.
	// TODO(conner): ignore if > count to prevent panic?
	f.filter[byt] |= byte(1 << (7 - bit))
}

// Contains queries the filter for membership of index `i`.
// NOTE: It is assumed that i is always less than count. An index beyond the
// backing bitvector causes an out-of-range panic.
func (f *PkgFilter) Contains(i uint16) bool {
	byt := i / 8
	bit := i % 8

	// Read the i-th bit in the filter.
	// TODO(conner): ignore if > count to prevent panic?
	return f.filter[byt]&(1<<(7-bit)) != 0
}

// Equal checks two PkgFilters for equality. Two filters are equal when they
// are the same pointer (including both nil), or when both are non-nil and
// have the same count and identical bitvectors.
func (f *PkgFilter) Equal(f2 *PkgFilter) bool {
	if f == f2 {
		return true
	}

	// Exactly one side is nil at this point, so they cannot be equal.
	// Checking here avoids dereferencing a nil filter below.
	if f == nil || f2 == nil {
		return false
	}

	if f.count != f2.count {
		return false
	}

	return bytes.Equal(f.filter, f2.filter)
}

// IsFull returns true if every element in the filter has been Set, and false
// otherwise.
func (f *PkgFilter) IsFull() bool {
	// Batch validate bytes that are fully used.
	fullBytes := f.count / 8
	for i := uint16(0); i < fullBytes; i++ {
		if f.filter[i] != 0xFF {
			return false
		}
	}

	// If the count is not a multiple of 8, the remaining elements occupy
	// the most significant bits of the final byte. Check them all at once
	// with a mask, ignoring the unused padding bits.
	rem := f.count % 8
	if rem == 0 {
		return true
	}

	mask := byte(0xFF) << (8 - rem)

	return f.filter[fullBytes]&mask == mask
}

// Size returns number of bytes produced when the PkgFilter is serialized.
func (f *PkgFilter) Size() uint16 {
	// 2 bytes for uint16 `count`, then round up number of bytes required to
	// represent `count` bits. The largest possible result is 2+8192, which
	// comfortably fits in a uint16.
	return uint16(2 + pkgFilterLen(f.count))
}

// Encode writes the filter to the provided io.Writer as the big-endian count
// followed by the raw bitvector.
func (f *PkgFilter) Encode(w io.Writer) error {
	if err := binary.Write(w, binary.BigEndian, f.count); err != nil {
		return err
	}

	_, err := w.Write(f.filter)

	return err
}

// Decode reads the filter from the provided io.Reader. It expects the layout
// produced by Encode: a big-endian count followed by exactly as many bytes as
// are required to hold that many bits.
func (f *PkgFilter) Decode(r io.Reader) error {
	if err := binary.Read(r, binary.BigEndian, &f.count); err != nil {
		return err
	}

	f.filter = make([]byte, pkgFilterLen(f.count))
	_, err := io.ReadFull(r, f.filter)

	return err
}

// String returns a human-readable string.
func (f *PkgFilter) String() string {
	return fmt.Sprintf("count=%v, filter=%v", f.count, f.filter)
}
218

219
// FwdPkg records all adds, settles, and fails that were locked in as a result
220
// of the remote peer sending us a revocation. Each package is identified by
221
// the short chanid and remote commitment height corresponding to the revocation
222
// that locked in the HTLCs. For everything except a locally initiated payment,
223
// settles and fails in a forwarding package must have a corresponding Add in
224
// another package, and can be removed individually once the source link has
225
// received the fail/settle.
226
//
227
// Adds cannot be removed, as we need to present the same batch of Adds to
228
// properly handle replay protection. Instead, we use a PkgFilter to mark that
229
// we have finished processing a particular Add. A FwdPkg should only be deleted
230
// after the AckFilter is full and all settles and fails have been persistently
231
// removed.
232
type FwdPkg struct {
233
        // Source identifies the channel that wrote this forwarding package.
234
        Source lnwire.ShortChannelID
235

236
        // Height is the height of the remote commitment chain that locked in
237
        // this forwarding package.
238
        Height uint64
239

240
        // State signals the persistent condition of the package and directs how
241
        // to reprocess the package in the event of failures.
242
        State FwdState
243

244
        // Adds contains all add messages which need to be processed and
245
        // forwarded to the switch. Adds does not change over the life of a
246
        // forwarding package.
247
        Adds []LogUpdate
248

249
        // FwdFilter is a filter containing the indices of all Adds that were
250
        // forwarded to the switch.
251
        FwdFilter *PkgFilter
252

253
        // AckFilter is a filter containing the indices of all Adds for which
254
        // the source has received a settle or fail and is reflected in the next
255
        // commitment txn. A package should not be removed until IsFull()
256
        // returns true.
257
        AckFilter *PkgFilter
258

259
        // SettleFails contains all settle and fail messages that should be
260
        // forwarded to the switch.
261
        SettleFails []LogUpdate
262

263
        // SettleFailFilter is a filter containing the indices of all Settle or
264
        // Fails originating in this package that have been received and locked
265
        // into the incoming link's commitment state.
266
        SettleFailFilter *PkgFilter
267
}
268

269
// NewFwdPkg initializes a new forwarding package in FwdStateLockedIn. This
270
// should be used to create a package at the time we receive a revocation.
271
func NewFwdPkg(source lnwire.ShortChannelID, height uint64,
272
        addUpdates, settleFailUpdates []LogUpdate) *FwdPkg {
1,963✔
273

1,963✔
274
        nAddUpdates := uint16(len(addUpdates))
1,963✔
275
        nSettleFailUpdates := uint16(len(settleFailUpdates))
1,963✔
276

1,963✔
277
        return &FwdPkg{
1,963✔
278
                Source:           source,
1,963✔
279
                Height:           height,
1,963✔
280
                State:            FwdStateLockedIn,
1,963✔
281
                Adds:             addUpdates,
1,963✔
282
                FwdFilter:        NewPkgFilter(nAddUpdates),
1,963✔
283
                AckFilter:        NewPkgFilter(nAddUpdates),
1,963✔
284
                SettleFails:      settleFailUpdates,
1,963✔
285
                SettleFailFilter: NewPkgFilter(nSettleFailUpdates),
1,963✔
286
        }
1,963✔
287
}
1,963✔
288

289
// SourceRef is a convenience method that returns an AddRef to this forwarding
290
// package for the index in the argument. It is the caller's responsibility
291
// to ensure that the index is in bounds.
292
func (f *FwdPkg) SourceRef(i uint16) AddRef {
449✔
293
        return AddRef{
449✔
294
                Height: f.Height,
449✔
295
                Index:  i,
449✔
296
        }
449✔
297
}
449✔
298

299
// DestRef is a convenience method that returns a SettleFailRef to this
300
// forwarding package for the index in the argument. It is the caller's
301
// responsibility to ensure that the index is in bounds.
302
func (f *FwdPkg) DestRef(i uint16) SettleFailRef {
314✔
303
        return SettleFailRef{
314✔
304
                Source: f.Source,
314✔
305
                Height: f.Height,
314✔
306
                Index:  i,
314✔
307
        }
314✔
308
}
314✔
309

310
// ID returns an unique identifier for this package, used to ensure that sphinx
311
// replay processing of this batch is idempotent.
312
func (f *FwdPkg) ID() []byte {
1,176✔
313
        var id = make([]byte, 16)
1,176✔
314
        byteOrder.PutUint64(id[:8], f.Source.ToUint64())
1,176✔
315
        byteOrder.PutUint64(id[8:], f.Height)
1,176✔
316
        return id
1,176✔
317
}
1,176✔
318

319
// String returns a human-readable description of the forwarding package.
320
func (f *FwdPkg) String() string {
×
321
        return fmt.Sprintf("%T(src=%v, height=%v, nadds=%v, nfailsettles=%v)",
×
322
                f, f.Source, f.Height, len(f.Adds), len(f.SettleFails))
×
323
}
×
324

325
// AddRef is used to identify a particular Add in a FwdPkg. The short channel ID
// is assumed to be that of the packager.
type AddRef struct {
	// Height is the remote commitment height that locked in the Add.
	Height uint64

	// Index is the index of the Add within the fwd pkg's Adds.
	//
	// NOTE: This index is static over the lifetime of a forwarding package.
	Index uint16
}

// Encode serializes the AddRef to the given io.Writer as the big-endian
// 8-byte Height followed by the big-endian 2-byte Index.
func (a *AddRef) Encode(w io.Writer) error {
	if err := binary.Write(w, binary.BigEndian, a.Height); err != nil {
		return err
	}

	return binary.Write(w, binary.BigEndian, a.Index)
}

// Decode deserializes the AddRef from the given io.Reader, expecting the
// layout written by Encode.
func (a *AddRef) Decode(r io.Reader) error {
	if err := binary.Read(r, binary.BigEndian, &a.Height); err != nil {
		return err
	}

	return binary.Read(r, binary.BigEndian, &a.Index)
}
354

355
// SettleFailRef is used to locate a Settle/Fail in another channel's FwdPkg. A
356
// channel does not remove its own Settle/Fail htlcs, so the source is provided
357
// to locate a db bucket belonging to another channel.
358
type SettleFailRef struct {
359
        // Source identifies the outgoing link that locked in the settle or
360
        // fail. This is then used by the *incoming* link to find the settle
361
        // fail in another link's forwarding packages.
362
        Source lnwire.ShortChannelID
363

364
        // Height is the remote commitment height that locked in this
365
        // Settle/Fail.
366
        Height uint64
367

368
        // Index is the index of the Add with the fwd pkg's SettleFails.
369
        //
370
        // NOTE: This index is static over the lifetime of a forwarding package.
371
        Index uint16
372
}
373

374
// SettleFailAcker is a generic interface providing the ability to acknowledge
375
// settle/fail HTLCs stored in forwarding packages.
376
type SettleFailAcker interface {
377
        // AckSettleFails atomically updates the settle-fail filters in *other*
378
        // channels' forwarding packages.
379
        AckSettleFails(tx kvdb.RwTx, settleFailRefs ...SettleFailRef) error
380
}
381

382
// GlobalFwdPkgReader is an interface used to retrieve the forwarding packages
383
// of any active channel.
384
type GlobalFwdPkgReader interface {
385
        // LoadChannelFwdPkgs loads all known forwarding packages for the given
386
        // channel.
387
        LoadChannelFwdPkgs(tx kvdb.RTx,
388
                source lnwire.ShortChannelID) ([]*FwdPkg, error)
389
}
390

391
// FwdOperator defines the interfaces for managing forwarding packages that are
392
// external to a particular channel. This interface is used by the switch to
393
// read forwarding packages from arbitrary channels, and acknowledge settles and
394
// fails for locally-sourced payments.
395
type FwdOperator interface {
396
        // GlobalFwdPkgReader provides read access to all known forwarding
397
        // packages
398
        GlobalFwdPkgReader
399

400
        // SettleFailAcker grants the ability to acknowledge settles or fails
401
        // residing in arbitrary forwarding packages.
402
        SettleFailAcker
403
}
404

405
// SwitchPackager is a concrete implementation of the FwdOperator interface.
406
// A SwitchPackager offers the ability to read any forwarding package, and ack
407
// arbitrary settle and fail HTLCs.
408
type SwitchPackager struct{}
409

410
// NewSwitchPackager instantiates a new SwitchPackager.
411
func NewSwitchPackager() *SwitchPackager {
341✔
412
        return &SwitchPackager{}
341✔
413
}
341✔
414

415
// AckSettleFails atomically updates the settle-fail filters in *other*
416
// channels' forwarding packages, to mark that the switch has received a settle
417
// or fail residing in the forwarding package of a link.
418
func (*SwitchPackager) AckSettleFails(tx kvdb.RwTx,
419
        settleFailRefs ...SettleFailRef) error {
118✔
420

118✔
421
        return ackSettleFails(tx, settleFailRefs)
118✔
422
}
118✔
423

424
// LoadChannelFwdPkgs loads all forwarding packages for a particular channel.
425
func (*SwitchPackager) LoadChannelFwdPkgs(tx kvdb.RTx,
426
        source lnwire.ShortChannelID) ([]*FwdPkg, error) {
129✔
427

129✔
428
        return loadChannelFwdPkgs(tx, source)
129✔
429
}
129✔
430

431
// FwdPackager supports all operations required to modify fwd packages, such as
432
// creation, updates, reading, and removal. The interfaces are broken down in
433
// this way to support future delegation of the subinterfaces.
434
type FwdPackager interface {
435
        // AddFwdPkg serializes and writes a FwdPkg for this channel at the
436
        // remote commitment height included in the forwarding package.
437
        AddFwdPkg(tx kvdb.RwTx, fwdPkg *FwdPkg) error
438

439
        // SetFwdFilter looks up the forwarding package at the remote `height`
440
        // and sets the `fwdFilter`, marking the Adds for which:
441
        // 1) We are not the exit node
442
        // 2) Passed all validation
443
        // 3) Should be forwarded to the switch immediately after a failure
444
        SetFwdFilter(tx kvdb.RwTx, height uint64, fwdFilter *PkgFilter) error
445

446
        // AckAddHtlcs atomically updates the add filters in this channel's
447
        // forwarding packages to mark the resolution of an Add that was
448
        // received from the remote party.
449
        AckAddHtlcs(tx kvdb.RwTx, addRefs ...AddRef) error
450

451
        // SettleFailAcker allows a link to acknowledge settle/fail HTLCs
452
        // belonging to other channels.
453
        SettleFailAcker
454

455
        // LoadFwdPkgs loads all known forwarding packages owned by this
456
        // channel.
457
        LoadFwdPkgs(tx kvdb.RTx) ([]*FwdPkg, error)
458

459
        // RemovePkg deletes a forwarding package owned by this channel at
460
        // the provided remote `height`.
461
        RemovePkg(tx kvdb.RwTx, height uint64) error
462

463
        // Wipe deletes all the forwarding packages owned by this channel.
464
        Wipe(tx kvdb.RwTx) error
465
}
466

467
// ChannelPackager is used by a channel to manage the lifecycle of its forwarding
468
// packages. The packager is tied to a particular source channel ID, allowing it
469
// to create and edit its own packages. Each packager also has the ability to
470
// remove fail/settle htlcs that correspond to an add contained in one of
471
// source's packages.
472
type ChannelPackager struct {
473
        source lnwire.ShortChannelID
474
}
475

476
// NewChannelPackager creates a new packager for a single channel.
477
func NewChannelPackager(source lnwire.ShortChannelID) *ChannelPackager {
15,199✔
478
        return &ChannelPackager{
15,199✔
479
                source: source,
15,199✔
480
        }
15,199✔
481
}
15,199✔
482

483
// AddFwdPkg writes a newly locked in forwarding package to disk.
484
func (*ChannelPackager) AddFwdPkg(tx kvdb.RwTx, fwdPkg *FwdPkg) error { // nolint: dupl
1,962✔
485
        fwdPkgBkt, err := tx.CreateTopLevelBucket(fwdPackagesKey)
1,962✔
486
        if err != nil {
1,962✔
487
                return err
×
488
        }
×
489

490
        source := makeLogKey(fwdPkg.Source.ToUint64())
1,962✔
491
        sourceBkt, err := fwdPkgBkt.CreateBucketIfNotExists(source[:])
1,962✔
492
        if err != nil {
1,962✔
493
                return err
×
494
        }
×
495

496
        heightKey := makeLogKey(fwdPkg.Height)
1,962✔
497
        heightBkt, err := sourceBkt.CreateBucketIfNotExists(heightKey[:])
1,962✔
498
        if err != nil {
1,962✔
499
                return err
×
500
        }
×
501

502
        // Write ADD updates we received at this commit height.
503
        addBkt, err := heightBkt.CreateBucketIfNotExists(addBucketKey)
1,962✔
504
        if err != nil {
1,962✔
505
                return err
×
506
        }
×
507

508
        // Write SETTLE/FAIL updates we received at this commit height.
509
        failSettleBkt, err := heightBkt.CreateBucketIfNotExists(failSettleBucketKey)
1,962✔
510
        if err != nil {
1,962✔
511
                return err
×
512
        }
×
513

514
        for i := range fwdPkg.Adds {
2,782✔
515
                err = putLogUpdate(addBkt, uint16(i), &fwdPkg.Adds[i])
820✔
516
                if err != nil {
820✔
517
                        return err
×
518
                }
×
519
        }
520

521
        // Persist the initialized pkg filter, which will be used to determine
522
        // when we can remove this forwarding package from disk.
523
        var ackFilterBuf bytes.Buffer
1,962✔
524
        if err := fwdPkg.AckFilter.Encode(&ackFilterBuf); err != nil {
1,962✔
525
                return err
×
526
        }
×
527

528
        if err := heightBkt.Put(ackFilterKey, ackFilterBuf.Bytes()); err != nil {
1,962✔
529
                return err
×
530
        }
×
531

532
        for i := range fwdPkg.SettleFails {
2,387✔
533
                err = putLogUpdate(failSettleBkt, uint16(i), &fwdPkg.SettleFails[i])
425✔
534
                if err != nil {
425✔
535
                        return err
×
536
                }
×
537
        }
538

539
        var settleFailFilterBuf bytes.Buffer
1,962✔
540
        err = fwdPkg.SettleFailFilter.Encode(&settleFailFilterBuf)
1,962✔
541
        if err != nil {
1,962✔
542
                return err
×
543
        }
×
544

545
        return heightBkt.Put(settleFailFilterKey, settleFailFilterBuf.Bytes())
1,962✔
546
}
547

548
// putLogUpdate writes an htlc to the provided `bkt`, using `index` as the key.
549
func putLogUpdate(bkt kvdb.RwBucket, idx uint16, htlc *LogUpdate) error {
1,245✔
550
        var b bytes.Buffer
1,245✔
551
        if err := serializeLogUpdate(&b, htlc); err != nil {
1,245✔
552
                return err
×
553
        }
×
554

555
        return bkt.Put(uint16Key(idx), b.Bytes())
1,245✔
556
}
557

558
// LoadFwdPkgs scans the forwarding log for any packages that haven't been
559
// processed, and returns their deserialized log updates in a map indexed by the
560
// remote commitment height at which the updates were locked in.
561
func (p *ChannelPackager) LoadFwdPkgs(tx kvdb.RTx) ([]*FwdPkg, error) {
467✔
562
        return loadChannelFwdPkgs(tx, p.source)
467✔
563
}
467✔
564

565
// loadChannelFwdPkgs loads all forwarding packages owned by `source`.
566
func loadChannelFwdPkgs(tx kvdb.RTx, source lnwire.ShortChannelID) ([]*FwdPkg, error) {
596✔
567
        fwdPkgBkt := tx.ReadBucket(fwdPackagesKey)
596✔
568
        if fwdPkgBkt == nil {
602✔
569
                return nil, nil
6✔
570
        }
6✔
571

572
        sourceKey := makeLogKey(source.ToUint64())
590✔
573
        sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
590✔
574
        if sourceBkt == nil {
1,132✔
575
                return nil, nil
542✔
576
        }
542✔
577

578
        var heights []uint64
48✔
579
        if err := sourceBkt.ForEach(func(k, _ []byte) error {
98✔
580
                if len(k) != 8 {
50✔
581
                        return ErrCorruptedFwdPkg
×
582
                }
×
583

584
                heights = append(heights, byteOrder.Uint64(k))
50✔
585

50✔
586
                return nil
50✔
587
        }); err != nil {
×
588
                return nil, err
×
589
        }
×
590

591
        // Load the forwarding package for each retrieved height.
592
        fwdPkgs := make([]*FwdPkg, 0, len(heights))
48✔
593
        for _, height := range heights {
98✔
594
                fwdPkg, err := loadFwdPkg(fwdPkgBkt, source, height)
50✔
595
                if err != nil {
50✔
596
                        return nil, err
×
597
                }
×
598

599
                fwdPkgs = append(fwdPkgs, fwdPkg)
50✔
600
        }
601

602
        return fwdPkgs, nil
48✔
603
}
604

605
// loadFwdPkg reads the packager's fwd pkg at a given height, and determines the
606
// appropriate FwdState.
607
func loadFwdPkg(fwdPkgBkt kvdb.RBucket, source lnwire.ShortChannelID,
608
        height uint64) (*FwdPkg, error) {
50✔
609

50✔
610
        sourceKey := makeLogKey(source.ToUint64())
50✔
611
        sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
50✔
612
        if sourceBkt == nil {
50✔
613
                return nil, ErrCorruptedFwdPkg
×
614
        }
×
615

616
        heightKey := makeLogKey(height)
50✔
617
        heightBkt := sourceBkt.NestedReadBucket(heightKey[:])
50✔
618
        if heightBkt == nil {
50✔
619
                return nil, ErrCorruptedFwdPkg
×
620
        }
×
621

622
        // Load ADDs from disk.
623
        addBkt := heightBkt.NestedReadBucket(addBucketKey)
50✔
624
        if addBkt == nil {
50✔
625
                return nil, ErrCorruptedFwdPkg
×
626
        }
×
627

628
        adds, err := loadHtlcs(addBkt)
50✔
629
        if err != nil {
50✔
630
                return nil, err
×
631
        }
×
632

633
        // Load ack filter from disk.
634
        ackFilterBytes := heightBkt.Get(ackFilterKey)
50✔
635
        if ackFilterBytes == nil {
50✔
636
                return nil, ErrCorruptedFwdPkg
×
637
        }
×
638
        ackFilterReader := bytes.NewReader(ackFilterBytes)
50✔
639

50✔
640
        ackFilter := &PkgFilter{}
50✔
641
        if err := ackFilter.Decode(ackFilterReader); err != nil {
50✔
642
                return nil, err
×
643
        }
×
644

645
        // Load SETTLE/FAILs from disk.
646
        failSettleBkt := heightBkt.NestedReadBucket(failSettleBucketKey)
50✔
647
        if failSettleBkt == nil {
50✔
648
                return nil, ErrCorruptedFwdPkg
×
649
        }
×
650

651
        failSettles, err := loadHtlcs(failSettleBkt)
50✔
652
        if err != nil {
50✔
653
                return nil, err
×
654
        }
×
655

656
        // Load settle fail filter from disk.
657
        settleFailFilterBytes := heightBkt.Get(settleFailFilterKey)
50✔
658
        if settleFailFilterBytes == nil {
50✔
659
                return nil, ErrCorruptedFwdPkg
×
660
        }
×
661
        settleFailFilterReader := bytes.NewReader(settleFailFilterBytes)
50✔
662

50✔
663
        settleFailFilter := &PkgFilter{}
50✔
664
        if err := settleFailFilter.Decode(settleFailFilterReader); err != nil {
50✔
665
                return nil, err
×
666
        }
×
667

668
        // Initialize the fwding package, which always starts in the
669
        // FwdStateLockedIn. We can determine what state the package was left in
670
        // by examining constraints on the information loaded from disk.
671
        fwdPkg := &FwdPkg{
50✔
672
                Source:           source,
50✔
673
                State:            FwdStateLockedIn,
50✔
674
                Height:           height,
50✔
675
                Adds:             adds,
50✔
676
                AckFilter:        ackFilter,
50✔
677
                SettleFails:      failSettles,
50✔
678
                SettleFailFilter: settleFailFilter,
50✔
679
        }
50✔
680

50✔
681
        // Check to see if we have written the set exported filter adds to
50✔
682
        // disk. If we haven't, processing of this package was never started, or
50✔
683
        // failed during the last attempt.
50✔
684
        fwdFilterBytes := heightBkt.Get(fwdFilterKey)
50✔
685
        if fwdFilterBytes == nil {
63✔
686
                nAdds := uint16(len(adds))
13✔
687
                fwdPkg.FwdFilter = NewPkgFilter(nAdds)
13✔
688
                return fwdPkg, nil
13✔
689
        }
13✔
690

691
        fwdFilterReader := bytes.NewReader(fwdFilterBytes)
37✔
692
        fwdPkg.FwdFilter = &PkgFilter{}
37✔
693
        if err := fwdPkg.FwdFilter.Decode(fwdFilterReader); err != nil {
37✔
694
                return nil, err
×
695
        }
×
696

697
        // Otherwise, a complete round of processing was completed, and we
698
        // advance the package to FwdStateProcessed.
699
        fwdPkg.State = FwdStateProcessed
37✔
700

37✔
701
        // If every add, settle, and fail has been fully acknowledged, we can
37✔
702
        // safely set the package's state to FwdStateCompleted, signalling that
37✔
703
        // it can be garbage collected.
37✔
704
        if fwdPkg.AckFilter.IsFull() && fwdPkg.SettleFailFilter.IsFull() {
52✔
705
                fwdPkg.State = FwdStateCompleted
15✔
706
        }
15✔
707

708
        return fwdPkg, nil
37✔
709
}
710

711
// loadHtlcs retrieves all serialized htlcs in a bucket, returning
712
// them in order of the indexes they were written under.
713
func loadHtlcs(bkt kvdb.RBucket) ([]LogUpdate, error) {
100✔
714
        var htlcs []LogUpdate
100✔
715
        if err := bkt.ForEach(func(_, v []byte) error {
183✔
716
                htlc, err := deserializeLogUpdate(bytes.NewReader(v))
83✔
717
                if err != nil {
83✔
718
                        return err
×
719
                }
×
720

721
                htlcs = append(htlcs, *htlc)
83✔
722

83✔
723
                return nil
83✔
724
        }); err != nil {
×
725
                return nil, err
×
726
        }
×
727

728
        return htlcs, nil
100✔
729
}
730

731
// SetFwdFilter writes the set of indexes corresponding to Adds at the
732
// `height` that are to be forwarded to the switch. Calling this method causes
733
// the forwarding package at `height` to be in FwdStateProcessed. We write this
734
// forwarding decision so that we always arrive at the same behavior for HTLCs
735
// leaving this channel. After a restart, we skip validation of these Adds,
736
// since they are assumed to have already been validated, and make the switch or
737
// outgoing link responsible for handling replays.
738
func (p *ChannelPackager) SetFwdFilter(tx kvdb.RwTx, height uint64,
739
        fwdFilter *PkgFilter) error {
1,178✔
740

1,178✔
741
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
1,178✔
742
        if fwdPkgBkt == nil {
1,178✔
743
                return ErrCorruptedFwdPkg
×
744
        }
×
745

746
        source := makeLogKey(p.source.ToUint64())
1,178✔
747
        sourceBkt := fwdPkgBkt.NestedReadWriteBucket(source[:])
1,178✔
748
        if sourceBkt == nil {
1,178✔
749
                return ErrCorruptedFwdPkg
×
750
        }
×
751

752
        heightKey := makeLogKey(height)
1,178✔
753
        heightBkt := sourceBkt.NestedReadWriteBucket(heightKey[:])
1,178✔
754
        if heightBkt == nil {
1,178✔
755
                return ErrCorruptedFwdPkg
×
756
        }
×
757

758
        // If the fwd filter has already been written, we return early to avoid
759
        // modifying the persistent state.
760
        forwardedAddsBytes := heightBkt.Get(fwdFilterKey)
1,178✔
761
        if forwardedAddsBytes != nil {
1,178✔
762
                return nil
×
763
        }
×
764

765
        // Otherwise we serialize and write the provided fwd filter.
766
        var b bytes.Buffer
1,178✔
767
        if err := fwdFilter.Encode(&b); err != nil {
1,178✔
768
                return err
×
769
        }
×
770

771
        return heightBkt.Put(fwdFilterKey, b.Bytes())
1,178✔
772
}
773

774
// AckAddHtlcs accepts a list of references to add htlcs, and updates the
775
// AckAddFilter of those forwarding packages to indicate that a settle or fail
776
// has been received in response to the add.
777
func (p *ChannelPackager) AckAddHtlcs(tx kvdb.RwTx, addRefs ...AddRef) error {
2,050✔
778
        if len(addRefs) == 0 {
3,760✔
779
                return nil
1,710✔
780
        }
1,710✔
781

782
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
340✔
783
        if fwdPkgBkt == nil {
340✔
784
                return ErrCorruptedFwdPkg
×
785
        }
×
786

787
        sourceKey := makeLogKey(p.source.ToUint64())
340✔
788
        sourceBkt := fwdPkgBkt.NestedReadWriteBucket(sourceKey[:])
340✔
789
        if sourceBkt == nil {
340✔
790
                return ErrCorruptedFwdPkg
×
791
        }
×
792

793
        // Organize the forward references such that we just get a single slice
794
        // of indexes for each unique height.
795
        heightDiffs := make(map[uint64][]uint16)
340✔
796
        for _, addRef := range addRefs {
688✔
797
                heightDiffs[addRef.Height] = append(
348✔
798
                        heightDiffs[addRef.Height],
348✔
799
                        addRef.Index,
348✔
800
                )
348✔
801
        }
348✔
802

803
        // Load each height bucket once and remove all acked htlcs at that
804
        // height.
805
        for height, indexes := range heightDiffs {
680✔
806
                err := ackAddHtlcsAtHeight(sourceBkt, height, indexes)
340✔
807
                if err != nil {
340✔
808
                        return err
×
809
                }
×
810
        }
811

812
        return nil
340✔
813
}
814

815
// ackAddHtlcsAtHeight updates the AddAckFilter of a single forwarding package
816
// with a list of indexes, writing the resulting filter back in its place.
817
func ackAddHtlcsAtHeight(sourceBkt kvdb.RwBucket, height uint64,
818
        indexes []uint16) error {
340✔
819

340✔
820
        heightKey := makeLogKey(height)
340✔
821
        heightBkt := sourceBkt.NestedReadWriteBucket(heightKey[:])
340✔
822
        if heightBkt == nil {
340✔
823
                // If the height bucket isn't found, this could be because the
×
824
                // forwarding package was already removed. We'll return nil to
×
825
                // signal that the operation is successful, as there is nothing
×
826
                // to ack.
×
827
                return nil
×
828
        }
×
829

830
        // Load ack filter from disk.
831
        ackFilterBytes := heightBkt.Get(ackFilterKey)
340✔
832
        if ackFilterBytes == nil {
340✔
833
                return ErrCorruptedFwdPkg
×
834
        }
×
835

836
        ackFilter := &PkgFilter{}
340✔
837
        ackFilterReader := bytes.NewReader(ackFilterBytes)
340✔
838
        if err := ackFilter.Decode(ackFilterReader); err != nil {
340✔
839
                return err
×
840
        }
×
841

842
        // Update the ack filter for this height.
843
        for _, index := range indexes {
688✔
844
                ackFilter.Set(index)
348✔
845
        }
348✔
846

847
        // Write the resulting filter to disk.
848
        var ackFilterBuf bytes.Buffer
340✔
849
        if err := ackFilter.Encode(&ackFilterBuf); err != nil {
340✔
850
                return err
×
851
        }
×
852

853
        return heightBkt.Put(ackFilterKey, ackFilterBuf.Bytes())
340✔
854
}
855

856
// AckSettleFails persistently acknowledges settles or fails from a remote forwarding
857
// package. This should only be called after the source of the Add has locked in
858
// the settle/fail, or it becomes otherwise safe to forgo retransmitting the
859
// settle/fail after a restart.
860
func (p *ChannelPackager) AckSettleFails(tx kvdb.RwTx, settleFailRefs ...SettleFailRef) error {
2,049✔
861
        return ackSettleFails(tx, settleFailRefs)
2,049✔
862
}
2,049✔
863

864
// ackSettleFails persistently acknowledges a batch of settle fail references.
865
func ackSettleFails(tx kvdb.RwTx, settleFailRefs []SettleFailRef) error {
2,167✔
866
        if len(settleFailRefs) == 0 {
4,207✔
867
                return nil
2,040✔
868
        }
2,040✔
869

870
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
127✔
871
        if fwdPkgBkt == nil {
127✔
872
                return ErrCorruptedFwdPkg
×
873
        }
×
874

875
        // Organize the forward references such that we just get a single slice
876
        // of indexes for each unique destination-height pair.
877
        destHeightDiffs := make(map[lnwire.ShortChannelID]map[uint64][]uint16)
127✔
878
        for _, settleFailRef := range settleFailRefs {
254✔
879
                destHeights, ok := destHeightDiffs[settleFailRef.Source]
127✔
880
                if !ok {
254✔
881
                        destHeights = make(map[uint64][]uint16)
127✔
882
                        destHeightDiffs[settleFailRef.Source] = destHeights
127✔
883
                }
127✔
884

885
                destHeights[settleFailRef.Height] = append(
127✔
886
                        destHeights[settleFailRef.Height],
127✔
887
                        settleFailRef.Index,
127✔
888
                )
127✔
889
        }
890

891
        // With the references organized by destination and height, we now load
892
        // each remote bucket, and update the settle fail filter for any
893
        // settle/fail htlcs.
894
        for dest, destHeights := range destHeightDiffs {
254✔
895
                destKey := makeLogKey(dest.ToUint64())
127✔
896
                destBkt := fwdPkgBkt.NestedReadWriteBucket(destKey[:])
127✔
897
                if destBkt == nil {
130✔
898
                        // If the destination bucket is not found, this is
3✔
899
                        // likely the result of the destination channel being
3✔
900
                        // closed and having it's forwarding packages wiped. We
3✔
901
                        // won't treat this as an error, because the response
3✔
902
                        // will no longer be retransmitted internally.
3✔
903
                        continue
3✔
904
                }
905

906
                for height, indexes := range destHeights {
248✔
907
                        err := ackSettleFailsAtHeight(destBkt, height, indexes)
124✔
908
                        if err != nil {
124✔
909
                                return err
×
910
                        }
×
911
                }
912
        }
913

914
        return nil
127✔
915
}
916

917
// ackSettleFailsAtHeight given a destination bucket, acks the provided indexes
918
// at particular a height by updating the settle fail filter.
919
func ackSettleFailsAtHeight(destBkt kvdb.RwBucket, height uint64,
920
        indexes []uint16) error {
124✔
921

124✔
922
        heightKey := makeLogKey(height)
124✔
923
        heightBkt := destBkt.NestedReadWriteBucket(heightKey[:])
124✔
924
        if heightBkt == nil {
124✔
925
                // If the height bucket isn't found, this could be because the
×
926
                // forwarding package was already removed. We'll return nil to
×
927
                // signal that the operation is as there is nothing to ack.
×
928
                return nil
×
929
        }
×
930

931
        // Load ack filter from disk.
932
        settleFailFilterBytes := heightBkt.Get(settleFailFilterKey)
124✔
933
        if settleFailFilterBytes == nil {
124✔
934
                return ErrCorruptedFwdPkg
×
935
        }
×
936

937
        settleFailFilter := &PkgFilter{}
124✔
938
        settleFailFilterReader := bytes.NewReader(settleFailFilterBytes)
124✔
939
        if err := settleFailFilter.Decode(settleFailFilterReader); err != nil {
124✔
940
                return err
×
941
        }
×
942

943
        // Update the ack filter for this height.
944
        for _, index := range indexes {
248✔
945
                settleFailFilter.Set(index)
124✔
946
        }
124✔
947

948
        // Write the resulting filter to disk.
949
        var settleFailFilterBuf bytes.Buffer
124✔
950
        if err := settleFailFilter.Encode(&settleFailFilterBuf); err != nil {
124✔
951
                return err
×
952
        }
×
953

954
        return heightBkt.Put(settleFailFilterKey, settleFailFilterBuf.Bytes())
124✔
955
}
956

957
// RemovePkg deletes the forwarding package at the given height from the
958
// packager's source bucket.
959
func (p *ChannelPackager) RemovePkg(tx kvdb.RwTx, height uint64) error {
6✔
960
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
6✔
961
        if fwdPkgBkt == nil {
6✔
962
                return nil
×
963
        }
×
964

965
        sourceBytes := makeLogKey(p.source.ToUint64())
6✔
966
        sourceBkt := fwdPkgBkt.NestedReadWriteBucket(sourceBytes[:])
6✔
967
        if sourceBkt == nil {
6✔
968
                return ErrCorruptedFwdPkg
×
969
        }
×
970

971
        heightKey := makeLogKey(height)
6✔
972

6✔
973
        return sourceBkt.DeleteNestedBucket(heightKey[:])
6✔
974
}
975

976
// Wipe deletes all the channel's forwarding packages, if any.
977
func (p *ChannelPackager) Wipe(tx kvdb.RwTx) error {
116✔
978
        // If the root bucket doesn't exist, there's no need to delete.
116✔
979
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
116✔
980
        if fwdPkgBkt == nil {
117✔
981
                return nil
1✔
982
        }
1✔
983

984
        sourceBytes := makeLogKey(p.source.ToUint64())
115✔
985

115✔
986
        // If the nested bucket doesn't exist, there's no need to delete.
115✔
987
        if fwdPkgBkt.NestedReadWriteBucket(sourceBytes[:]) == nil {
224✔
988
                return nil
109✔
989
        }
109✔
990

991
        return fwdPkgBkt.DeleteNestedBucket(sourceBytes[:])
6✔
992
}
993

994
// uint16Key writes the provided 16-bit unsigned integer to a 2-byte slice.
995
func uint16Key(i uint16) []byte {
1,245✔
996
        key := make([]byte, 2)
1,245✔
997
        byteOrder.PutUint16(key, i)
1,245✔
998
        return key
1,245✔
999
}
1,245✔
1000

1001
// Compile-time constraint to ensure that ChannelPackager implements the public
1002
// FwdPackager interface. The value is a typed nil pointer assigned to the
// blank identifier, so the declaration exists solely for this type check.
1003
var _ FwdPackager = (*ChannelPackager)(nil)
1004

1005
// Compile-time constraint to ensure that SwitchPackager implements the public
1006
// FwdOperator interface. The value is a typed nil pointer assigned to the
// blank identifier, so the declaration exists solely for this type check.
1007
var _ FwdOperator = (*SwitchPackager)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc