lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%

Pull #9489 / github / calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: this is not the only way to solve the issue. It
could also be addressed with careful client-side programming
that accounts for the uncertainty and asynchronous nature of
dispatching onions to a remote process via RPC, though that
would require some lnd ChannelRouter changes for how we intend
to use these RPCs.
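A minimal sketch of that server-side rule, under stated assumptions: every name below is hypothetical and this is not the actual switchrpc code. It only illustrates the idea of refusing an onion dispatch for an attempt ID the client has already asked to track.

// Package sketch illustrates the send-then-track rule described above.
// All names are hypothetical; this is not the switchrpc implementation.
package sketch

import (
	"errors"
	"sync"
)

// onionGuard remembers which attempt IDs have already been tracked.
type onionGuard struct {
	mu      sync.Mutex
	tracked map[uint64]struct{}
}

func newOnionGuard() *onionGuard {
	return &onionGuard{tracked: make(map[uint64]struct{})}
}

// markTracked records that a client has begun tracking this attempt ID.
func (g *onionGuard) markTracked(attemptID uint64) {
	g.mu.Lock()
	defer g.mu.Unlock()

	g.tracked[attemptID] = struct{}{}
}

// allowSend rejects an onion dispatch for an attempt ID that has already
// been tracked, preventing a duplicate onion for the same attempt.
func (g *onionGuard) allowSend(attemptID uint64) error {
	g.mu.Lock()
	defer g.mu.Unlock()

	if _, ok := g.tracked[attemptID]; ok {
		return errors.New("attempt already tracked: send must precede track")
	}

	return nil
}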
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File

/channeldb/forwarding_log.go (86.54% covered)
package channeldb

import (
	"bytes"
	"io"
	"sort"
	"time"

	"github.com/btcsuite/btcwallet/walletdb"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lnwire"
)

var (
	// forwardingLogBucket is the bucket that we'll use to store the
	// forwarding log. The forwarding log contains a time series database
	// of the forwarding history of a lightning daemon. Each key within the
	// bucket is a timestamp (in nanoseconds since the unix epoch), and the
	// value is a slice of forwarding events for that timestamp.
	forwardingLogBucket = []byte("circuit-fwd-log")
)

const (
	// forwardingEventSize is the size of a forwarding event. The breakdown
	// is as follows:
	//
	//  * 8 byte incoming chan ID || 8 byte outgoing chan ID || 8 byte value in
	//    || 8 byte value out
	//
	// From the value in and value out, callers can easily compute the
	// total fee extracted from a forwarding event.
	forwardingEventSize = 32

	// MaxResponseEvents is the max number of forwarding events that will
	// be returned by a single query response. This size was selected to
	// safely remain under gRPC's 4MiB message size response limit. As each
	// full forwarding event (including the timestamp) is 40 bytes, we can
	// safely return 50k entries in a single response.
	MaxResponseEvents = 50000
)
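// A quick sketch (illustrative only, not part of the original file; the
// constant names below are hypothetical) of the sizing argument behind
// MaxResponseEvents: each stored event is an 8-byte timestamp key plus a
// 32-byte value, i.e. 40 bytes in total, so a maximal response of 50,000
// events comes to roughly 2,000,000 bytes (~1.9MiB), comfortably below
// gRPC's default 4MiB response limit.
const (
	fullEventSize   = 8 + forwardingEventSize           // 40 bytes per event.
	maxResponseSize = MaxResponseEvents * fullEventSize // 2,000,000 bytes ≈ 1.9MiB.
	grpcMsgLimit    = 4 * 1024 * 1024                   // 4,194,304 bytes.
)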
// ForwardingLog returns an instance of the ForwardingLog object backed by the
// target database instance.
func (d *DB) ForwardingLog() *ForwardingLog {
	return &ForwardingLog{
		db: d,
	}
}

// ForwardingLog is a time series database that logs the fulfillment of payment
// circuits by a lightning network daemon. The log contains a series of
// records, each mapping a timestamp to a forwarding event. A forwarding
// event describes which channels were used to create+settle a circuit, and the
// amount involved. Subtracting the outgoing amount from the incoming amount
// reveals the fee charged for the forwarding service.
type ForwardingLog struct {
	db *DB
}

// ForwardingEvent is an event in the forwarding log's time series. Each
// forwarding event logs the creation and tear-down of a payment circuit. A
// circuit is created once an incoming HTLC has been fully forwarded, and
// destroyed once the payment has been settled.
type ForwardingEvent struct {
	// Timestamp is the settlement time of this payment circuit.
	Timestamp time.Time

	// IncomingChanID is the incoming channel ID of the payment circuit.
	IncomingChanID lnwire.ShortChannelID

	// OutgoingChanID is the outgoing channel ID of the payment circuit.
	OutgoingChanID lnwire.ShortChannelID

	// AmtIn is the amount of the incoming HTLC. Subtracting the outgoing
	// amount from this gives the total fee of this payment circuit.
	AmtIn lnwire.MilliSatoshi

	// AmtOut is the amount of the outgoing HTLC. Subtracting this from the
	// incoming amount gives the total fee for this payment circuit.
	AmtOut lnwire.MilliSatoshi
}
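// A minimal illustration (not from the original file; the function name and
// amounts are hypothetical) of the fee relationship documented above: the fee
// earned on a circuit is the incoming amount minus the outgoing amount.
func exampleForwardingFee() lnwire.MilliSatoshi {
	event := ForwardingEvent{
		AmtIn:  lnwire.MilliSatoshi(1_000_500), // 1,000,500 mSAT received.
		AmtOut: lnwire.MilliSatoshi(1_000_000), // 1,000,000 mSAT forwarded on.
	}

	// The 500 mSAT difference is retained by the forwarding node as its fee.
	return event.AmtIn - event.AmtOut
}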
// encodeForwardingEvent writes out the target forwarding event to the passed
// io.Writer, using the expected DB format. Note that the timestamp isn't
// serialized as this will be the key value within the bucket.
func encodeForwardingEvent(w io.Writer, f *ForwardingEvent) error {
	return WriteElements(
		w, f.IncomingChanID, f.OutgoingChanID, f.AmtIn, f.AmtOut,
	)
}

// decodeForwardingEvent attempts to decode the raw bytes of a serialized
// forwarding event into the target ForwardingEvent. Note that the timestamp
// won't be decoded, as the caller is expected to set this due to the bucket
// structure of the forwarding log.
func decodeForwardingEvent(r io.Reader, f *ForwardingEvent) error {
	return ReadElements(
		r, &f.IncomingChanID, &f.OutgoingChanID, &f.AmtIn, &f.AmtOut,
	)
}
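// A round-trip sketch (not from the original file; the function name is
// hypothetical) showing how the two helpers above pair up: the event body is
// written to a buffer without its timestamp, and decoding restores everything
// except the timestamp, which the caller re-attaches from the bucket key.
func exampleRoundTripForwardingEvent() (ForwardingEvent, error) {
	in := ForwardingEvent{
		IncomingChanID: lnwire.NewShortChanIDFromInt(1),
		OutgoingChanID: lnwire.NewShortChanIDFromInt(2),
		AmtIn:          lnwire.MilliSatoshi(1_000_500),
		AmtOut:         lnwire.MilliSatoshi(1_000_000),
	}

	var buf bytes.Buffer
	if err := encodeForwardingEvent(&buf, &in); err != nil {
		return ForwardingEvent{}, err
	}

	var out ForwardingEvent
	if err := decodeForwardingEvent(&buf, &out); err != nil {
		return ForwardingEvent{}, err
	}

	// out now equals in, except for Timestamp, which is never serialized.
	return out, nil
}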
// AddForwardingEvents adds a series of forwarding events to the database.
// Before inserting, the set of events will be sorted according to their
// timestamp. This ensures that all writes to disk are sequential.
func (f *ForwardingLog) AddForwardingEvents(events []ForwardingEvent) error {
	// Before we create the database transaction, we'll ensure that the set
	// of forwarding events is properly sorted according to their
	// timestamp and that no duplicate timestamps exist to avoid collisions
	// in the key we are going to store the events under.
	makeUniqueTimestamps(events)

	var timestamp [8]byte

	return kvdb.Batch(f.db.Backend, func(tx kvdb.RwTx) error {
		// First, we'll fetch the bucket that stores our time series
		// log.
		logBucket, err := tx.CreateTopLevelBucket(
			forwardingLogBucket,
		)
		if err != nil {
			return err
		}

		// With the bucket obtained, we can now begin to write out the
		// series of events.
		for _, event := range events {
			err := storeEvent(logBucket, event, timestamp[:])
			if err != nil {
				return err
			}
		}

		return nil
	})
}
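// A usage sketch (not from the original file; the function name and amounts
// are hypothetical): callers batch the circuits they have settled and hand
// them to AddForwardingEvents in one call; sorting and timestamp
// de-duplication happen internally via makeUniqueTimestamps before the
// batched write.
func exampleRecordForwards(db *DB) error {
	now := time.Now()

	events := []ForwardingEvent{
		{
			Timestamp:      now,
			IncomingChanID: lnwire.NewShortChanIDFromInt(1),
			OutgoingChanID: lnwire.NewShortChanIDFromInt(2),
			AmtIn:          lnwire.MilliSatoshi(2_000_000),
			AmtOut:         lnwire.MilliSatoshi(1_999_000),
		},
		{
			// A duplicate timestamp is fine; it will be nudged by
			// one nanosecond before being stored.
			Timestamp:      now,
			IncomingChanID: lnwire.NewShortChanIDFromInt(3),
			OutgoingChanID: lnwire.NewShortChanIDFromInt(4),
			AmtIn:          lnwire.MilliSatoshi(500_000),
			AmtOut:         lnwire.MilliSatoshi(499_500),
		},
	}

	return db.ForwardingLog().AddForwardingEvents(events)
}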
// storeEvent tries to store a forwarding event into the given bucket by trying
// to avoid collisions. If a key for the event timestamp already exists in the
// database, the timestamp is incremented in nanosecond intervals until a "free"
// slot is found.
func storeEvent(bucket walletdb.ReadWriteBucket, event ForwardingEvent,
	timestampScratchSpace []byte) error {

	// First, we'll serialize this timestamp into our
	// timestamp buffer.
	byteOrder.PutUint64(
		timestampScratchSpace, uint64(event.Timestamp.UnixNano()),
	)

	// Next we'll loop until we find a "free" slot in the bucket to store
	// the event under. This should almost never happen unless we're running
	// on a system with a system clock so bad that it doesn't properly
	// resolve to nanosecond scale. We try up to 100 times (which amounts to
	// a maximum shift of 0.1 microseconds, acceptable for most use cases).
	// If we don't find a free slot, we just give up and let the collision
	// happen; something must be wrong with the data in that case. Even on a
	// very fast machine, forwarding a payment _will_ take at least a few
	// microseconds, so we should find a free nanosecond slot somewhere.
	const maxTries = 100
	tries := 0
	for tries < maxTries {
		val := bucket.Get(timestampScratchSpace)
		if val == nil {
			break
		}

		// Collision, try the next nanosecond timestamp.
		nextNano := event.Timestamp.UnixNano() + 1
		event.Timestamp = time.Unix(0, nextNano)
		byteOrder.PutUint64(timestampScratchSpace, uint64(nextNano))
		tries++
	}

	// With the key encoded, we'll then encode the event
	// into our buffer, then write it out to disk.
	var eventBytes [forwardingEventSize]byte
	eventBuf := bytes.NewBuffer(eventBytes[0:0:forwardingEventSize])
	err := encodeForwardingEvent(eventBuf, &event)
	if err != nil {
		return err
	}
	return bucket.Put(timestampScratchSpace, eventBuf.Bytes())
}
// ForwardingEventQuery represents a query to the forwarding log payment
// circuit time series database. The query allows a caller to retrieve all
// records for a particular time slice, starting at an offset within that
// slice and limiting the total number of events returned.
type ForwardingEventQuery struct {
	// StartTime is the start time of the time slice.
	StartTime time.Time

	// EndTime is the end time of the time slice.
	EndTime time.Time

	// IndexOffset is the offset within the time slice to start at. This
	// can be used to start the response at a particular record.
	IndexOffset uint32

	// NumMaxEvents is the max number of events to return.
	NumMaxEvents uint32
}

// ForwardingLogTimeSlice is the response to a forwarding query. It includes
// the original query, the set of events that match the query, and an integer
// which represents the offset index of the last item in the set of returned
// events. This integer allows callers to resume their query using this offset
// in the event that the query's response exceeds the max number of returnable
// events.
type ForwardingLogTimeSlice struct {
	ForwardingEventQuery

	// ForwardingEvents is the set of events in our time series that answer
	// the query embedded above.
	ForwardingEvents []ForwardingEvent

	// LastIndexOffset is the index of the last element in the set of
	// returned ForwardingEvents above. Callers can use this to resume
	// their query in the event that the time slice has too many events to
	// fit into a single response.
	LastIndexOffset uint32
}
// Query allows a caller to query the forwarding event time series for a
// particular time slice. The caller can control the precise time as well as
// the number of events to be returned.
//
// TODO(roasbeef): rename?
func (f *ForwardingLog) Query(q ForwardingEventQuery) (ForwardingLogTimeSlice, error) {
	var resp ForwardingLogTimeSlice

	// If the user provided an index offset, then we'll now know how many
	// records we need to skip. We'll also keep track of the record offset
	// as that's part of the final return value.
	recordsToSkip := q.IndexOffset
	recordOffset := q.IndexOffset

	err := kvdb.View(f.db, func(tx kvdb.RTx) error {
		// If the bucket wasn't found, then there aren't any events to
		// be returned.
		logBucket := tx.ReadBucket(forwardingLogBucket)
		if logBucket == nil {
			return ErrNoForwardingEvents
		}

		// We'll be using a cursor to seek into the database, so we'll
		// populate byte slices that represent the start of the key
		// space we're interested in, and the end.
		var startTime, endTime [8]byte
		byteOrder.PutUint64(startTime[:], uint64(q.StartTime.UnixNano()))
		byteOrder.PutUint64(endTime[:], uint64(q.EndTime.UnixNano()))

		// If we know that a set of log events exists, then we'll begin
		// our seek through the log in order to satisfy the query.
		// We'll continue until either we reach the end of the range,
		// or reach our max number of events.
		logCursor := logBucket.ReadCursor()
		timestamp, events := logCursor.Seek(startTime[:])
		for ; timestamp != nil && bytes.Compare(timestamp, endTime[:]) <= 0; timestamp, events = logCursor.Next() {
			// If our current return payload exceeds the max number
			// of events, then we'll exit now.
			if uint32(len(resp.ForwardingEvents)) >= q.NumMaxEvents {
				return nil
			}

			// If we're not yet past the user defined offset, then
			// we'll continue to seek forward.
			if recordsToSkip > 0 {
				recordsToSkip--
				continue
			}

			currentTime := time.Unix(
				0, int64(byteOrder.Uint64(timestamp)),
			)

			// At this point, we've skipped enough records to start
			// to collate our query. For each record, we'll
			// increment the final record offset so the querier can
			// utilize pagination to seek further.
			readBuf := bytes.NewReader(events)
			for readBuf.Len() != 0 {
				var event ForwardingEvent
				err := decodeForwardingEvent(readBuf, &event)
				if err != nil {
					return err
				}

				event.Timestamp = currentTime
				resp.ForwardingEvents = append(resp.ForwardingEvents, event)

				recordOffset++
			}
		}

		return nil
	}, func() {
		resp = ForwardingLogTimeSlice{
			ForwardingEventQuery: q,
		}
	})
	if err != nil && err != ErrNoForwardingEvents {
		return ForwardingLogTimeSlice{}, err
	}

	resp.LastIndexOffset = recordOffset

	return resp, nil
}
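// A pagination sketch (not from the original file; the function name is
// hypothetical): because a time slice can hold more events than fit in one
// response, callers resume from LastIndexOffset until a page comes back
// shorter than the requested maximum.
func exampleQueryAllForwards(log *ForwardingLog, start,
	end time.Time) ([]ForwardingEvent, error) {

	var all []ForwardingEvent

	offset := uint32(0)
	for {
		slice, err := log.Query(ForwardingEventQuery{
			StartTime:    start,
			EndTime:      end,
			IndexOffset:  offset,
			NumMaxEvents: MaxResponseEvents,
		})
		if err != nil {
			return nil, err
		}

		all = append(all, slice.ForwardingEvents...)

		// A short (or empty) page means the time slice is exhausted.
		if uint32(len(slice.ForwardingEvents)) < MaxResponseEvents {
			return all, nil
		}

		offset = slice.LastIndexOffset
	}
}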
// makeUniqueTimestamps takes a slice of forwarding events, sorts it by the
// event timestamps and then makes sure there are no duplicates in the
// timestamps. If duplicates are found, some of the timestamps are increased on
// the nanosecond scale until only unique values remain. This is a fix to
// address the problem that in some environments (looking at you, Windows) the
// system clock has such a bad resolution that two serial invocations of
// time.Now() might return the same timestamp, even if some time has elapsed
// between the calls.
func makeUniqueTimestamps(events []ForwardingEvent) {
	sort.Slice(events, func(i, j int) bool {
		return events[i].Timestamp.Before(events[j].Timestamp)
	})

	// Now that we know the events are sorted by timestamp, we can go
	// through the list and fix all duplicates until only unique values
	// remain.
	for outer := 0; outer < len(events)-1; outer++ {
		current := events[outer].Timestamp.UnixNano()
		next := events[outer+1].Timestamp.UnixNano()

		// We initially sorted the slice. So if the current is now
		// greater or equal to the next one, it's either because it's a
		// duplicate or because we increased the current in the last
		// iteration.
		if current >= next {
			next = current + 1
			events[outer+1].Timestamp = time.Unix(0, next)
		}
	}
}
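To make the de-duplication concrete, here is a small sketch (illustrative only; the function name and timestamp value are hypothetical) of how makeUniqueTimestamps treats two events that carry the same timestamp: the first is left alone and the second is bumped by one nanosecond, so each event ends up under its own bucket key.

func exampleUniqueTimestamps() {
	ts := time.Unix(0, 1_700_000_000_000_000_000)

	events := []ForwardingEvent{
		{Timestamp: ts},
		{Timestamp: ts}, // Same nanosecond as the first event.
	}

	makeUniqueTimestamps(events)

	// events[0].Timestamp is unchanged; events[1].Timestamp is now
	// ts + 1ns, so both can be stored without colliding on the key.
}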