lightningnetwork / lnd · build 16918135633

12 Aug 2025 06:56PM UTC coverage: 56.955% (-9.9%) from 66.9%

push · github · web-flow

Merge pull request #9871 from GeorgeTsagk/htlc-noop-add
Add `NoopAdd` HTLCs

48 of 147 new or added lines in 3 files covered. (32.65%)
29154 existing lines in 462 files now uncovered.
98265 of 172532 relevant lines covered (56.95%)
1.19 hits per line

Source File: /channeldb/forwarding_log.go (83.41% of file lines covered)

package channeldb

import (
        "bytes"
        "errors"
        "io"
        "sort"
        "time"

        "github.com/btcsuite/btcwallet/walletdb"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lnwire"
)

var (
        // forwardingLogBucket is the bucket that we'll use to store the
        // forwarding log. The forwarding log contains a time series database
        // of the forwarding history of a lightning daemon. Each key within the
        // bucket is a timestamp (in nanoseconds since the unix epoch), and the
        // value is the serialized forwarding event for that timestamp.
        forwardingLogBucket = []byte("circuit-fwd-log")
)

const (
        // forwardingEventSize is the size of a forwarding event. The breakdown
        // is as follows:
        //
        //  * 8 byte incoming chan ID || 8 byte outgoing chan ID || 8 byte value in
        //    || 8 byte value out || 8 byte incoming htlc id || 8 byte
        //    outgoing htlc id
        //
        // From the value in and value out, callers can easily compute the
        // total fee extracted from a forwarding event.
        forwardingEventSize = 48

        // MaxResponseEvents is the max number of forwarding events that will
        // be returned by a single query response. This size was selected to
        // safely remain under gRPC's 4MiB message size response limit. As each
        // full forwarding event (including the timestamp) is 56 bytes, we can
        // safely return 50k entries in a single response.
        MaxResponseEvents = 50000
)
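
// Illustrative sketch, not part of forwarding_log.go: decoding the six
// fixed-width fields of a raw forwarding event record by hand, following the
// 48-byte breakdown documented above. It assumes the record was produced by
// WriteElements using the package's big-endian byteOrder, as done elsewhere
// in this file; the function name is hypothetical.
func exampleDecodeRawForwardingRecord(record [forwardingEventSize]byte) (
        incomingChan, outgoingChan uint64, amtIn, amtOut lnwire.MilliSatoshi,
        incomingHtlcID, outgoingHtlcID uint64) {

        // Each of the six fields occupies 8 bytes, laid out back to back.
        incomingChan = byteOrder.Uint64(record[0:8])
        outgoingChan = byteOrder.Uint64(record[8:16])
        amtIn = lnwire.MilliSatoshi(byteOrder.Uint64(record[16:24]))
        amtOut = lnwire.MilliSatoshi(byteOrder.Uint64(record[24:32]))
        incomingHtlcID = byteOrder.Uint64(record[32:40])
        outgoingHtlcID = byteOrder.Uint64(record[40:48])

        // The fee earned for the forward is simply amtIn - amtOut.
        return
}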

// ForwardingLog returns an instance of the ForwardingLog object backed by the
// target database instance.
func (d *DB) ForwardingLog() *ForwardingLog {
        return &ForwardingLog{
                db: d,
        }
}

// ForwardingLog is a time series database that logs the fulfillment of payment
// circuits by a lightning network daemon. The log contains a series of
// forwarding events, each keyed by its timestamp. A forwarding event describes
// which channels were used to create+settle a circuit, and the amount
// involved. Subtracting the outgoing amount from the incoming amount reveals
// the fee charged for the forwarding service.
type ForwardingLog struct {
        db *DB
}

// ForwardingEvent is an event in the forwarding log's time series. Each
// forwarding event logs the creation and tear-down of a payment circuit. A
// circuit is created once an incoming HTLC has been fully forwarded, and
// destroyed once the payment has been settled.
type ForwardingEvent struct {
        // Timestamp is the settlement time of this payment circuit.
        Timestamp time.Time

        // IncomingChanID is the incoming channel ID of the payment circuit.
        IncomingChanID lnwire.ShortChannelID

        // OutgoingChanID is the outgoing channel ID of the payment circuit.
        OutgoingChanID lnwire.ShortChannelID

        // AmtIn is the amount of the incoming HTLC. Subtracting the outgoing
        // amount from this gives the total fees of this payment circuit.
        AmtIn lnwire.MilliSatoshi

        // AmtOut is the amount of the outgoing HTLC. Subtracting this from the
        // incoming amount gives the total fees for this payment circuit.
        AmtOut lnwire.MilliSatoshi

        // IncomingHtlcID is the ID of the incoming HTLC in the payment circuit.
        // If this is not set, the value will be nil. This field was added in
        // v0.20 and is optional in order to remain backward compatible with
        // existing forwarding events created before its introduction.
        IncomingHtlcID fn.Option[uint64]

        // OutgoingHtlcID is the ID of the outgoing HTLC in the payment circuit.
        // If this is not set, the value will be nil. This field was added in
        // v0.20 and is optional in order to remain backward compatible with
        // existing forwarding events created before its introduction.
        OutgoingHtlcID fn.Option[uint64]
}
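
// Illustrative sketch, not part of forwarding_log.go: populating a
// ForwardingEvent. The channel IDs, amounts and HTLC indexes are arbitrary
// placeholder values and the function name is hypothetical. Note that from
// v0.20 onward the optional HTLC IDs must be set (via fn.Some) for the event
// to encode successfully.
func exampleNewForwardingEvent() ForwardingEvent {
        return ForwardingEvent{
                Timestamp:      time.Now(),
                IncomingChanID: lnwire.NewShortChanIDFromInt(123),
                OutgoingChanID: lnwire.NewShortChanIDFromInt(456),

                // Fee earned by the forwarding node: AmtIn - AmtOut = 1000 msat.
                AmtIn:  lnwire.MilliSatoshi(1_001_000),
                AmtOut: lnwire.MilliSatoshi(1_000_000),

                IncomingHtlcID: fn.Some(uint64(7)),
                OutgoingHtlcID: fn.Some(uint64(9)),
        }
}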

// encodeForwardingEvent writes out the target forwarding event to the passed
// io.Writer, using the expected DB format. Note that the timestamp isn't
// serialized as this will be the key value within the bucket.
func encodeForwardingEvent(w io.Writer, f *ForwardingEvent) error {
        // We check whether the HTLC IDs are set. If they are not, we return an
        // error, since from v0.20 onward they are required.
        incomingID, err := f.IncomingHtlcID.UnwrapOrErr(
                errors.New("incoming HTLC ID must be set"),
        )
        if err != nil {
                return err
        }

        outgoingID, err := f.OutgoingHtlcID.UnwrapOrErr(
                errors.New("outgoing HTLC ID must be set"),
        )
        if err != nil {
                return err
        }

        return WriteElements(
                w, f.IncomingChanID, f.OutgoingChanID, f.AmtIn, f.AmtOut,
                incomingID, outgoingID,
        )
}

// decodeForwardingEvent attempts to decode the raw bytes of a serialized
// forwarding event into the target ForwardingEvent. Note that the timestamp
// won't be decoded, as the caller is expected to set this due to the bucket
// structure of the forwarding log.
func decodeForwardingEvent(r io.Reader, f *ForwardingEvent) error {
        // Decode the original fields of the forwarding event.
        err := ReadElements(
                r, &f.IncomingChanID, &f.OutgoingChanID, &f.AmtIn, &f.AmtOut,
        )
        if err != nil {
                return err
        }

        // Decode the incoming and outgoing htlc IDs. For backward compatibility
        // with older records that don't have these fields, we handle EOF by
        // setting the ID to nil. Any other error is treated as a read failure.
        var incomingHtlcID, outgoingHtlcID uint64
        err = ReadElements(r, &incomingHtlcID, &outgoingHtlcID)
        switch {
        case err == nil:
                f.IncomingHtlcID = fn.Some(incomingHtlcID)
                f.OutgoingHtlcID = fn.Some(outgoingHtlcID)

                return nil

        case errors.Is(err, io.EOF):
                return nil

        default:
                return err
        }
}
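
// Illustrative sketch, not part of forwarding_log.go: a round trip through
// encodeForwardingEvent and decodeForwardingEvent using an in-memory buffer.
// As noted above, the timestamp is neither written nor read here; it lives in
// the bucket key, so the caller re-attaches it after decoding. The function
// name is hypothetical.
func exampleEncodeDecodeRoundTrip(event ForwardingEvent) (ForwardingEvent, error) {
        var buf bytes.Buffer
        if err := encodeForwardingEvent(&buf, &event); err != nil {
                return ForwardingEvent{}, err
        }

        var decoded ForwardingEvent
        if err := decodeForwardingEvent(&buf, &decoded); err != nil {
                return ForwardingEvent{}, err
        }

        // Restore the timestamp that would normally come from the bucket key.
        decoded.Timestamp = event.Timestamp

        return decoded, nil
}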

// AddForwardingEvents adds a series of forwarding events to the database.
// Before inserting, the set of events will be sorted according to their
// timestamp. This ensures that all writes to disk are sequential.
func (f *ForwardingLog) AddForwardingEvents(events []ForwardingEvent) error {
        // Before we create the database transaction, we'll ensure that the set
        // of forwarding events are properly sorted according to their
        // timestamp and that no duplicate timestamps exist to avoid collisions
        // in the key we are going to store the events under.
        makeUniqueTimestamps(events)

        var timestamp [8]byte

        return kvdb.Batch(f.db.Backend, func(tx kvdb.RwTx) error {
                // First, we'll fetch the bucket that stores our time series
                // log.
                logBucket, err := tx.CreateTopLevelBucket(
                        forwardingLogBucket,
                )
                if err != nil {
                        return err
                }

                // With the bucket obtained, we can now begin to write out the
                // series of events.
                for _, event := range events {
                        err := storeEvent(logBucket, event, timestamp[:])
                        if err != nil {
                                return err
                        }
                }

                return nil
        })
}
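
// Illustrative sketch, not part of forwarding_log.go: recording a batch of
// events through the public API. The *DB value is assumed to have been opened
// elsewhere; the events could, for example, come from
// exampleNewForwardingEvent above. The function name is hypothetical.
func exampleRecordForwardingEvents(db *DB, events []ForwardingEvent) error {
        fwdLog := db.ForwardingLog()

        // AddForwardingEvents sorts the slice and de-duplicates timestamps
        // internally, so callers don't need to pre-sort it.
        return fwdLog.AddForwardingEvents(events)
}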

// storeEvent tries to store a forwarding event into the given bucket by trying
// to avoid collisions. If a key for the event timestamp already exists in the
// database, the timestamp is incremented in nanosecond intervals until a "free"
// slot is found.
func storeEvent(bucket walletdb.ReadWriteBucket, event ForwardingEvent,
        timestampScratchSpace []byte) error {

        // First, we'll serialize this timestamp into our
        // timestamp buffer.
        byteOrder.PutUint64(
                timestampScratchSpace, uint64(event.Timestamp.UnixNano()),
        )

        // Next we'll loop until we find a "free" slot in the bucket to store
        // the event under. A collision should almost never happen unless we're
        // running on a system with a very bad clock that doesn't properly
        // resolve to nanosecond scale. We try up to 100 times (which comes to
        // a maximum shift of 0.1 microseconds, acceptable for most use cases).
        // If we don't find a free slot, we just give up and let the collision
        // happen. Something must be wrong with the data in that case, since
        // even on a very fast machine forwarding a payment _will_ take at
        // least a few microseconds, so we should find a free nanosecond slot
        // somewhere.
        const maxTries = 100
        tries := 0
        for tries < maxTries {
                val := bucket.Get(timestampScratchSpace)
                if val == nil {
                        break
                }

                // Collision, try the next nanosecond timestamp.
                nextNano := event.Timestamp.UnixNano() + 1
                event.Timestamp = time.Unix(0, nextNano)
                byteOrder.PutUint64(timestampScratchSpace, uint64(nextNano))
                tries++
        }

        // With the key encoded, we'll then encode the event
        // into our buffer, then write it out to disk.
        var eventBytes [forwardingEventSize]byte
        eventBuf := bytes.NewBuffer(eventBytes[0:0:forwardingEventSize])
        err := encodeForwardingEvent(eventBuf, &event)
        if err != nil {
                return err
        }
        return bucket.Put(timestampScratchSpace, eventBuf.Bytes())
}
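
// Illustrative sketch, not part of forwarding_log.go: why the bucket keys sort
// chronologically. Assuming byteOrder is big-endian (as it is elsewhere in
// channeldb), encoding UnixNano timestamps with PutUint64 preserves numeric
// order under bytes.Compare, which is what lets Query below walk a time range
// with a simple cursor seek. The function name is hypothetical.
func exampleTimestampKeysAreOrdered(earlier, later time.Time) bool {
        var a, b [8]byte
        byteOrder.PutUint64(a[:], uint64(earlier.UnixNano()))
        byteOrder.PutUint64(b[:], uint64(later.UnixNano()))

        // For earlier < later, the serialized keys compare the same way.
        return bytes.Compare(a[:], b[:]) < 0
}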

// ForwardingEventQuery represents a query to the forwarding log payment
// circuit time series database. The query allows a caller to retrieve all
// records for a particular time slice, starting at an offset within that
// slice, while limiting the total number of events returned.
type ForwardingEventQuery struct {
        // StartTime is the start time of the time slice.
        StartTime time.Time

        // EndTime is the end time of the time slice.
        EndTime time.Time

        // IndexOffset is the offset within the time slice to start at. This
        // can be used to start the response at a particular record.
        IndexOffset uint32

        // NumMaxEvents is the max number of events to return.
        NumMaxEvents uint32

        // IncomingChanIDs is the set of channels used to filter HTLCs by the
        // channel they were received on. If the set is empty, it is ignored.
        IncomingChanIDs fn.Set[uint64]

        // OutgoingChanIDs is the set of channels used to filter HTLCs by the
        // channel they were forwarded to. If the set is empty, it is ignored.
        OutgoingChanIDs fn.Set[uint64]
}

// ForwardingLogTimeSlice is the response to a forwarding query. It includes
// the original query, the set of events that match the query, and an integer
// which represents the offset index of the last item in the set of returned
// events. This integer allows callers to resume their query using this offset
// in the event that the query's response exceeds the max number of returnable
// events.
type ForwardingLogTimeSlice struct {
        ForwardingEventQuery

        // ForwardingEvents is the set of events in our time series that answer
        // the query embedded above.
        ForwardingEvents []ForwardingEvent

        // LastIndexOffset is the index of the last element in the set of
        // returned ForwardingEvents above. Callers can use this to resume
        // their query in the event that the time slice has too many events to
        // fit into a single response.
        LastIndexOffset uint32
}
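
// Illustrative sketch, not part of forwarding_log.go: a query covering the
// last 24 hours, filtered to a single outgoing channel. fn.NewSet is assumed
// to be the fn/v2 set constructor; the channel ID and the function name are
// placeholders.
func exampleFilteredQuery(outgoingChanID uint64) ForwardingEventQuery {
        now := time.Now()

        return ForwardingEventQuery{
                StartTime:    now.Add(-24 * time.Hour),
                EndTime:      now,
                IndexOffset:  0,
                NumMaxEvents: MaxResponseEvents,

                // Only events forwarded out over this channel will be
                // returned; leaving IncomingChanIDs empty disables incoming
                // filtering.
                OutgoingChanIDs: fn.NewSet(outgoingChanID),
        }
}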

// Query allows a caller to query the forwarding event time series for a
// particular time slice. The caller can control the precise time as well as
// the number of events to be returned.
//
// TODO(roasbeef): rename?
func (f *ForwardingLog) Query(q ForwardingEventQuery) (ForwardingLogTimeSlice,
        error) {

        var resp ForwardingLogTimeSlice

        // If the user provided an index offset, then we know how many records
        // we need to skip. We'll also keep track of the record offset as
        // that's part of the final return value.
        recordsToSkip := q.IndexOffset
        recordOffset := q.IndexOffset

        err := kvdb.View(f.db, func(tx kvdb.RTx) error {
                // If the bucket wasn't found, then there aren't any events to
                // be returned.
                logBucket := tx.ReadBucket(forwardingLogBucket)
                if logBucket == nil {
                        return ErrNoForwardingEvents
                }

                // We'll be using a cursor to seek into the database, so we'll
                // populate byte slices that represent the start of the key
                // space we're interested in, and the end.
                var startTime, endTime [8]byte
                byteOrder.PutUint64(startTime[:], uint64(q.StartTime.UnixNano()))
                byteOrder.PutUint64(endTime[:], uint64(q.EndTime.UnixNano()))

                // If we know that a set of log events exists, then we'll begin
                // our seek through the log in order to satisfy the query.
                // We'll continue until either we reach the end of the range,
                // or reach our max number of events.
                logCursor := logBucket.ReadCursor()
                timestamp, eventBytes := logCursor.Seek(startTime[:])
                //nolint:ll
                for ; timestamp != nil && bytes.Compare(timestamp, endTime[:]) <= 0; timestamp, eventBytes = logCursor.Next() {
                        // If our current return payload exceeds the max number
                        // of events, then we'll exit now.
                        if uint32(len(resp.ForwardingEvents)) >= q.NumMaxEvents {
                                return nil
                        }

                        // If no incoming or outgoing channel IDs were provided
                        // and we're not yet past the user defined offset, then
                        // we'll continue to seek forward.
                        if recordsToSkip > 0 &&
                                q.IncomingChanIDs.IsEmpty() &&
                                q.OutgoingChanIDs.IsEmpty() {

                                recordsToSkip--
                                continue
                        }

                        // At this point, we've skipped enough records to start
                        // to collate our query. For each record, we'll
                        // increment the final record offset so the querier can
                        // utilize pagination to seek further.
                        readBuf := bytes.NewReader(eventBytes)
                        if readBuf.Len() == 0 {
                                continue
                        }

                        currentTime := time.Unix(
                                0, int64(byteOrder.Uint64(timestamp)),
                        )

                        var event ForwardingEvent
                        err := decodeForwardingEvent(readBuf, &event)
                        if err != nil {
                                return err
                        }

                        // Check if the incoming channel ID matches the
                        // filter criteria. Either no filtering is
                        // applied (IsEmpty), or the ID is explicitly
                        // included.
                        incomingMatch := q.IncomingChanIDs.IsEmpty() ||
                                q.IncomingChanIDs.Contains(
                                        event.IncomingChanID.ToUint64(),
                                )

                        // Check if the outgoing channel ID matches the
                        // filter criteria. Either no filtering is
                        // applied (IsEmpty), or the ID is explicitly
                        // included.
                        outgoingMatch := q.OutgoingChanIDs.IsEmpty() ||
                                q.OutgoingChanIDs.Contains(
                                        event.OutgoingChanID.ToUint64(),
                                )

                        // Skip this event if it doesn't match the
                        // filters.
                        if !incomingMatch || !outgoingMatch {
                                continue
                        }

                        // If we're not yet past the user defined offset
                        // then we'll continue to seek forward.
                        if recordsToSkip > 0 {
                                recordsToSkip--
                                continue
                        }

                        event.Timestamp = currentTime
                        resp.ForwardingEvents = append(
                                resp.ForwardingEvents,
                                event,
                        )
                        recordOffset++
                }

                return nil
        }, func() {
                resp = ForwardingLogTimeSlice{
                        ForwardingEventQuery: q,
                }
        })
        if err != nil && err != ErrNoForwardingEvents {
                return ForwardingLogTimeSlice{}, err
        }

        resp.LastIndexOffset = recordOffset

        return resp, nil
}
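
// Illustrative sketch, not part of forwarding_log.go: draining a large time
// slice with pagination. Each response's LastIndexOffset seeds the next
// query's IndexOffset, and the loop stops once a page comes back smaller than
// the requested maximum. q.NumMaxEvents is assumed to be non-zero (for
// example MaxResponseEvents); the function name is hypothetical.
func examplePaginateForwardingEvents(f *ForwardingLog,
        q ForwardingEventQuery) ([]ForwardingEvent, error) {

        var all []ForwardingEvent
        for {
                slice, err := f.Query(q)
                if err != nil {
                        return nil, err
                }

                all = append(all, slice.ForwardingEvents...)

                // A page smaller than the requested maximum means the time
                // slice is exhausted.
                if uint32(len(slice.ForwardingEvents)) < q.NumMaxEvents {
                        return all, nil
                }

                // Resume right after the last record of this page.
                q.IndexOffset = slice.LastIndexOffset
        }
}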

// makeUniqueTimestamps takes a slice of forwarding events, sorts it by the
// event timestamps and then makes sure there are no duplicates in the
// timestamps. If duplicates are found, some of the timestamps are increased on
// the nanosecond scale until only unique values remain. This is a fix to
// address the problem that in some environments (looking at you, Windows) the
// system clock has such a bad resolution that two serial invocations of
// time.Now() might return the same timestamp, even if some time has elapsed
// between the calls.
func makeUniqueTimestamps(events []ForwardingEvent) {
        sort.Slice(events, func(i, j int) bool {
                return events[i].Timestamp.Before(events[j].Timestamp)
        })

        // Now that we know the events are sorted by timestamp, we can go
        // through the list and fix all duplicates until only unique values
        // remain.
        for outer := 0; outer < len(events)-1; outer++ {
                current := events[outer].Timestamp.UnixNano()
                next := events[outer+1].Timestamp.UnixNano()

                // We initially sorted the slice. So if the current is now
                // greater or equal to the next one, it's either because it's a
                // duplicate or because we increased the current in the last
                // iteration.
                if current >= next {
                        next = current + 1
                        events[outer+1].Timestamp = time.Unix(0, next)
                }
        }
}
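
// Illustrative sketch, not part of forwarding_log.go: the effect of
// makeUniqueTimestamps on colliding timestamps. Three events sharing the same
// timestamp t come out as t, t + 1ns and t + 2ns (as Unix nanoseconds), so
// each maps to a unique bucket key. The function name is hypothetical.
func exampleMakeUniqueTimestamps(t time.Time) []ForwardingEvent {
        events := []ForwardingEvent{
                {Timestamp: t}, {Timestamp: t}, {Timestamp: t},
        }

        makeUniqueTimestamps(events)

        // Resulting timestamps: t, t + 1ns, t + 2ns.
        return events
}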