• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12313112765

13 Dec 2024 09:20AM UTC coverage: 58.62% (+1.1%) from 57.486%
12313112765

Pull #9356

github

funyug
lnrpc: add filters to forwardhistoryrequest

This commit adds incoming and outgoing channel ids filter to forwarding history request to filter events received/forwarded from/to a particular channel
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

43 of 71 new or added lines in 4 files covered. (60.56%)

23 existing lines in 7 files now uncovered.

134438 of 229338 relevant lines covered (58.62%)

19176.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.35
/channeldb/forwarding_log.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "io"
6
        "sort"
7
        "time"
8

9
        "github.com/btcsuite/btcwallet/walletdb"
10
        "github.com/lightningnetwork/lnd/fn/v2"
11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/lnwire"
13
)
14

15
var (
	// forwardingLogBucket is the bucket that we'll use to store the
	// forwarding log. The forwarding log contains a time series database
	// of the forwarding history of a lightning daemon. Each key within the
	// bucket is a timestamp (in nano seconds since the unix epoch), and
	// the value is the serialization of one or more forwarding events that
	// settled at that timestamp.
	forwardingLogBucket = []byte("circuit-fwd-log")
)
23

24
const (
	// forwardingEventSize is the size in bytes of a serialized forwarding
	// event. The breakdown is as follows:
	//
	//  * 8 byte incoming chan ID || 8 byte outgoing chan ID || 8 byte value in
	//    || 8 byte value out
	//
	// From the value in and value out, callers can easily compute the
	// total fee extracted from a forwarding event.
	forwardingEventSize = 32

	// MaxResponseEvents is the max number of forwarding events that will
	// be returned by a single query response. This size was selected to
	// safely remain under gRPC's 4MiB message size response limit. As each
	// full forwarding event (including the timestamp) is 40 bytes, we can
	// safely return 50k entries in a single response.
	MaxResponseEvents = 50000
)
42

43
// ForwardingLog returns an instance of the ForwardingLog object backed by the
44
// target database instance.
45
func (d *DB) ForwardingLog() *ForwardingLog {
4✔
46
        return &ForwardingLog{
4✔
47
                db: d,
4✔
48
        }
4✔
49
}
4✔
50

51
// ForwardingLog is a time series database that logs the fulfillment of payment
// circuits by a lightning network daemon. The log contains a series of
// forwarding events which map a timestamp to a forwarding event. A forwarding
// event describes which channels were used to create+settle a circuit, and the
// amount involved. Subtracting the outgoing amount from the incoming amount
// reveals the fee charged for the forwarding service.
type ForwardingLog struct {
	// db is the database backend the time series log is stored in.
	db *DB
}
60

61
// ForwardingEvent is an event in the forwarding log's time series. Each
// forwarding event logs the creation and tear-down of a payment circuit. A
// circuit is created once an incoming HTLC has been fully forwarded, and
// destroyed once the payment has been settled.
type ForwardingEvent struct {
	// Timestamp is the settlement time of this payment circuit. It also
	// doubles as the storage key of the event, so it's made unique before
	// writing (see makeUniqueTimestamps).
	Timestamp time.Time

	// IncomingChanID is the incoming channel ID of the payment circuit.
	IncomingChanID lnwire.ShortChannelID

	// OutgoingChanID is the outgoing channel ID of the payment circuit.
	OutgoingChanID lnwire.ShortChannelID

	// AmtIn is the amount of the incoming HTLC. Subtracting the outgoing
	// amount from this gives the total fee of this payment circuit.
	AmtIn lnwire.MilliSatoshi

	// AmtOut is the amount of the outgoing HTLC. Subtracting this from the
	// incoming amount gives the total fee for this payment circuit.
	AmtOut lnwire.MilliSatoshi
}
83

84
// encodeForwardingEvent writes out the target forwarding event to the passed
85
// io.Writer, using the expected DB format. Note that the timestamp isn't
86
// serialized as this will be the key value within the bucket.
87
func encodeForwardingEvent(w io.Writer, f *ForwardingEvent) error {
384✔
88
        return WriteElements(
384✔
89
                w, f.IncomingChanID, f.OutgoingChanID, f.AmtIn, f.AmtOut,
384✔
90
        )
384✔
91
}
384✔
92

93
// decodeForwardingEvent attempts to decode the raw bytes of a serialized
94
// forwarding event into the target ForwardingEvent. Note that the timestamp
95
// won't be decoded, as the caller is expected to set this due to the bucket
96
// structure of the forwarding log.
97
func decodeForwardingEvent(r io.Reader, f *ForwardingEvent) error {
284✔
98
        return ReadElements(
284✔
99
                r, &f.IncomingChanID, &f.OutgoingChanID, &f.AmtIn, &f.AmtOut,
284✔
100
        )
284✔
101
}
284✔
102

103
// AddForwardingEvents adds a series of forwarding events to the database.
104
// Before inserting, the set of events will be sorted according to their
105
// timestamp. This ensures that all writes to disk are sequential.
106
func (f *ForwardingLog) AddForwardingEvents(events []ForwardingEvent) error {
11✔
107
        // Before we create the database transaction, we'll ensure that the set
11✔
108
        // of forwarding events are properly sorted according to their
11✔
109
        // timestamp and that no duplicate timestamps exist to avoid collisions
11✔
110
        // in the key we are going to store the events under.
11✔
111
        makeUniqueTimestamps(events)
11✔
112

11✔
113
        var timestamp [8]byte
11✔
114

11✔
115
        return kvdb.Batch(f.db.Backend, func(tx kvdb.RwTx) error {
22✔
116
                // First, we'll fetch the bucket that stores our time series
11✔
117
                // log.
11✔
118
                logBucket, err := tx.CreateTopLevelBucket(
11✔
119
                        forwardingLogBucket,
11✔
120
                )
11✔
121
                if err != nil {
11✔
122
                        return err
×
123
                }
×
124

125
                // With the bucket obtained, we can now begin to write out the
126
                // series of events.
127
                for _, event := range events {
395✔
128
                        err := storeEvent(logBucket, event, timestamp[:])
384✔
129
                        if err != nil {
384✔
130
                                return err
×
131
                        }
×
132
                }
133

134
                return nil
11✔
135
        })
136
}
137

138
// storeEvent tries to store a forwarding event into the given bucket by trying
139
// to avoid collisions. If a key for the event timestamp already exists in the
140
// database, the timestamp is incremented in nanosecond intervals until a "free"
141
// slot is found.
142
func storeEvent(bucket walletdb.ReadWriteBucket, event ForwardingEvent,
143
        timestampScratchSpace []byte) error {
384✔
144

384✔
145
        // First, we'll serialize this timestamp into our
384✔
146
        // timestamp buffer.
384✔
147
        byteOrder.PutUint64(
384✔
148
                timestampScratchSpace, uint64(event.Timestamp.UnixNano()),
384✔
149
        )
384✔
150

384✔
151
        // Next we'll loop until we find a "free" slot in the bucket to store
384✔
152
        // the event under. This should almost never happen unless we're running
384✔
153
        // on a system that has a very bad system clock that doesn't properly
384✔
154
        // resolve to nanosecond scale. We try up to 100 times (which would come
384✔
155
        // to a maximum shift of 0.1 microsecond which is acceptable for most
384✔
156
        // use cases). If we don't find a free slot, we just give up and let
384✔
157
        // the collision happen. Something must be wrong with the data in that
384✔
158
        // case, even on a very fast machine forwarding payments _will_ take a
384✔
159
        // few microseconds at least so we should find a nanosecond slot
384✔
160
        // somewhere.
384✔
161
        const maxTries = 100
384✔
162
        tries := 0
384✔
163
        for tries < maxTries {
1,168✔
164
                val := bucket.Get(timestampScratchSpace)
784✔
165
                if val == nil {
1,168✔
166
                        break
384✔
167
                }
168

169
                // Collision, try the next nanosecond timestamp.
170
                nextNano := event.Timestamp.UnixNano() + 1
400✔
171
                event.Timestamp = time.Unix(0, nextNano)
400✔
172
                byteOrder.PutUint64(timestampScratchSpace, uint64(nextNano))
400✔
173
                tries++
400✔
174
        }
175

176
        // With the key encoded, we'll then encode the event
177
        // into our buffer, then write it out to disk.
178
        var eventBytes [forwardingEventSize]byte
384✔
179
        eventBuf := bytes.NewBuffer(eventBytes[0:0:forwardingEventSize])
384✔
180
        err := encodeForwardingEvent(eventBuf, &event)
384✔
181
        if err != nil {
384✔
182
                return err
×
183
        }
×
184
        return bucket.Put(timestampScratchSpace, eventBuf.Bytes())
384✔
185
}
186

187
// ForwardingEventQuery represents a query to the forwarding log payment
// circuit time series database. The query allows a caller to retrieve all
// records for a particular time slice, offset in that time slice, limiting the
// total number of responses returned.
type ForwardingEventQuery struct {
	// StartTime is the start time of the time slice.
	StartTime time.Time

	// EndTime is the end time of the time slice.
	EndTime time.Time

	// IndexOffset is the offset within the time slice to start at. This
	// can be used to start the response at a particular record.
	IndexOffset uint32

	// NumMaxEvents is the max number of events to return.
	NumMaxEvents uint32

	// IncomingChanIDs is the set of channel IDs used to filter for HTLCs
	// received from one of those particular channels.
	// If the set is empty, then it is ignored.
	IncomingChanIDs fn.Set[uint64]

	// OutgoingChanIDs is the set of channel IDs used to filter for HTLCs
	// forwarded to one of those particular channels.
	// If the set is empty, then it is ignored.
	OutgoingChanIDs fn.Set[uint64]
}
215

216
// ForwardingLogTimeSlice is the response to a forwarding query. It includes
// the original query, the set of events that match the query, and an integer
// which represents the offset index of the last item in the set of returned
// events. This integer allows callers to resume their query using this offset
// in the event that the query's response exceeds the max number of returnable
// events.
type ForwardingLogTimeSlice struct {
	ForwardingEventQuery

	// ForwardingEvents is the set of events in our time series that answer
	// the query embedded above.
	ForwardingEvents []ForwardingEvent

	// LastIndexOffset is the index of the last element in the set of
	// returned ForwardingEvents above. Callers can use this to resume
	// their query in the event that the time slice has too many events to
	// fit into a single response.
	LastIndexOffset uint32
}
235

236
// Query allows a caller to query the forwarding event time series for a
// particular time slice. The caller can control the precise time as well as
// the number of events to be returned.
//
// The returned slice echoes back the query, carries the matching events
// (capped at q.NumMaxEvents) and the LastIndexOffset needed to resume a
// paginated query. If the log bucket doesn't exist yet, an empty slice is
// returned rather than an error.
//
// TODO(roasbeef): rename?
func (f *ForwardingLog) Query(q ForwardingEventQuery) (ForwardingLogTimeSlice, error) {
	var resp ForwardingLogTimeSlice

	// If the user provided an index offset, we need to skip that many
	// records before collecting results. We'll also keep track of the
	// record offset as that's part of the final return value.
	recordsToSkip := q.IndexOffset
	recordOffset := q.IndexOffset

	err := kvdb.View(f.db, func(tx kvdb.RTx) error {
		// If the bucket wasn't found, then there aren't any events to
		// be returned.
		logBucket := tx.ReadBucket(forwardingLogBucket)
		if logBucket == nil {
			return ErrNoForwardingEvents
		}

		// We'll be using a cursor to seek into the database, so we'll
		// populate byte slices that represent the start of the key
		// space we're interested in, and the end.
		var startTime, endTime [8]byte
		byteOrder.PutUint64(startTime[:], uint64(q.StartTime.UnixNano()))
		byteOrder.PutUint64(endTime[:], uint64(q.EndTime.UnixNano()))

		// If we know that a set of log events exists, then we'll begin
		// our seek through the log in order to satisfy the query.
		// We'll continue until either we reach the end of the range,
		// or reach our max number of events.
		logCursor := logBucket.ReadCursor()
		timestamp, events := logCursor.Seek(startTime[:])
		for ; timestamp != nil && bytes.Compare(timestamp, endTime[:]) <= 0; timestamp, events = logCursor.Next() {
			// If our current return payload exceeds the max number
			// of events, then we'll exit now.
			if uint32(len(resp.ForwardingEvents)) >= q.NumMaxEvents {
				return nil
			}

			// If no channel ID filters were provided and we're not
			// yet past the user defined offset, we can skip this
			// whole record without deserializing it.
			// NOTE(review): this decrements once per stored key;
			// a key normally holds a single event thanks to
			// makeUniqueTimestamps, but could hold several —
			// confirm legacy data can't make offsets drift.
			if recordsToSkip > 0 &&
				q.IncomingChanIDs.IsEmpty() &&
				q.OutgoingChanIDs.IsEmpty() {
				recordsToSkip--
				continue
			}

			// The settlement time is the bucket key itself; it's
			// stamped onto every event decoded from this value.
			currentTime := time.Unix(
				0, int64(byteOrder.Uint64(timestamp)),
			)

			// At this point, we've skipped enough records to start
			// to collate our query. For each record, we'll
			// increment the final record offset so the querier can
			// utilize pagination to seek further.
			readBuf := bytes.NewReader(events)
			for readBuf.Len() != 0 {
				var event ForwardingEvent
				err := decodeForwardingEvent(readBuf, &event)
				if err != nil {
					return err
				}

				// Check if the incoming channel ID matches the
				// filter criteria.
				// Either no filtering is applied (IsEmpty), or
				// the ID is explicitly included.
				incomingMatch := q.IncomingChanIDs.IsEmpty() ||
					q.IncomingChanIDs.Contains(
						event.IncomingChanID.ToUint64(),
					)

				// Check if the outgoing channel ID matches the
				// filter criteria.
				// Either no filtering is applied (IsEmpty), or
				// the ID is explicitly included.
				outgoingMatch := q.OutgoingChanIDs.IsEmpty() ||
					q.OutgoingChanIDs.Contains(
						event.OutgoingChanID.ToUint64(),
					)

				// If both conditions are met, then we'll add
				// the event to our return payload.
				if incomingMatch && outgoingMatch {
					// When filters are active, the offset
					// counts matching events, so the skip
					// happens here instead of at the
					// record level above.
					if recordsToSkip > 0 {
						recordsToSkip--
						continue
					}

					event.Timestamp = currentTime
					resp.ForwardingEvents = append(
						resp.ForwardingEvents,
						event,
					)
					recordOffset++
				}
			}
		}

		return nil
	}, func() {
		// Reset closure: re-initializes the response, presumably run
		// before each (re)attempt of the transaction — confirm against
		// kvdb.View semantics.
		resp = ForwardingLogTimeSlice{
			ForwardingEventQuery: q,
		}
	})
	if err != nil && err != ErrNoForwardingEvents {
		return ForwardingLogTimeSlice{}, err
	}

	resp.LastIndexOffset = recordOffset

	return resp, nil
}
357

358
// makeUniqueTimestamps takes a slice of forwarding events, sorts it by the
359
// event timestamps and then makes sure there are no duplicates in the
360
// timestamps. If duplicates are found, some of the timestamps are increased on
361
// the nanosecond scale until only unique values remain. This is a fix to
362
// address the problem that in some environments (looking at you, Windows) the
363
// system clock has such a bad resolution that two serial invocations of
364
// time.Now() might return the same timestamp, even if some time has elapsed
365
// between the calls.
366
func makeUniqueTimestamps(events []ForwardingEvent) {
12✔
367
        sort.Slice(events, func(i, j int) bool {
439✔
368
                return events[i].Timestamp.Before(events[j].Timestamp)
427✔
369
        })
427✔
370

371
        // Now that we know the events are sorted by timestamp, we can go
372
        // through the list and fix all duplicates until only unique values
373
        // remain.
374
        for outer := 0; outer < len(events)-1; outer++ {
396✔
375
                current := events[outer].Timestamp.UnixNano()
384✔
376
                next := events[outer+1].Timestamp.UnixNano()
384✔
377

384✔
378
                // We initially sorted the slice. So if the current is now
384✔
379
                // greater or equal to the next one, it's either because it's a
384✔
380
                // duplicate or because we increased the current in the last
384✔
381
                // iteration.
384✔
382
                if current >= next {
389✔
383
                        next = current + 1
5✔
384
                        events[outer+1].Timestamp = time.Unix(0, next)
5✔
385
                }
5✔
386
        }
387
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc