
lightningnetwork / lnd / build 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%

Pull #9343 (github) by ellemouton

fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is reworked such that the
context provided by its new main WithCtx method is cancelled in sync
with the parent context being cancelled or with the guard's quit
channel being closed. Tests are added to assert the behaviour. In order
for the close of the quit channel to be consistent with the cancelling
of the derived contexts, the quit channel _must_ be internal to the
ContextGuard so that callers can only close the channel via the exposed
Quit method, which then takes care to first cancel any derived contexts
that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests
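
The behaviour described above can be illustrated with a small, self-contained sketch. This is not the fn.ContextGuard code from the PR: the Guard type, its fields, and the goroutine-per-context wiring below are assumptions made purely for illustration, with only the WithCtx and Quit method names taken from the commit message.

// ctxguard_sketch.go: a hypothetical illustration of the pattern described in
// the commit message above. It is NOT the fn.ContextGuard implementation.
package ctxguard

import (
        "context"
        "sync"
)

// Guard hands out contexts that are cancelled either when their parent
// context is cancelled or when the guard's internal quit channel is closed.
type Guard struct {
        mu      sync.Mutex
        once    sync.Once
        quit    chan struct{}
        cancels []context.CancelFunc
}

// New returns a Guard whose quit channel is kept private, so shutdown can
// only be triggered through Quit.
func New() *Guard {
        return &Guard{quit: make(chan struct{})}
}

// WithCtx derives a context from parent that is cancelled when the parent is
// cancelled or when Quit is called, whichever happens first.
func (g *Guard) WithCtx(parent context.Context) (context.Context,
        context.CancelFunc) {

        ctx, cancel := context.WithCancel(parent)

        g.mu.Lock()
        g.cancels = append(g.cancels, cancel)
        g.mu.Unlock()

        // Cancel the derived context as soon as the quit channel closes. The
        // goroutine exits once the context is done for any reason.
        go func() {
                select {
                case <-g.quit:
                        cancel()
                case <-ctx.Done():
                }
        }()

        return ctx, cancel
}

// Quit first cancels every derived context and only then closes the quit
// channel, so the channel is never observed closed while a derived context is
// still live.
func (g *Guard) Quit() {
        g.once.Do(func() {
                g.mu.Lock()
                for _, cancel := range g.cancels {
                        cancel()
                }
                g.mu.Unlock()

                close(g.quit)
        })
}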

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line

Source File

/channeldb/forwarding_log.go (89.1% of lines covered)

package channeldb

import (
        "bytes"
        "io"
        "sort"
        "time"

        "github.com/btcsuite/btcwallet/walletdb"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lnwire"
)

var (
        // forwardingLogBucket is the bucket that we'll use to store the
        // forwarding log. The forwarding log contains a time series database
        // of the forwarding history of a lightning daemon. Each key within the
        // bucket is a timestamp (in nanoseconds since the unix epoch), and
        // the value is a serialized forwarding event for that timestamp.
        forwardingLogBucket = []byte("circuit-fwd-log")
)

const (
        // forwardingEventSize is the size of a forwarding event. The breakdown
        // is as follows:
        //
        //  * 8 byte incoming chan ID || 8 byte outgoing chan ID || 8 byte value in
        //    || 8 byte value out
        //
        // From the value in and value out, callers can easily compute the
        // total fee extracted from a forwarding event.
        forwardingEventSize = 32

        // MaxResponseEvents is the max number of forwarding events that will
        // be returned by a single query response. This size was selected to
        // safely remain under gRPC's 4MiB message size response limit. As each
        // full forwarding event (including the timestamp) is 40 bytes, we can
        // safely return 50k entries in a single response.
        MaxResponseEvents = 50000
)

// ForwardingLog returns an instance of the ForwardingLog object backed by the
// target database instance.
func (d *DB) ForwardingLog() *ForwardingLog {
        return &ForwardingLog{
                db: d,
        }
}

// ForwardingLog is a time series database that logs the fulfillment of payment
// circuits by a lightning network daemon. The log contains a series of
// forwarding events which map a timestamp to a forwarding event. A forwarding
// event describes which channels were used to create+settle a circuit, and the
// amount involved. Subtracting the outgoing amount from the incoming amount
// reveals the fee charged for the forwarding service.
type ForwardingLog struct {
        db *DB
}

// ForwardingEvent is an event in the forwarding log's time series. Each
// forwarding event logs the creation and tear-down of a payment circuit. A
// circuit is created once an incoming HTLC has been fully forwarded, and
// destroyed once the payment has been settled.
type ForwardingEvent struct {
        // Timestamp is the settlement time of this payment circuit.
        Timestamp time.Time

        // IncomingChanID is the incoming channel ID of the payment circuit.
        IncomingChanID lnwire.ShortChannelID

        // OutgoingChanID is the outgoing channel ID of the payment circuit.
        OutgoingChanID lnwire.ShortChannelID

        // AmtIn is the amount of the incoming HTLC. Subtracting the outgoing
        // amount from this gives the total fees of this payment circuit.
        AmtIn lnwire.MilliSatoshi

        // AmtOut is the amount of the outgoing HTLC. Subtracting this from the
        // incoming amount gives the total fees for this payment circuit.
        AmtOut lnwire.MilliSatoshi
}

// encodeForwardingEvent writes out the target forwarding event to the passed
// io.Writer, using the expected DB format. Note that the timestamp isn't
// serialized as this will be the key value within the bucket.
func encodeForwardingEvent(w io.Writer, f *ForwardingEvent) error {
        return WriteElements(
                w, f.IncomingChanID, f.OutgoingChanID, f.AmtIn, f.AmtOut,
        )
}

// decodeForwardingEvent attempts to decode the raw bytes of a serialized
// forwarding event into the target ForwardingEvent. Note that the timestamp
// won't be decoded, as the caller is expected to set this due to the bucket
// structure of the forwarding log.
func decodeForwardingEvent(r io.Reader, f *ForwardingEvent) error {
        return ReadElements(
                r, &f.IncomingChanID, &f.OutgoingChanID, &f.AmtIn, &f.AmtOut,
        )
}

// AddForwardingEvents adds a series of forwarding events to the database.
// Before inserting, the set of events will be sorted according to their
// timestamp. This ensures that all writes to disk are sequential.
func (f *ForwardingLog) AddForwardingEvents(events []ForwardingEvent) error {
        // Before we create the database transaction, we'll ensure that the set
        // of forwarding events are properly sorted according to their
        // timestamp and that no duplicate timestamps exist to avoid collisions
        // in the key we are going to store the events under.
        makeUniqueTimestamps(events)

        var timestamp [8]byte

        return kvdb.Batch(f.db.Backend, func(tx kvdb.RwTx) error {
                // First, we'll fetch the bucket that stores our time series
                // log.
                logBucket, err := tx.CreateTopLevelBucket(
                        forwardingLogBucket,
                )
                if err != nil {
                        return err
                }

                // With the bucket obtained, we can now begin to write out the
                // series of events.
                for _, event := range events {
                        err := storeEvent(logBucket, event, timestamp[:])
                        if err != nil {
                                return err
                        }
                }

                return nil
        })
}

// storeEvent tries to store a forwarding event into the given bucket by trying
// to avoid collisions. If a key for the event timestamp already exists in the
// database, the timestamp is incremented in nanosecond intervals until a "free"
// slot is found.
func storeEvent(bucket walletdb.ReadWriteBucket, event ForwardingEvent,
        timestampScratchSpace []byte) error {

        // First, we'll serialize this timestamp into our
        // timestamp buffer.
        byteOrder.PutUint64(
                timestampScratchSpace, uint64(event.Timestamp.UnixNano()),
        )

        // Next we'll loop until we find a "free" slot in the bucket to store
        // the event under. Collisions should almost never happen unless we're
        // running on a system that has a very bad system clock that doesn't
        // properly resolve to nanosecond scale. We try up to 100 times (which
        // would come to a maximum shift of 0.1 microseconds, which is
        // acceptable for most use cases). If we don't find a free slot, we
        // just give up and let the collision happen. Something must be wrong
        // with the data in that case; even on a very fast machine, forwarding
        // a payment _will_ take at least a few microseconds, so we should find
        // a free nanosecond slot somewhere.
        const maxTries = 100
        tries := 0
        for tries < maxTries {
                val := bucket.Get(timestampScratchSpace)
                if val == nil {
                        break
                }

                // Collision, try the next nanosecond timestamp.
                nextNano := event.Timestamp.UnixNano() + 1
                event.Timestamp = time.Unix(0, nextNano)
                byteOrder.PutUint64(timestampScratchSpace, uint64(nextNano))
                tries++
        }

        // With the key encoded, we'll then encode the event
        // into our buffer, then write it out to disk.
        var eventBytes [forwardingEventSize]byte
        eventBuf := bytes.NewBuffer(eventBytes[0:0:forwardingEventSize])
        err := encodeForwardingEvent(eventBuf, &event)
        if err != nil {
                return err
        }

        return bucket.Put(timestampScratchSpace, eventBuf.Bytes())
}

// ForwardingEventQuery represents a query to the forwarding log payment
// circuit time series database. The query allows a caller to retrieve all
// records for a particular time slice, starting at an offset within that
// slice and limiting the total number of events returned.
type ForwardingEventQuery struct {
        // StartTime is the start time of the time slice.
        StartTime time.Time

        // EndTime is the end time of the time slice.
        EndTime time.Time

        // IndexOffset is the offset within the time slice to start at. This
        // can be used to start the response at a particular record.
        IndexOffset uint32

        // NumMaxEvents is the max number of events to return.
        NumMaxEvents uint32
}

// ForwardingLogTimeSlice is the response to a forwarding query. It includes
// the original query, the set of events that match the query, and an integer
// which represents the offset index of the last item in the set of returned
// events. This integer allows callers to resume their query using this offset
// in the event that the query's response exceeds the max number of returnable
// events.
type ForwardingLogTimeSlice struct {
        ForwardingEventQuery

        // ForwardingEvents is the set of events in our time series that answer
        // the query embedded above.
        ForwardingEvents []ForwardingEvent

        // LastIndexOffset is the index of the last element in the set of
        // returned ForwardingEvents above. Callers can use this to resume
        // their query in the event that the time slice has too many events to
        // fit into a single response.
        LastIndexOffset uint32
}

// Query allows a caller to query the forwarding event time series for a
// particular time slice. The caller can control the precise time as well as
// the number of events to be returned.
//
// TODO(roasbeef): rename?
func (f *ForwardingLog) Query(q ForwardingEventQuery) (ForwardingLogTimeSlice, error) {
        var resp ForwardingLogTimeSlice

        // If the user provided an index offset, then that tells us how many
        // records we need to skip. We'll also keep track of the record offset
        // as that's part of the final return value.
        recordsToSkip := q.IndexOffset
        recordOffset := q.IndexOffset

        err := kvdb.View(f.db, func(tx kvdb.RTx) error {
                // If the bucket wasn't found, then there aren't any events to
                // be returned.
                logBucket := tx.ReadBucket(forwardingLogBucket)
                if logBucket == nil {
                        return ErrNoForwardingEvents
                }

                // We'll be using a cursor to seek into the database, so we'll
                // populate byte slices that represent the start of the key
                // space we're interested in, and the end.
                var startTime, endTime [8]byte
                byteOrder.PutUint64(startTime[:], uint64(q.StartTime.UnixNano()))
                byteOrder.PutUint64(endTime[:], uint64(q.EndTime.UnixNano()))

                // If we know that a set of log events exists, then we'll begin
                // our seek through the log in order to satisfy the query.
                // We'll continue until either we reach the end of the range,
                // or reach our max number of events.
                logCursor := logBucket.ReadCursor()
                timestamp, events := logCursor.Seek(startTime[:])
                for ; timestamp != nil && bytes.Compare(timestamp, endTime[:]) <= 0; timestamp, events = logCursor.Next() {
                        // If our current return payload exceeds the max number
                        // of events, then we'll exit now.
                        if uint32(len(resp.ForwardingEvents)) >= q.NumMaxEvents {
                                return nil
                        }

                        // If we're not yet past the user defined offset, then
                        // we'll continue to seek forward.
                        if recordsToSkip > 0 {
                                recordsToSkip--
                                continue
                        }

                        currentTime := time.Unix(
                                0, int64(byteOrder.Uint64(timestamp)),
                        )

                        // At this point, we've skipped enough records to start
                        // to collate our query. For each record, we'll
                        // increment the final record offset so the querier can
                        // utilize pagination to seek further.
                        readBuf := bytes.NewReader(events)
                        for readBuf.Len() != 0 {
                                var event ForwardingEvent
                                err := decodeForwardingEvent(readBuf, &event)
                                if err != nil {
                                        return err
                                }

                                event.Timestamp = currentTime
                                resp.ForwardingEvents = append(resp.ForwardingEvents, event)

                                recordOffset++
                        }
                }

                return nil
        }, func() {
                resp = ForwardingLogTimeSlice{
                        ForwardingEventQuery: q,
                }
        })
        if err != nil && err != ErrNoForwardingEvents {
                return ForwardingLogTimeSlice{}, err
        }

        resp.LastIndexOffset = recordOffset

        return resp, nil
}

// makeUniqueTimestamps takes a slice of forwarding events, sorts it by the
// event timestamps and then makes sure there are no duplicates in the
// timestamps. If duplicates are found, some of the timestamps are increased on
// the nanosecond scale until only unique values remain. This is a fix to
// address the problem that in some environments (looking at you, Windows) the
// system clock has such a bad resolution that two serial invocations of
// time.Now() might return the same timestamp, even if some time has elapsed
// between the calls.
func makeUniqueTimestamps(events []ForwardingEvent) {
        sort.Slice(events, func(i, j int) bool {
                return events[i].Timestamp.Before(events[j].Timestamp)
        })

        // Now that we know the events are sorted by timestamp, we can go
        // through the list and fix all duplicates until only unique values
        // remain.
        for outer := 0; outer < len(events)-1; outer++ {
                current := events[outer].Timestamp.UnixNano()
                next := events[outer+1].Timestamp.UnixNano()

                // We initially sorted the slice. So if the current is now
                // greater or equal to the next one, it's either because it's a
                // duplicate or because we increased the current in the last
                // iteration.
                if current >= next {
                        next = current + 1
                        events[outer+1].Timestamp = time.Unix(0, next)
                }
        }
}
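
To show how the exported pieces above fit together, here is a hypothetical caller that pages through the log with Query, using LastIndexOffset to resume each page and MaxResponseEvents as the page size. The helper name, its package, and the termination condition are illustrative assumptions, not code from lnd.

// fwdhistory_sketch.go: an illustrative, hypothetical caller of the
// forwarding log API above; it is not part of lnd.
package fwdstats

import (
        "time"

        "github.com/lightningnetwork/lnd/channeldb"
)

// collectForwardingHistory pages through all forwarding events between start
// and end, resuming each query at the previous slice's LastIndexOffset.
func collectForwardingHistory(db *channeldb.DB, start,
        end time.Time) ([]channeldb.ForwardingEvent, error) {

        fwdLog := db.ForwardingLog()

        var (
                all    []channeldb.ForwardingEvent
                offset uint32
        )
        for {
                slice, err := fwdLog.Query(channeldb.ForwardingEventQuery{
                        StartTime:    start,
                        EndTime:      end,
                        IndexOffset:  offset,
                        NumMaxEvents: channeldb.MaxResponseEvents,
                })
                if err != nil {
                        return nil, err
                }

                all = append(all, slice.ForwardingEvents...)

                // A short page means the time slice has been drained.
                if uint32(len(slice.ForwardingEvents)) < channeldb.MaxResponseEvents {
                        return all, nil
                }

                // Resume the next page where this one stopped.
                offset = slice.LastIndexOffset
        }
}

Per the field comments on ForwardingEvent, the routing fee earned over the slice is then simply the sum of AmtIn minus AmtOut across the returned events.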