• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9111774206

pending completion
9111774206

Pull #8765

github

hieblmi
routing: log edge when skipping it
Pull Request #8765: routing: log edge when skipping it

1 of 1 new or added line in 1 file covered. (100.0%)

104 existing lines in 27 files now uncovered.

122984 of 210570 relevant lines covered (58.41%)

28065.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.95
/channeldb/forwarding_package.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9

10
        "github.com/lightningnetwork/lnd/kvdb"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
)
13

14
// ErrCorruptedFwdPkg signals that the on-disk structure of the forwarding
15
// package has potentially been mangled.
16
var ErrCorruptedFwdPkg = errors.New("fwding package db has been corrupted")
17

18
// FwdState is an enum used to describe the lifecycle of a FwdPkg.
19
type FwdState byte
20

21
const (
22
        // FwdStateLockedIn is the starting state for all forwarding packages.
23
        // Packages in this state have not yet committed to the exact set of
24
        // Adds to forward to the switch.
25
        FwdStateLockedIn FwdState = iota
26

27
        // FwdStateProcessed marks the state in which all Adds have been
28
        // locally processed and the forwarding decision to the switch has been
29
        // persisted.
30
        FwdStateProcessed
31

32
        // FwdStateCompleted signals that all Adds have been acked, and that all
33
        // settles and fails have been delivered to their sources. Packages in
34
        // this state can be removed permanently.
35
        FwdStateCompleted
36
)
37

38
var (
39
        // fwdPackagesKey is the root-level bucket that all forwarding packages
40
        // are written. This bucket is further subdivided based on the short
41
        // channel ID of each channel.
42
        //
43
        // Bucket hierarchy:
44
        //
45
        // fwdPackagesKey(root-bucket)
46
        //             |
47
        //             |-- <shortChannelID>
48
        //             |       |
49
        //             |       |-- <height>
50
        //             |       |       |-- ackFilterKey: <encoded bytes of PkgFilter>
51
        //             |       |       |-- settleFailFilterKey: <encoded bytes of PkgFilter>
52
        //             |       |       |-- fwdFilterKey: <encoded bytes of PkgFilter>
53
        //             |       |       |
54
        //             |       |       |-- addBucketKey
55
        //             |       |       |        |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
56
        //             |       |       |        |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
57
        //             |       |       |        ...
58
        //             |       |       |
59
        //             |       |       |-- failSettleBucketKey
60
        //             |       |                |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
61
        //             |       |                |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
62
        //             |       |                ...
63
        //             |       |
64
        //             |       |-- <height>
65
        //             |       |       |
66
        //             |       ...     ...
67
        //             |
68
        //             |
69
        //             |-- <shortChannelID>
70
        //             |       |
71
        //             |       ...
72
        //             ...
73
        //
74
        fwdPackagesKey = []byte("fwd-packages")
75

76
        // addBucketKey is the bucket to which all Add log updates are written.
77
        addBucketKey = []byte("add-updates")
78

79
        // failSettleBucketKey is the bucket to which all Settle/Fail log
80
        // updates are written.
81
        failSettleBucketKey = []byte("fail-settle-updates")
82

83
        // fwdFilterKey is a key used to write the set of Adds that passed
84
        // validation and are to be forwarded to the switch.
85
        // NOTE: The presence of this key within a forwarding package indicates
86
        // that the package has reached FwdStateProcessed.
87
        fwdFilterKey = []byte("fwd-filter-key")
88

89
        // ackFilterKey is a key used to access the PkgFilter indicating which
90
        // Adds have received a Settle/Fail. This response may come from a
91
        // number of sources, including: exitHop settle/fails, switch failures,
92
        // chain arbiter interjections, as well as settle/fails from the
93
        // next hop in the route.
94
        ackFilterKey = []byte("ack-filter-key")
95

96
        // settleFailFilterKey is a key used to access the PkgFilter indicating
97
        // which Settles/Fails have been received and processed by the link
98
        // that originally received the Add.
99
        settleFailFilterKey = []byte("settle-fail-filter-key")
100
)
101

102
// PkgFilter is used to compactly represent a particular subset of the Adds in a
// forwarding package. Each filter is represented as a simple, statically-sized
// bitvector, where the elements are intended to be the indices of the Adds as
// they are written in the FwdPkg.
type PkgFilter struct {
	// count is the number of elements (bits) tracked by the filter.
	count uint16

	// filter is the bitvector itself. Element i is stored in byte i/8,
	// with the lowest index of each byte in the most significant bit.
	filter []byte
}

// pkgFilterLen returns the number of bytes needed to hold `count` bits, rounded
// up to a whole byte. The computation is done in int: performing `count + 7` in
// uint16 wraps around for counts above 65528 and would yield a length of zero.
func pkgFilterLen(count uint16) int {
	return (int(count) + 7) / 8
}

// NewPkgFilter initializes an empty PkgFilter supporting `count` elements.
func NewPkgFilter(count uint16) *PkgFilter {
	return &PkgFilter{
		count:  count,
		filter: make([]byte, pkgFilterLen(count)),
	}
}

// Count returns the number of elements represented by this PkgFilter.
func (f *PkgFilter) Count() uint16 {
	return f.count
}

// Set marks the `i`-th element as included by this filter.
// NOTE: It is assumed that i is always less than count.
func (f *PkgFilter) Set(i uint16) {
	byt := i / 8
	bit := i % 8

	// Set the i-th bit in the filter.
	// TODO(conner): ignore if > count to prevent panic?
	f.filter[byt] |= byte(1 << (7 - bit))
}

// Contains queries the filter for membership of index `i`.
// NOTE: It is assumed that i is always less than count.
func (f *PkgFilter) Contains(i uint16) bool {
	byt := i / 8
	bit := i % 8

	// Read the i-th bit in the filter.
	// TODO(conner): ignore if > count to prevent panic?
	return f.filter[byt]&(1<<(7-bit)) != 0
}

// Equal checks two PkgFilters for equality. Two filters are equal when they
// are the same pointer, or when both are non-nil and have the same count and
// identical filter bytes. A nil filter is only equal to another nil filter.
func (f *PkgFilter) Equal(f2 *PkgFilter) bool {
	if f == f2 {
		return true
	}

	// Exactly one side is nil at this point; avoid dereferencing it.
	if f == nil || f2 == nil {
		return false
	}

	if f.count != f2.count {
		return false
	}

	return bytes.Equal(f.filter, f2.filter)
}

// IsFull returns true if every element in the filter has been Set, and false
// otherwise.
func (f *PkgFilter) IsFull() bool {
	// Batch validate bytes that are fully used.
	for i := uint16(0); i < f.count/8; i++ {
		if f.filter[i] != 0xFF {
			return false
		}
	}

	// If the count is not a multiple of 8, check that the filter contains
	// all remaining bits.
	rem := f.count % 8
	for idx := f.count - rem; idx < f.count; idx++ {
		if !f.Contains(idx) {
			return false
		}
	}

	return true
}

// Size returns number of bytes produced when the PkgFilter is serialized.
func (f *PkgFilter) Size() uint16 {
	// 2 bytes for uint16 `count`, then round up number of bytes required to
	// represent `count` bits. The largest possible result is 2 + 8192, so
	// the conversion back to uint16 cannot truncate.
	return uint16(2 + pkgFilterLen(f.count))
}

// Encode writes the filter to the provided io.Writer: the count as a
// big-endian uint16, followed by the raw filter bytes.
func (f *PkgFilter) Encode(w io.Writer) error {
	if err := binary.Write(w, binary.BigEndian, f.count); err != nil {
		return err
	}

	_, err := w.Write(f.filter)

	return err
}

// Decode reads the filter from the provided io.Reader, expecting the layout
// produced by Encode. The receiver is only modified if the whole filter was
// read successfully.
func (f *PkgFilter) Decode(r io.Reader) error {
	var count uint16
	if err := binary.Read(r, binary.BigEndian, &count); err != nil {
		return err
	}

	filter := make([]byte, pkgFilterLen(count))
	if _, err := io.ReadFull(r, filter); err != nil {
		return err
	}

	f.count = count
	f.filter = filter

	return nil
}
213

214
// FwdPkg records all adds, settles, and fails that were locked in as a result
215
// of the remote peer sending us a revocation. Each package is identified by
216
// the short chanid and remote commitment height corresponding to the revocation
217
// that locked in the HTLCs. For everything except a locally initiated payment,
218
// settles and fails in a forwarding package must have a corresponding Add in
219
// another package, and can be removed individually once the source link has
220
// received the fail/settle.
221
//
222
// Adds cannot be removed, as we need to present the same batch of Adds to
223
// properly handle replay protection. Instead, we use a PkgFilter to mark that
224
// we have finished processing a particular Add. A FwdPkg should only be deleted
225
// after the AckFilter is full and all settles and fails have been persistently
226
// removed.
227
type FwdPkg struct {
228
        // Source identifies the channel that wrote this forwarding package.
229
        Source lnwire.ShortChannelID
230

231
        // Height is the height of the remote commitment chain that locked in
232
        // this forwarding package.
233
        Height uint64
234

235
        // State signals the persistent condition of the package and directs how
236
        // to reprocess the package in the event of failures.
237
        State FwdState
238

239
        // Adds contains all add messages which need to be processed and
240
        // forwarded to the switch. Adds does not change over the life of a
241
        // forwarding package.
242
        Adds []LogUpdate
243

244
        // FwdFilter is a filter containing the indices of all Adds that were
245
        // forwarded to the switch.
246
        FwdFilter *PkgFilter
247

248
        // AckFilter is a filter containing the indices of all Adds for which
249
        // the source has received a settle or fail and is reflected in the next
250
        // commitment txn. A package should not be removed until IsFull()
251
        // returns true.
252
        AckFilter *PkgFilter
253

254
        // SettleFails contains all settle and fail messages that should be
255
        // forwarded to the switch.
256
        SettleFails []LogUpdate
257

258
        // SettleFailFilter is a filter containing the indices of all Settle or
259
        // Fails originating in this package that have been received and locked
260
        // into the incoming link's commitment state.
261
        SettleFailFilter *PkgFilter
262
}
263

264
// NewFwdPkg initializes a new forwarding package in FwdStateLockedIn. This
265
// should be used to create a package at the time we receive a revocation.
266
func NewFwdPkg(source lnwire.ShortChannelID, height uint64,
267
        addUpdates, settleFailUpdates []LogUpdate) *FwdPkg {
2,905✔
268

2,905✔
269
        nAddUpdates := uint16(len(addUpdates))
2,905✔
270
        nSettleFailUpdates := uint16(len(settleFailUpdates))
2,905✔
271

2,905✔
272
        return &FwdPkg{
2,905✔
273
                Source:           source,
2,905✔
274
                Height:           height,
2,905✔
275
                State:            FwdStateLockedIn,
2,905✔
276
                Adds:             addUpdates,
2,905✔
277
                FwdFilter:        NewPkgFilter(nAddUpdates),
2,905✔
278
                AckFilter:        NewPkgFilter(nAddUpdates),
2,905✔
279
                SettleFails:      settleFailUpdates,
2,905✔
280
                SettleFailFilter: NewPkgFilter(nSettleFailUpdates),
2,905✔
281
        }
2,905✔
282
}
2,905✔
283

284
// ID returns an unique identifier for this package, used to ensure that sphinx
285
// replay processing of this batch is idempotent.
286
func (f *FwdPkg) ID() []byte {
2,174✔
287
        var id = make([]byte, 16)
2,174✔
288
        byteOrder.PutUint64(id[:8], f.Source.ToUint64())
2,174✔
289
        byteOrder.PutUint64(id[8:], f.Height)
2,174✔
290
        return id
2,174✔
291
}
2,174✔
292

293
// String returns a human-readable description of the forwarding package.
294
func (f *FwdPkg) String() string {
×
295
        return fmt.Sprintf("%T(src=%v, height=%v, nadds=%v, nfailsettles=%v)",
×
296
                f, f.Source, f.Height, len(f.Adds), len(f.SettleFails))
×
297
}
×
298

299
// AddRef is used to identify a particular Add in a FwdPkg. The short channel ID
// is assumed to be that of the packager.
type AddRef struct {
	// Height is the remote commitment height that locked in the Add.
	Height uint64

	// Index is the index of the Add within the fwd pkg's Adds.
	//
	// NOTE: This index is static over the lifetime of a forwarding package.
	Index uint16
}

// Encode serializes the AddRef to the given io.Writer. The Height is written
// first, followed by the Index, both in big-endian byte order. The Index is
// not written if writing the Height fails.
func (a *AddRef) Encode(w io.Writer) error {
	err := binary.Write(w, binary.BigEndian, a.Height)
	if err == nil {
		err = binary.Write(w, binary.BigEndian, a.Index)
	}

	return err
}

// Decode deserializes the AddRef from the given io.Reader, reading the Height
// and then the Index in big-endian byte order. The Index is not read if
// reading the Height fails.
func (a *AddRef) Decode(r io.Reader) error {
	err := binary.Read(r, binary.BigEndian, &a.Height)
	if err == nil {
		err = binary.Read(r, binary.BigEndian, &a.Index)
	}

	return err
}
328

329
// SettleFailRef is used to locate a Settle/Fail in another channel's FwdPkg. A
330
// channel does not remove its own Settle/Fail htlcs, so the source is provided
331
// to locate a db bucket belonging to another channel.
332
type SettleFailRef struct {
333
        // Source identifies the outgoing link that locked in the settle or
334
        // fail. This is then used by the *incoming* link to find the settle
335
        // fail in another link's forwarding packages.
336
        Source lnwire.ShortChannelID
337

338
        // Height is the remote commitment height that locked in this
339
        // Settle/Fail.
340
        Height uint64
341

342
        // Index is the index of the Settle/Fail within the fwd pkg's SettleFails.
343
        //
344
        // NOTE: This index is static over the lifetime of a forwarding package.
345
        Index uint16
346
}
347

348
// SettleFailAcker is a generic interface providing the ability to acknowledge
349
// settle/fail HTLCs stored in forwarding packages.
350
type SettleFailAcker interface {
351
        // AckSettleFails atomically updates the settle-fail filters in *other*
352
        // channels' forwarding packages.
353
        AckSettleFails(tx kvdb.RwTx, settleFailRefs ...SettleFailRef) error
354
}
355

356
// GlobalFwdPkgReader is an interface used to retrieve the forwarding packages
357
// of any active channel.
358
type GlobalFwdPkgReader interface {
359
        // LoadChannelFwdPkgs loads all known forwarding packages for the given
360
        // channel.
361
        LoadChannelFwdPkgs(tx kvdb.RTx,
362
                source lnwire.ShortChannelID) ([]*FwdPkg, error)
363
}
364

365
// FwdOperator defines the interfaces for managing forwarding packages that are
366
// external to a particular channel. This interface is used by the switch to
367
// read forwarding packages from arbitrary channels, and acknowledge settles and
368
// fails for locally-sourced payments.
369
type FwdOperator interface {
370
        // GlobalFwdPkgReader provides read access to all known forwarding
371
        // packages
372
        GlobalFwdPkgReader
373

374
        // SettleFailAcker grants the ability to acknowledge settles or fails
375
        // residing in arbitrary forwarding packages.
376
        SettleFailAcker
377
}
378

379
// SwitchPackager is a concrete implementation of the FwdOperator interface.
380
// A SwitchPackager offers the ability to read any forwarding package, and ack
381
// arbitrary settle and fail HTLCs.
382
type SwitchPackager struct{}
383

384
// NewSwitchPackager instantiates a new SwitchPackager.
385
func NewSwitchPackager() *SwitchPackager {
343✔
386
        return &SwitchPackager{}
343✔
387
}
343✔
388

389
// AckSettleFails atomically updates the settle-fail filters in *other*
390
// channels' forwarding packages, to mark that the switch has received a settle
391
// or fail residing in the forwarding package of a link.
392
func (*SwitchPackager) AckSettleFails(tx kvdb.RwTx,
393
        settleFailRefs ...SettleFailRef) error {
128✔
394

128✔
395
        return ackSettleFails(tx, settleFailRefs)
128✔
396
}
128✔
397

398
// LoadChannelFwdPkgs loads all forwarding packages for a particular channel.
399
func (*SwitchPackager) LoadChannelFwdPkgs(tx kvdb.RTx,
400
        source lnwire.ShortChannelID) ([]*FwdPkg, error) {
131✔
401

131✔
402
        return loadChannelFwdPkgs(tx, source)
131✔
403
}
131✔
404

405
// FwdPackager supports all operations required to modify fwd packages, such as
406
// creation, updates, reading, and removal. The interfaces are broken down in
407
// this way to support future delegation of the subinterfaces.
408
type FwdPackager interface {
409
        // AddFwdPkg serializes and writes a FwdPkg for this channel at the
410
        // remote commitment height included in the forwarding package.
411
        AddFwdPkg(tx kvdb.RwTx, fwdPkg *FwdPkg) error
412

413
        // SetFwdFilter looks up the forwarding package at the remote `height`
414
        // and sets the `fwdFilter`, marking the Adds for which:
415
        // 1) We are not the exit node
416
        // 2) Passed all validation
417
        // 3) Should be forwarded to the switch immediately after a failure
418
        SetFwdFilter(tx kvdb.RwTx, height uint64, fwdFilter *PkgFilter) error
419

420
        // AckAddHtlcs atomically updates the add filters in this channel's
421
        // forwarding packages to mark the resolution of an Add that was
422
        // received from the remote party.
423
        AckAddHtlcs(tx kvdb.RwTx, addRefs ...AddRef) error
424

425
        // SettleFailAcker allows a link to acknowledge settle/fail HTLCs
426
        // belonging to other channels.
427
        SettleFailAcker
428

429
        // LoadFwdPkgs loads all known forwarding packages owned by this
430
        // channel.
431
        LoadFwdPkgs(tx kvdb.RTx) ([]*FwdPkg, error)
432

433
        // RemovePkg deletes a forwarding package owned by this channel at
434
        // the provided remote `height`.
435
        RemovePkg(tx kvdb.RwTx, height uint64) error
436

437
        // Wipe deletes all the forwarding packages owned by this channel.
438
        Wipe(tx kvdb.RwTx) error
439
}
440

441
// ChannelPackager is used by a channel to manage the lifecycle of its forwarding
442
// packages. The packager is tied to a particular source channel ID, allowing it
443
// to create and edit its own packages. Each packager also has the ability to
444
// remove fail/settle htlcs that correspond to an add contained in one of
445
// source's packages.
446
type ChannelPackager struct {
447
        source lnwire.ShortChannelID
448
}
449

450
// NewChannelPackager creates a new packager for a single channel.
451
func NewChannelPackager(source lnwire.ShortChannelID) *ChannelPackager {
20,789✔
452
        return &ChannelPackager{
20,789✔
453
                source: source,
20,789✔
454
        }
20,789✔
455
}
20,789✔
456

457
// AddFwdPkg writes a newly locked in forwarding package to disk.
458
func (*ChannelPackager) AddFwdPkg(tx kvdb.RwTx, fwdPkg *FwdPkg) error { // nolint: dupl
2,904✔
459
        fwdPkgBkt, err := tx.CreateTopLevelBucket(fwdPackagesKey)
2,904✔
460
        if err != nil {
2,904✔
461
                return err
×
462
        }
×
463

464
        source := makeLogKey(fwdPkg.Source.ToUint64())
2,904✔
465
        sourceBkt, err := fwdPkgBkt.CreateBucketIfNotExists(source[:])
2,904✔
466
        if err != nil {
2,904✔
467
                return err
×
468
        }
×
469

470
        heightKey := makeLogKey(fwdPkg.Height)
2,904✔
471
        heightBkt, err := sourceBkt.CreateBucketIfNotExists(heightKey[:])
2,904✔
472
        if err != nil {
2,904✔
473
                return err
×
474
        }
×
475

476
        // Write ADD updates we received at this commit height.
477
        addBkt, err := heightBkt.CreateBucketIfNotExists(addBucketKey)
2,904✔
478
        if err != nil {
2,904✔
479
                return err
×
480
        }
×
481

482
        // Write SETTLE/FAIL updates we received at this commit height.
483
        failSettleBkt, err := heightBkt.CreateBucketIfNotExists(failSettleBucketKey)
2,904✔
484
        if err != nil {
2,904✔
485
                return err
×
486
        }
×
487

488
        for i := range fwdPkg.Adds {
4,749✔
489
                err = putLogUpdate(addBkt, uint16(i), &fwdPkg.Adds[i])
1,845✔
490
                if err != nil {
1,845✔
491
                        return err
×
492
                }
×
493
        }
494

495
        // Persist the initialized pkg filter, which will be used to determine
496
        // when we can remove this forwarding package from disk.
497
        var ackFilterBuf bytes.Buffer
2,904✔
498
        if err := fwdPkg.AckFilter.Encode(&ackFilterBuf); err != nil {
2,904✔
499
                return err
×
500
        }
×
501

502
        if err := heightBkt.Put(ackFilterKey, ackFilterBuf.Bytes()); err != nil {
2,904✔
503
                return err
×
504
        }
×
505

506
        for i := range fwdPkg.SettleFails {
3,745✔
507
                err = putLogUpdate(failSettleBkt, uint16(i), &fwdPkg.SettleFails[i])
841✔
508
                if err != nil {
841✔
509
                        return err
×
510
                }
×
511
        }
512

513
        var settleFailFilterBuf bytes.Buffer
2,904✔
514
        err = fwdPkg.SettleFailFilter.Encode(&settleFailFilterBuf)
2,904✔
515
        if err != nil {
2,904✔
516
                return err
×
517
        }
×
518

519
        return heightBkt.Put(settleFailFilterKey, settleFailFilterBuf.Bytes())
2,904✔
520
}
521

522
// putLogUpdate writes an htlc to the provided `bkt`, using `index` as the key.
523
func putLogUpdate(bkt kvdb.RwBucket, idx uint16, htlc *LogUpdate) error {
2,682✔
524
        var b bytes.Buffer
2,682✔
525
        if err := serializeLogUpdate(&b, htlc); err != nil {
2,682✔
526
                return err
×
527
        }
×
528

529
        return bkt.Put(uint16Key(idx), b.Bytes())
2,682✔
530
}
531

532
// LoadFwdPkgs scans the forwarding log for any packages that haven't been
533
// processed, and returns their deserialized log updates in a map indexed by the
534
// remote commitment height at which the updates were locked in.
535
func (p *ChannelPackager) LoadFwdPkgs(tx kvdb.RTx) ([]*FwdPkg, error) {
481✔
536
        return loadChannelFwdPkgs(tx, p.source)
481✔
537
}
481✔
538

539
// loadChannelFwdPkgs loads all forwarding packages owned by `source`.
540
func loadChannelFwdPkgs(tx kvdb.RTx, source lnwire.ShortChannelID) ([]*FwdPkg, error) {
608✔
541
        fwdPkgBkt := tx.ReadBucket(fwdPackagesKey)
608✔
542
        if fwdPkgBkt == nil {
614✔
543
                return nil, nil
6✔
544
        }
6✔
545

546
        sourceKey := makeLogKey(source.ToUint64())
602✔
547
        sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
602✔
548
        if sourceBkt == nil {
1,144✔
549
                return nil, nil
542✔
550
        }
542✔
551

552
        var heights []uint64
64✔
553
        if err := sourceBkt.ForEach(func(k, _ []byte) error {
1,101✔
554
                if len(k) != 8 {
1,037✔
555
                        return ErrCorruptedFwdPkg
×
556
                }
×
557

558
                heights = append(heights, byteOrder.Uint64(k))
1,037✔
559

1,037✔
560
                return nil
1,037✔
561
        }); err != nil {
×
562
                return nil, err
×
563
        }
×
564

565
        // Load the forwarding package for each retrieved height.
566
        fwdPkgs := make([]*FwdPkg, 0, len(heights))
64✔
567
        for _, height := range heights {
1,101✔
568
                fwdPkg, err := loadFwdPkg(fwdPkgBkt, source, height)
1,037✔
569
                if err != nil {
1,037✔
570
                        return nil, err
×
571
                }
×
572

573
                fwdPkgs = append(fwdPkgs, fwdPkg)
1,037✔
574
        }
575

576
        return fwdPkgs, nil
64✔
577
}
578

579
// loadFwdPkg reads the packager's fwd pkg at a given height, and determines the
580
// appropriate FwdState.
581
func loadFwdPkg(fwdPkgBkt kvdb.RBucket, source lnwire.ShortChannelID,
582
        height uint64) (*FwdPkg, error) {
1,037✔
583

1,037✔
584
        sourceKey := makeLogKey(source.ToUint64())
1,037✔
585
        sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
1,037✔
586
        if sourceBkt == nil {
1,037✔
587
                return nil, ErrCorruptedFwdPkg
×
588
        }
×
589

590
        heightKey := makeLogKey(height)
1,037✔
591
        heightBkt := sourceBkt.NestedReadBucket(heightKey[:])
1,037✔
592
        if heightBkt == nil {
1,037✔
593
                return nil, ErrCorruptedFwdPkg
×
594
        }
×
595

596
        // Load ADDs from disk.
597
        addBkt := heightBkt.NestedReadBucket(addBucketKey)
1,037✔
598
        if addBkt == nil {
1,037✔
599
                return nil, ErrCorruptedFwdPkg
×
600
        }
×
601

602
        adds, err := loadHtlcs(addBkt)
1,037✔
603
        if err != nil {
1,037✔
604
                return nil, err
×
605
        }
×
606

607
        // Load ack filter from disk.
608
        ackFilterBytes := heightBkt.Get(ackFilterKey)
1,037✔
609
        if ackFilterBytes == nil {
1,037✔
610
                return nil, ErrCorruptedFwdPkg
×
611
        }
×
612
        ackFilterReader := bytes.NewReader(ackFilterBytes)
1,037✔
613

1,037✔
614
        ackFilter := &PkgFilter{}
1,037✔
615
        if err := ackFilter.Decode(ackFilterReader); err != nil {
1,037✔
616
                return nil, err
×
617
        }
×
618

619
        // Load SETTLE/FAILs from disk.
620
        failSettleBkt := heightBkt.NestedReadBucket(failSettleBucketKey)
1,037✔
621
        if failSettleBkt == nil {
1,037✔
622
                return nil, ErrCorruptedFwdPkg
×
623
        }
×
624

625
        failSettles, err := loadHtlcs(failSettleBkt)
1,037✔
626
        if err != nil {
1,037✔
627
                return nil, err
×
628
        }
×
629

630
        // Load settle fail filter from disk.
631
        settleFailFilterBytes := heightBkt.Get(settleFailFilterKey)
1,037✔
632
        if settleFailFilterBytes == nil {
1,037✔
633
                return nil, ErrCorruptedFwdPkg
×
634
        }
×
635
        settleFailFilterReader := bytes.NewReader(settleFailFilterBytes)
1,037✔
636

1,037✔
637
        settleFailFilter := &PkgFilter{}
1,037✔
638
        if err := settleFailFilter.Decode(settleFailFilterReader); err != nil {
1,037✔
639
                return nil, err
×
640
        }
×
641

642
        // Initialize the fwding package, which always starts in the
643
        // FwdStateLockedIn. We can determine what state the package was left in
644
        // by examining constraints on the information loaded from disk.
645
        fwdPkg := &FwdPkg{
1,037✔
646
                Source:           source,
1,037✔
647
                State:            FwdStateLockedIn,
1,037✔
648
                Height:           height,
1,037✔
649
                Adds:             adds,
1,037✔
650
                AckFilter:        ackFilter,
1,037✔
651
                SettleFails:      failSettles,
1,037✔
652
                SettleFailFilter: settleFailFilter,
1,037✔
653
        }
1,037✔
654

1,037✔
655
        // Check to see if we have written the set exported filter adds to
1,037✔
656
        // disk. If we haven't, processing of this package was never started, or
1,037✔
657
        // failed during the last attempt.
1,037✔
658
        fwdFilterBytes := heightBkt.Get(fwdFilterKey)
1,037✔
659
        if fwdFilterBytes == nil {
1,050✔
660
                nAdds := uint16(len(adds))
13✔
661
                fwdPkg.FwdFilter = NewPkgFilter(nAdds)
13✔
662
                return fwdPkg, nil
13✔
663
        }
13✔
664

665
        fwdFilterReader := bytes.NewReader(fwdFilterBytes)
1,024✔
666
        fwdPkg.FwdFilter = &PkgFilter{}
1,024✔
667
        if err := fwdPkg.FwdFilter.Decode(fwdFilterReader); err != nil {
1,024✔
668
                return nil, err
×
669
        }
×
670

671
        // Otherwise, a complete round of processing was completed, and we
672
        // advance the package to FwdStateProcessed.
673
        fwdPkg.State = FwdStateProcessed
1,024✔
674

1,024✔
675
        // If every add, settle, and fail has been fully acknowledged, we can
1,024✔
676
        // safely set the package's state to FwdStateCompleted, signalling that
1,024✔
677
        // it can be garbage collected.
1,024✔
678
        if fwdPkg.AckFilter.IsFull() && fwdPkg.SettleFailFilter.IsFull() {
1,930✔
679
                fwdPkg.State = FwdStateCompleted
906✔
680
        }
906✔
681

682
        return fwdPkg, nil
1,024✔
683
}
684

685
// loadHtlcs retrieves all serialized htlcs in a bucket, returning
686
// them in order of the indexes they were written under.
687
func loadHtlcs(bkt kvdb.RBucket) ([]LogUpdate, error) {
2,070✔
688
        var htlcs []LogUpdate
2,070✔
689
        if err := bkt.ForEach(func(_, v []byte) error {
5,490✔
690
                htlc, err := deserializeLogUpdate(bytes.NewReader(v))
3,420✔
691
                if err != nil {
3,420✔
692
                        return err
×
693
                }
×
694

695
                htlcs = append(htlcs, *htlc)
3,420✔
696

3,420✔
697
                return nil
3,420✔
698
        }); err != nil {
×
699
                return nil, err
×
700
        }
×
701

702
        return htlcs, nil
2,070✔
703
}
704

705
// SetFwdFilter writes the set of indexes corresponding to Adds at the
706
// `height` that are to be forwarded to the switch. Calling this method causes
707
// the forwarding package at `height` to be in FwdStateProcessed. We write this
708
// forwarding decision so that we always arrive at the same behavior for HTLCs
709
// leaving this channel. After a restart, we skip validation of these Adds,
710
// since they are assumed to have already been validated, and make the switch or
711
// outgoing link responsible for handling replays.
712
func (p *ChannelPackager) SetFwdFilter(tx kvdb.RwTx, height uint64,
713
        fwdFilter *PkgFilter) error {
2,176✔
714

2,176✔
715
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
2,176✔
716
        if fwdPkgBkt == nil {
2,176✔
717
                return ErrCorruptedFwdPkg
×
718
        }
×
719

720
        source := makeLogKey(p.source.ToUint64())
2,176✔
721
        sourceBkt := fwdPkgBkt.NestedReadWriteBucket(source[:])
2,176✔
722
        if sourceBkt == nil {
2,176✔
723
                return ErrCorruptedFwdPkg
×
724
        }
×
725

726
        heightKey := makeLogKey(height)
2,176✔
727
        heightBkt := sourceBkt.NestedReadWriteBucket(heightKey[:])
2,176✔
728
        if heightBkt == nil {
2,176✔
729
                return ErrCorruptedFwdPkg
×
730
        }
×
731

732
        // If the fwd filter has already been written, we return early to avoid
733
        // modifying the persistent state.
734
        forwardedAddsBytes := heightBkt.Get(fwdFilterKey)
2,176✔
735
        if forwardedAddsBytes != nil {
2,176✔
736
                return nil
×
737
        }
×
738

739
        // Otherwise we serialize and write the provided fwd filter.
740
        var b bytes.Buffer
2,176✔
741
        if err := fwdFilter.Encode(&b); err != nil {
2,176✔
742
                return err
×
743
        }
×
744

745
        return heightBkt.Put(fwdFilterKey, b.Bytes())
2,176✔
746
}
747

748
// AckAddHtlcs accepts a list of references to add htlcs, and updates the
749
// AckFilter of those forwarding packages to indicate that a settle or fail
750
// has been received in response to the add.
751
func (p *ChannelPackager) AckAddHtlcs(tx kvdb.RwTx, addRefs ...AddRef) error {
2,990✔
752
        if len(addRefs) == 0 {
5,208✔
753
                return nil
2,218✔
754
        }
2,218✔
755

756
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
776✔
757
        if fwdPkgBkt == nil {
776✔
758
                return ErrCorruptedFwdPkg
×
759
        }
×
760

761
        sourceKey := makeLogKey(p.source.ToUint64())
776✔
762
        sourceBkt := fwdPkgBkt.NestedReadWriteBucket(sourceKey[:])
776✔
763
        if sourceBkt == nil {
776✔
764
                return ErrCorruptedFwdPkg
×
765
        }
×
766

767
        // Organize the forward references such that we just get a single slice
768
        // of indexes for each unique height.
769
        heightDiffs := make(map[uint64][]uint16)
776✔
770
        for _, addRef := range addRefs {
1,561✔
771
                heightDiffs[addRef.Height] = append(
785✔
772
                        heightDiffs[addRef.Height],
785✔
773
                        addRef.Index,
785✔
774
                )
785✔
775
        }
785✔
776

777
        // Load each height bucket once and remove all acked htlcs at that
778
        // height.
779
        for height, indexes := range heightDiffs {
1,552✔
780
                err := ackAddHtlcsAtHeight(sourceBkt, height, indexes)
776✔
781
                if err != nil {
776✔
782
                        return err
×
783
                }
×
784
        }
785

786
        return nil
776✔
787
}
788

789
// ackAddHtlcsAtHeight updates the AckFilter of a single forwarding package
790
// with a list of indexes, writing the resulting filter back in its place.
791
func ackAddHtlcsAtHeight(sourceBkt kvdb.RwBucket, height uint64,
792
        indexes []uint16) error {
776✔
793

776✔
794
        heightKey := makeLogKey(height)
776✔
795
        heightBkt := sourceBkt.NestedReadWriteBucket(heightKey[:])
776✔
796
        if heightBkt == nil {
776✔
797
                // If the height bucket isn't found, this could be because the
×
798
                // forwarding package was already removed. We'll return nil to
×
799
                // signal that the operation is successful, as there is nothing
×
800
                // to ack.
×
801
                return nil
×
802
        }
×
803

804
        // Load ack filter from disk.
805
        ackFilterBytes := heightBkt.Get(ackFilterKey)
776✔
806
        if ackFilterBytes == nil {
776✔
807
                return ErrCorruptedFwdPkg
×
808
        }
×
809

810
        ackFilter := &PkgFilter{}
776✔
811
        ackFilterReader := bytes.NewReader(ackFilterBytes)
776✔
812
        if err := ackFilter.Decode(ackFilterReader); err != nil {
776✔
813
                return err
×
814
        }
×
815

816
        // Update the ack filter for this height.
817
        for _, index := range indexes {
1,561✔
818
                ackFilter.Set(index)
785✔
819
        }
785✔
820

821
        // Write the resulting filter to disk.
822
        var ackFilterBuf bytes.Buffer
776✔
823
        if err := ackFilter.Encode(&ackFilterBuf); err != nil {
776✔
824
                return err
×
825
        }
×
826

827
        return heightBkt.Put(ackFilterKey, ackFilterBuf.Bytes())
776✔
828
}
829

830
// AckSettleFails persistently acknowledges settles or fails from a remote forwarding
831
// package. This should only be called after the source of the Add has locked in
832
// the settle/fail, or it becomes otherwise safe to forgo retransmitting the
833
// settle/fail after a restart.
834
func (p *ChannelPackager) AckSettleFails(tx kvdb.RwTx, settleFailRefs ...SettleFailRef) error {
2,989✔
835
        return ackSettleFails(tx, settleFailRefs)
2,989✔
836
}
2,989✔
837

838
// ackSettleFails persistently acknowledges a batch of settle fail references.
839
func ackSettleFails(tx kvdb.RwTx, settleFailRefs []SettleFailRef) error {
3,113✔
840
        if len(settleFailRefs) == 0 {
6,093✔
841
                return nil
2,980✔
842
        }
2,980✔
843

844
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
137✔
845
        if fwdPkgBkt == nil {
137✔
846
                return ErrCorruptedFwdPkg
×
847
        }
×
848

849
        // Organize the forward references such that we just get a single slice
850
        // of indexes for each unique destination-height pair.
851
        destHeightDiffs := make(map[lnwire.ShortChannelID]map[uint64][]uint16)
137✔
852
        for _, settleFailRef := range settleFailRefs {
707✔
853
                destHeights, ok := destHeightDiffs[settleFailRef.Source]
570✔
854
                if !ok {
707✔
855
                        destHeights = make(map[uint64][]uint16)
137✔
856
                        destHeightDiffs[settleFailRef.Source] = destHeights
137✔
857
                }
137✔
858

859
                destHeights[settleFailRef.Height] = append(
570✔
860
                        destHeights[settleFailRef.Height],
570✔
861
                        settleFailRef.Index,
570✔
862
                )
570✔
863
        }
864

865
        // With the references organized by destination and height, we now load
866
        // each remote bucket, and update the settle fail filter for any
867
        // settle/fail htlcs.
868
        for dest, destHeights := range destHeightDiffs {
274✔
869
                destKey := makeLogKey(dest.ToUint64())
137✔
870
                destBkt := fwdPkgBkt.NestedReadWriteBucket(destKey[:])
137✔
871
                if destBkt == nil {
144✔
872
                        // If the destination bucket is not found, this is
7✔
873
                        // likely the result of the destination channel being
7✔
874
                        // closed and having it's forwarding packages wiped. We
7✔
875
                        // won't treat this as an error, because the response
7✔
876
                        // will no longer be retransmitted internally.
7✔
877
                        continue
7✔
878
                }
879

880
                for height, indexes := range destHeights {
700✔
881
                        err := ackSettleFailsAtHeight(destBkt, height, indexes)
566✔
882
                        if err != nil {
566✔
883
                                return err
×
884
                        }
×
885
                }
886
        }
887

888
        return nil
137✔
889
}
890

891
// ackSettleFailsAtHeight given a destination bucket, acks the provided indexes
892
// at a particular height by updating the settle fail filter.
893
func ackSettleFailsAtHeight(destBkt kvdb.RwBucket, height uint64,
894
        indexes []uint16) error {
566✔
895

566✔
896
        heightKey := makeLogKey(height)
566✔
897
        heightBkt := destBkt.NestedReadWriteBucket(heightKey[:])
566✔
898
        if heightBkt == nil {
566✔
899
                // If the height bucket isn't found, this could be because the
×
900
                // forwarding package was already removed. We'll return nil to
×
901
                // signal that the operation is successful, as there is nothing to ack.
×
902
                return nil
×
903
        }
×
904

905
        // Load settle fail filter from disk.
906
        settleFailFilterBytes := heightBkt.Get(settleFailFilterKey)
566✔
907
        if settleFailFilterBytes == nil {
566✔
908
                return ErrCorruptedFwdPkg
×
909
        }
×
910

911
        settleFailFilter := &PkgFilter{}
566✔
912
        settleFailFilterReader := bytes.NewReader(settleFailFilterBytes)
566✔
913
        if err := settleFailFilter.Decode(settleFailFilterReader); err != nil {
566✔
914
                return err
×
915
        }
×
916

917
        // Update the settle fail filter for this height.
918
        for _, index := range indexes {
1,133✔
919
                settleFailFilter.Set(index)
567✔
920
        }
567✔
921

922
        // Write the resulting filter to disk.
923
        var settleFailFilterBuf bytes.Buffer
566✔
924
        if err := settleFailFilter.Encode(&settleFailFilterBuf); err != nil {
566✔
925
                return err
×
926
        }
×
927

928
        return heightBkt.Put(settleFailFilterKey, settleFailFilterBuf.Bytes())
566✔
929
}
930

931
// RemovePkg deletes the forwarding package at the given height from the
932
// packager's source bucket.
933
func (p *ChannelPackager) RemovePkg(tx kvdb.RwTx, height uint64) error {
897✔
934
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
897✔
935
        if fwdPkgBkt == nil {
897✔
936
                return nil
×
937
        }
×
938

939
        sourceBytes := makeLogKey(p.source.ToUint64())
897✔
940
        sourceBkt := fwdPkgBkt.NestedReadWriteBucket(sourceBytes[:])
897✔
941
        if sourceBkt == nil {
897✔
UNCOV
942
                return ErrCorruptedFwdPkg
×
UNCOV
943
        }
×
944

945
        heightKey := makeLogKey(height)
897✔
946

897✔
947
        return sourceBkt.DeleteNestedBucket(heightKey[:])
897✔
948
}
949

950
// Wipe deletes all the channel's forwarding packages, if any.
951
func (p *ChannelPackager) Wipe(tx kvdb.RwTx) error {
120✔
952
        // If the root bucket doesn't exist, there's no need to delete.
120✔
953
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
120✔
954
        if fwdPkgBkt == nil {
121✔
955
                return nil
1✔
956
        }
1✔
957

958
        sourceBytes := makeLogKey(p.source.ToUint64())
119✔
959

119✔
960
        // If the nested bucket doesn't exist, there's no need to delete.
119✔
961
        if fwdPkgBkt.NestedReadWriteBucket(sourceBytes[:]) == nil {
232✔
962
                return nil
113✔
963
        }
113✔
964

965
        return fwdPkgBkt.DeleteNestedBucket(sourceBytes[:])
10✔
966
}
967

968
// uint16Key writes the provided 16-bit unsigned integer to a 2-byte slice.
969
func uint16Key(i uint16) []byte {
2,682✔
970
        key := make([]byte, 2)
2,682✔
971
        byteOrder.PutUint16(key, i)
2,682✔
972
        return key
2,682✔
973
}
2,682✔
974

975
// Compile-time constraint to ensure that ChannelPackager implements the public
976
// FwdPackager interface.
977
var _ FwdPackager = (*ChannelPackager)(nil)
978

979
// Compile-time constraint to ensure that SwitchPackager implements the public
980
// FwdOperator interface.
981
var _ FwdOperator = (*SwitchPackager)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc