• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15031268339

14 May 2025 09:15PM UTC coverage: 58.592% (-10.4%) from 68.997%
15031268339

Pull #9801

github

web-flow
Merge 748c3fe22 into b0cba7dd0
Pull Request #9801: peer+lnd: add new CLI option to control if we D/C on slow pongs

5 of 79 new or added lines in 3 files covered. (6.33%)

28199 existing lines in 450 files now uncovered.

97428 of 166282 relevant lines covered (58.59%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.54
/channeldb/forwarding_log.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "io"
6
        "sort"
7
        "time"
8

9
        "github.com/btcsuite/btcwallet/walletdb"
10
        "github.com/lightningnetwork/lnd/kvdb"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
)
13

14
var (
15
        // forwardingLogBucket is the bucket that we'll use to store the
16
        // forwarding log. The forwarding log contains a time series database
17
        // of the forwarding history of a lightning daemon. Each key within the
18
        // bucket is a timestamp (in nano seconds since the unix epoch), and
19
        // the value a slice of a forwarding event for that timestamp.
20
        forwardingLogBucket = []byte("circuit-fwd-log")
21
)
22

23
const (
24
        // forwardingEventSize is the size of a forwarding event. The breakdown
25
        // is as follows:
26
        //
27
        //  * 8 byte incoming chan ID || 8 byte outgoing chan ID || 8 byte value in
28
        //    || 8 byte value out
29
        //
30
        // From the value in and value out, callers can easily compute the
31
        // total fee extract from a forwarding event.
32
        forwardingEventSize = 32
33

34
        // MaxResponseEvents is the max number of forwarding events that will
35
        // be returned by a single query response. This size was selected to
36
        // safely remain under gRPC's 4MiB message size response limit. As each
37
        // full forwarding event (including the timestamp) is 40 bytes, we can
38
        // safely return 50k entries in a single response.
39
        MaxResponseEvents = 50000
40
)
41

42
// ForwardingLog returns an instance of the ForwardingLog object backed by the
43
// target database instance.
44
func (d *DB) ForwardingLog() *ForwardingLog {
3✔
45
        return &ForwardingLog{
3✔
46
                db: d,
3✔
47
        }
3✔
48
}
3✔
49

50
// ForwardingLog is a time series database that logs the fulfillment of payment
51
// circuits by a lightning network daemon. The log contains a series of
52
// forwarding events which map a timestamp to a forwarding event. A forwarding
53
// event describes which channels were used to create+settle a circuit, and the
54
// amount involved. Subtracting the outgoing amount from the incoming amount
55
// reveals the fee charged for the forwarding service.
56
type ForwardingLog struct {
57
        db *DB
58
}
59

60
// ForwardingEvent is an event in the forwarding log's time series. Each
61
// forwarding event logs the creation and tear-down of a payment circuit. A
62
// circuit is created once an incoming HTLC has been fully forwarded, and
63
// destroyed once the payment has been settled.
64
type ForwardingEvent struct {
65
        // Timestamp is the settlement time of this payment circuit.
66
        Timestamp time.Time
67

68
        // IncomingChanID is the incoming channel ID of the payment circuit.
69
        IncomingChanID lnwire.ShortChannelID
70

71
        // OutgoingChanID is the outgoing channel ID of the payment circuit.
72
        OutgoingChanID lnwire.ShortChannelID
73

74
        // AmtIn is the amount of the incoming HTLC. Subtracting this from the
75
        // outgoing amount gives the total fees of this payment circuit.
76
        AmtIn lnwire.MilliSatoshi
77

78
        // AmtOut is the amount of the outgoing HTLC. Subtracting the incoming
79
        // amount from this gives the total fees for this payment circuit.
80
        AmtOut lnwire.MilliSatoshi
81
}
82

83
// encodeForwardingEvent writes out the target forwarding event to the passed
84
// io.Writer, using the expected DB format. Note that the timestamp isn't
85
// serialized as this will be the key value within the bucket.
86
func encodeForwardingEvent(w io.Writer, f *ForwardingEvent) error {
3✔
87
        return WriteElements(
3✔
88
                w, f.IncomingChanID, f.OutgoingChanID, f.AmtIn, f.AmtOut,
3✔
89
        )
3✔
90
}
3✔
91

92
// decodeForwardingEvent attempts to decode the raw bytes of a serialized
93
// forwarding event into the target ForwardingEvent. Note that the timestamp
94
// won't be decoded, as the caller is expected to set this due to the bucket
95
// structure of the forwarding log.
96
func decodeForwardingEvent(r io.Reader, f *ForwardingEvent) error {
3✔
97
        return ReadElements(
3✔
98
                r, &f.IncomingChanID, &f.OutgoingChanID, &f.AmtIn, &f.AmtOut,
3✔
99
        )
3✔
100
}
3✔
101

102
// AddForwardingEvents adds a series of forwarding events to the database.
103
// Before inserting, the set of events will be sorted according to their
104
// timestamp. This ensures that all writes to disk are sequential.
105
func (f *ForwardingLog) AddForwardingEvents(events []ForwardingEvent) error {
3✔
106
        // Before we create the database transaction, we'll ensure that the set
3✔
107
        // of forwarding events are properly sorted according to their
3✔
108
        // timestamp and that no duplicate timestamps exist to avoid collisions
3✔
109
        // in the key we are going to store the events under.
3✔
110
        makeUniqueTimestamps(events)
3✔
111

3✔
112
        var timestamp [8]byte
3✔
113

3✔
114
        return kvdb.Batch(f.db.Backend, func(tx kvdb.RwTx) error {
6✔
115
                // First, we'll fetch the bucket that stores our time series
3✔
116
                // log.
3✔
117
                logBucket, err := tx.CreateTopLevelBucket(
3✔
118
                        forwardingLogBucket,
3✔
119
                )
3✔
120
                if err != nil {
3✔
121
                        return err
×
122
                }
×
123

124
                // With the bucket obtained, we can now begin to write out the
125
                // series of events.
126
                for _, event := range events {
6✔
127
                        err := storeEvent(logBucket, event, timestamp[:])
3✔
128
                        if err != nil {
3✔
129
                                return err
×
130
                        }
×
131
                }
132

133
                return nil
3✔
134
        })
135
}
136

137
// storeEvent tries to store a forwarding event into the given bucket by trying
138
// to avoid collisions. If a key for the event timestamp already exists in the
139
// database, the timestamp is incremented in nanosecond intervals until a "free"
140
// slot is found.
141
func storeEvent(bucket walletdb.ReadWriteBucket, event ForwardingEvent,
142
        timestampScratchSpace []byte) error {
3✔
143

3✔
144
        // First, we'll serialize this timestamp into our
3✔
145
        // timestamp buffer.
3✔
146
        byteOrder.PutUint64(
3✔
147
                timestampScratchSpace, uint64(event.Timestamp.UnixNano()),
3✔
148
        )
3✔
149

3✔
150
        // Next we'll loop until we find a "free" slot in the bucket to store
3✔
151
        // the event under. This should almost never happen unless we're running
3✔
152
        // on a system that has a very bad system clock that doesn't properly
3✔
153
        // resolve to nanosecond scale. We try up to 100 times (which would come
3✔
154
        // to a maximum shift of 0.1 microsecond which is acceptable for most
3✔
155
        // use cases). If we don't find a free slot, we just give up and let
3✔
156
        // the collision happen. Something must be wrong with the data in that
3✔
157
        // case, even on a very fast machine forwarding payments _will_ take a
3✔
158
        // few microseconds at least so we should find a nanosecond slot
3✔
159
        // somewhere.
3✔
160
        const maxTries = 100
3✔
161
        tries := 0
3✔
162
        for tries < maxTries {
6✔
163
                val := bucket.Get(timestampScratchSpace)
3✔
164
                if val == nil {
6✔
165
                        break
3✔
166
                }
167

168
                // Collision, try the next nanosecond timestamp.
UNCOV
169
                nextNano := event.Timestamp.UnixNano() + 1
×
UNCOV
170
                event.Timestamp = time.Unix(0, nextNano)
×
UNCOV
171
                byteOrder.PutUint64(timestampScratchSpace, uint64(nextNano))
×
UNCOV
172
                tries++
×
173
        }
174

175
        // With the key encoded, we'll then encode the event
176
        // into our buffer, then write it out to disk.
177
        var eventBytes [forwardingEventSize]byte
3✔
178
        eventBuf := bytes.NewBuffer(eventBytes[0:0:forwardingEventSize])
3✔
179
        err := encodeForwardingEvent(eventBuf, &event)
3✔
180
        if err != nil {
3✔
181
                return err
×
182
        }
×
183
        return bucket.Put(timestampScratchSpace, eventBuf.Bytes())
3✔
184
}
185

186
// ForwardingEventQuery represents a query to the forwarding log payment
187
// circuit time series database. The query allows a caller to retrieve all
188
// records for a particular time slice, offset in that time slice, limiting the
189
// total number of responses returned.
190
type ForwardingEventQuery struct {
191
        // StartTime is the start time of the time slice.
192
        StartTime time.Time
193

194
        // EndTime is the end time of the time slice.
195
        EndTime time.Time
196

197
        // IndexOffset is the offset within the time slice to start at. This
198
        // can be used to start the response at a particular record.
199
        IndexOffset uint32
200

201
        // NumMaxEvents is the max number of events to return.
202
        NumMaxEvents uint32
203
}
204

205
// ForwardingLogTimeSlice is the response to a forwarding query. It includes
206
// the original query, the set  events that match the query, and an integer
207
// which represents the offset index of the last item in the set of returned
208
// events. This integer allows callers to resume their query using this offset
209
// in the event that the query's response exceeds the max number of returnable
210
// events.
211
type ForwardingLogTimeSlice struct {
212
        ForwardingEventQuery
213

214
        // ForwardingEvents is the set of events in our time series that answer
215
        // the query embedded above.
216
        ForwardingEvents []ForwardingEvent
217

218
        // LastIndexOffset is the index of the last element in the set of
219
        // returned ForwardingEvents above. Callers can use this to resume
220
        // their query in the event that the time slice has too many events to
221
        // fit into a single response.
222
        LastIndexOffset uint32
223
}
224

225
// Query allows a caller to query the forwarding event time series for a
226
// particular time slice. The caller can control the precise time as well as
227
// the number of events to be returned.
228
//
229
// TODO(roasbeef): rename?
230
func (f *ForwardingLog) Query(q ForwardingEventQuery) (ForwardingLogTimeSlice, error) {
3✔
231
        var resp ForwardingLogTimeSlice
3✔
232

3✔
233
        // If the user provided an index offset, then we'll not know how many
3✔
234
        // records we need to skip. We'll also keep track of the record offset
3✔
235
        // as that's part of the final return value.
3✔
236
        recordsToSkip := q.IndexOffset
3✔
237
        recordOffset := q.IndexOffset
3✔
238

3✔
239
        err := kvdb.View(f.db, func(tx kvdb.RTx) error {
6✔
240
                // If the bucket wasn't found, then there aren't any events to
3✔
241
                // be returned.
3✔
242
                logBucket := tx.ReadBucket(forwardingLogBucket)
3✔
243
                if logBucket == nil {
3✔
244
                        return ErrNoForwardingEvents
×
245
                }
×
246

247
                // We'll be using a cursor to seek into the database, so we'll
248
                // populate byte slices that represent the start of the key
249
                // space we're interested in, and the end.
250
                var startTime, endTime [8]byte
3✔
251
                byteOrder.PutUint64(startTime[:], uint64(q.StartTime.UnixNano()))
3✔
252
                byteOrder.PutUint64(endTime[:], uint64(q.EndTime.UnixNano()))
3✔
253

3✔
254
                // If we know that a set of log events exists, then we'll begin
3✔
255
                // our seek through the log in order to satisfy the query.
3✔
256
                // We'll continue until either we reach the end of the range,
3✔
257
                // or reach our max number of events.
3✔
258
                logCursor := logBucket.ReadCursor()
3✔
259
                timestamp, events := logCursor.Seek(startTime[:])
3✔
260
                for ; timestamp != nil && bytes.Compare(timestamp, endTime[:]) <= 0; timestamp, events = logCursor.Next() {
6✔
261
                        // If our current return payload exceeds the max number
3✔
262
                        // of events, then we'll exit now.
3✔
263
                        if uint32(len(resp.ForwardingEvents)) >= q.NumMaxEvents {
3✔
UNCOV
264
                                return nil
×
UNCOV
265
                        }
×
266

267
                        // If we're not yet past the user defined offset, then
268
                        // we'll continue to seek forward.
269
                        if recordsToSkip > 0 {
6✔
270
                                recordsToSkip--
3✔
271
                                continue
3✔
272
                        }
273

274
                        currentTime := time.Unix(
3✔
275
                                0, int64(byteOrder.Uint64(timestamp)),
3✔
276
                        )
3✔
277

3✔
278
                        // At this point, we've skipped enough records to start
3✔
279
                        // to collate our query. For each record, we'll
3✔
280
                        // increment the final record offset so the querier can
3✔
281
                        // utilize pagination to seek further.
3✔
282
                        readBuf := bytes.NewReader(events)
3✔
283
                        for readBuf.Len() != 0 {
6✔
284
                                var event ForwardingEvent
3✔
285
                                err := decodeForwardingEvent(readBuf, &event)
3✔
286
                                if err != nil {
3✔
287
                                        return err
×
288
                                }
×
289

290
                                event.Timestamp = currentTime
3✔
291
                                resp.ForwardingEvents = append(resp.ForwardingEvents, event)
3✔
292

3✔
293
                                recordOffset++
3✔
294
                        }
295
                }
296

297
                return nil
3✔
298
        }, func() {
3✔
299
                resp = ForwardingLogTimeSlice{
3✔
300
                        ForwardingEventQuery: q,
3✔
301
                }
3✔
302
        })
3✔
303
        if err != nil && err != ErrNoForwardingEvents {
3✔
304
                return ForwardingLogTimeSlice{}, err
×
305
        }
×
306

307
        resp.LastIndexOffset = recordOffset
3✔
308

3✔
309
        return resp, nil
3✔
310
}
311

312
// makeUniqueTimestamps takes a slice of forwarding events, sorts it by the
313
// event timestamps and then makes sure there are no duplicates in the
314
// timestamps. If duplicates are found, some of the timestamps are increased on
315
// the nanosecond scale until only unique values remain. This is a fix to
316
// address the problem that in some environments (looking at you, Windows) the
317
// system clock has such a bad resolution that two serial invocations of
318
// time.Now() might return the same timestamp, even if some time has elapsed
319
// between the calls.
320
func makeUniqueTimestamps(events []ForwardingEvent) {
3✔
321
        sort.Slice(events, func(i, j int) bool {
6✔
322
                return events[i].Timestamp.Before(events[j].Timestamp)
3✔
323
        })
3✔
324

325
        // Now that we know the events are sorted by timestamp, we can go
326
        // through the list and fix all duplicates until only unique values
327
        // remain.
328
        for outer := 0; outer < len(events)-1; outer++ {
6✔
329
                current := events[outer].Timestamp.UnixNano()
3✔
330
                next := events[outer+1].Timestamp.UnixNano()
3✔
331

3✔
332
                // We initially sorted the slice. So if the current is now
3✔
333
                // greater or equal to the next one, it's either because it's a
3✔
334
                // duplicate or because we increased the current in the last
3✔
335
                // iteration.
3✔
336
                if current >= next {
3✔
UNCOV
337
                        next = current + 1
×
UNCOV
338
                        events[outer+1].Timestamp = time.Unix(0, next)
×
UNCOV
339
                }
×
340
        }
341
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc