• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13566028875

27 Feb 2025 12:09PM UTC coverage: 49.396% (-9.4%) from 58.748%
13566028875

Pull #9555

github

ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]

34 of 54 new or added lines in 4 files covered. (62.96%)

27464 existing lines in 436 files now uncovered.

101095 of 204664 relevant lines covered (49.4%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.71
/htlcswitch/payment_result.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "io"
8
        "sync"
9

10
        "github.com/lightningnetwork/lnd/channeldb"
11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/lnwire"
13
        "github.com/lightningnetwork/lnd/multimutex"
14
)
15

16
var (
	// networkResultStoreBucketKey names the root level bucket under which
	// the network result of each payment ID is stored.
	networkResultStoreBucketKey = []byte("network-result-store-bucket")

	// ErrPaymentIDNotFound is returned when no entry exists for the
	// requested paymentID.
	ErrPaymentIDNotFound = errors.New("paymentID not found")

	// ErrPaymentIDAlreadyExists is returned when a pending payment is
	// written under a paymentID that already exists.
	ErrPaymentIDAlreadyExists = errors.New("paymentID already exists")
)
30

31
// PaymentResult is the decoded outcome of a payment attempt as reported by the
// network. It is the value that eventually gets passed on to the router for
// processing.
type PaymentResult struct {
	// Preimage is filled in by the switch when a sent HTLC was settled.
	Preimage [32]byte

	// Error is non-nil when sending the HTLC failed and the HTLC is now
	// irrevocably canceled. For a failure that happened during
	// forwarding, the error will be a *ForwardingError.
	Error error
}
43

44
// networkResult is the raw result received from the network after a payment
45
// attempt has been made. Since the switch doesn't always have the necessary
46
// data to decode the raw message, we store it together with some meta data,
47
// and decode it when the router query for the final result.
48
type networkResult struct {
49
        // msg is the received result. This should be of type UpdateFulfillHTLC
50
        // or UpdateFailHTLC.
51
        msg lnwire.Message
52

53
        // unencrypted indicates whether the failure encoded in the message is
54
        // unencrypted, and hence doesn't need to be decrypted.
55
        unencrypted bool
56

57
        // isResolution indicates whether this is a resolution message, in
58
        // which the failure reason might not be included.
59
        isResolution bool
60
}
61

62
// serializeNetworkResult serializes the networkResult.
63
func serializeNetworkResult(w io.Writer, n *networkResult) error {
3✔
64
        return channeldb.WriteElements(w, n.msg, n.unencrypted, n.isResolution)
3✔
65
}
3✔
66

67
// deserializeNetworkResult deserializes the networkResult.
68
func deserializeNetworkResult(r io.Reader) (*networkResult, error) {
2✔
69
        n := &networkResult{}
2✔
70

2✔
71
        if err := channeldb.ReadElements(r,
2✔
72
                &n.msg, &n.unencrypted, &n.isResolution,
2✔
73
        ); err != nil {
2✔
74
                return nil, err
×
75
        }
×
76

77
        return n, nil
2✔
78
}
79

80
// networkResultStore is a persistent store that stores any results of HTLCs in
81
// flight on the network. Since payment results are inherently asynchronous, it
82
// is used as a common access point for senders of HTLCs, to know when a result
83
// is back. The Switch will checkpoint any received result to the store, and
84
// the store will keep results and notify the callers about them.
85
type networkResultStore struct {
86
        backend kvdb.Backend
87

88
        // results is a map from paymentIDs to channels where subscribers to
89
        // payment results will be notified.
90
        results    map[uint64][]chan *networkResult
91
        resultsMtx sync.Mutex
92

93
        // attemptIDMtx is a multimutex used to make sure the database and
94
        // result subscribers map is consistent for each attempt ID in case of
95
        // concurrent callers.
96
        attemptIDMtx *multimutex.Mutex[uint64]
97
}
98

99
func newNetworkResultStore(db kvdb.Backend) *networkResultStore {
3✔
100
        return &networkResultStore{
3✔
101
                backend:      db,
3✔
102
                results:      make(map[uint64][]chan *networkResult),
3✔
103
                attemptIDMtx: multimutex.NewMutex[uint64](),
3✔
104
        }
3✔
105
}
3✔
106

107
// storeResult stores the networkResult for the given attemptID, and notifies
108
// any subscribers.
109
func (store *networkResultStore) storeResult(attemptID uint64,
110
        result *networkResult) error {
3✔
111

3✔
112
        // We get a mutex for this attempt ID. This is needed to ensure
3✔
113
        // consistency between the database state and the subscribers in case
3✔
114
        // of concurrent calls.
3✔
115
        store.attemptIDMtx.Lock(attemptID)
3✔
116
        defer store.attemptIDMtx.Unlock(attemptID)
3✔
117

3✔
118
        log.Debugf("Storing result for attemptID=%v", attemptID)
3✔
119

3✔
120
        // Serialize the payment result.
3✔
121
        var b bytes.Buffer
3✔
122
        if err := serializeNetworkResult(&b, result); err != nil {
3✔
123
                return err
×
124
        }
×
125

126
        var attemptIDBytes [8]byte
3✔
127
        binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID)
3✔
128

3✔
129
        err := kvdb.Batch(store.backend, func(tx kvdb.RwTx) error {
6✔
130
                networkResults, err := tx.CreateTopLevelBucket(
3✔
131
                        networkResultStoreBucketKey,
3✔
132
                )
3✔
133
                if err != nil {
3✔
134
                        return err
×
135
                }
×
136

137
                return networkResults.Put(attemptIDBytes[:], b.Bytes())
3✔
138
        })
139
        if err != nil {
3✔
140
                return err
×
141
        }
×
142

143
        // Now that the result is stored in the database, we can notify any
144
        // active subscribers.
145
        store.resultsMtx.Lock()
3✔
146
        for _, res := range store.results[attemptID] {
6✔
147
                res <- result
3✔
148
        }
3✔
149
        delete(store.results, attemptID)
3✔
150
        store.resultsMtx.Unlock()
3✔
151

3✔
152
        return nil
3✔
153
}
154

155
// subscribeResult is used to get the HTLC attempt result for the given attempt
156
// ID.  It returns a channel on which the result will be delivered when ready.
157
func (store *networkResultStore) subscribeResult(attemptID uint64) (
158
        <-chan *networkResult, error) {
3✔
159

3✔
160
        // We get a mutex for this payment ID. This is needed to ensure
3✔
161
        // consistency between the database state and the subscribers in case
3✔
162
        // of concurrent calls.
3✔
163
        store.attemptIDMtx.Lock(attemptID)
3✔
164
        defer store.attemptIDMtx.Unlock(attemptID)
3✔
165

3✔
166
        log.Debugf("Subscribing to result for attemptID=%v", attemptID)
3✔
167

3✔
168
        var (
3✔
169
                result     *networkResult
3✔
170
                resultChan = make(chan *networkResult, 1)
3✔
171
        )
3✔
172

3✔
173
        err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
6✔
174
                var err error
3✔
175
                result, err = fetchResult(tx, attemptID)
3✔
176
                switch {
3✔
177

178
                // Result not yet available, we will notify once a result is
179
                // available.
180
                case err == ErrPaymentIDNotFound:
3✔
181
                        return nil
3✔
182

183
                case err != nil:
×
184
                        return err
×
185

186
                // The result was found, and will be returned immediately.
187
                default:
2✔
188
                        return nil
2✔
189
                }
190
        }, func() {
3✔
191
                result = nil
3✔
192
        })
3✔
193
        if err != nil {
3✔
194
                return nil, err
×
195
        }
×
196

197
        // If the result was found, we can send it on the result channel
198
        // imemdiately.
199
        if result != nil {
5✔
200
                resultChan <- result
2✔
201
                return resultChan, nil
2✔
202
        }
2✔
203

204
        // Otherwise we store the result channel for when the result is
205
        // available.
206
        store.resultsMtx.Lock()
3✔
207
        store.results[attemptID] = append(
3✔
208
                store.results[attemptID], resultChan,
3✔
209
        )
3✔
210
        store.resultsMtx.Unlock()
3✔
211

3✔
212
        return resultChan, nil
3✔
213
}
214

215
// getResult attempts to immediately fetch the result for the given pid from
216
// the store. If no result is available, ErrPaymentIDNotFound is returned.
217
func (store *networkResultStore) getResult(pid uint64) (
218
        *networkResult, error) {
3✔
219

3✔
220
        var result *networkResult
3✔
221
        err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
6✔
222
                var err error
3✔
223
                result, err = fetchResult(tx, pid)
3✔
224
                return err
3✔
225
        }, func() {
6✔
226
                result = nil
3✔
227
        })
3✔
228
        if err != nil {
6✔
229
                return nil, err
3✔
230
        }
3✔
231

UNCOV
232
        return result, nil
×
233
}
234

235
func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) {
3✔
236
        var attemptIDBytes [8]byte
3✔
237
        binary.BigEndian.PutUint64(attemptIDBytes[:], pid)
3✔
238

3✔
239
        networkResults := tx.ReadBucket(networkResultStoreBucketKey)
3✔
240
        if networkResults == nil {
3✔
UNCOV
241
                return nil, ErrPaymentIDNotFound
×
UNCOV
242
        }
×
243

244
        // Check whether a result is already available.
245
        resultBytes := networkResults.Get(attemptIDBytes[:])
3✔
246
        if resultBytes == nil {
6✔
247
                return nil, ErrPaymentIDNotFound
3✔
248
        }
3✔
249

250
        // Decode the result we found.
251
        r := bytes.NewReader(resultBytes)
2✔
252

2✔
253
        return deserializeNetworkResult(r)
2✔
254
}
255

256
// cleanStore removes all entries from the store, except the payment IDs given.
257
// NOTE: Since every result not listed in the keep map will be deleted, care
258
// should be taken to ensure no new payment attempts are being made
259
// concurrently while this process is ongoing, as its result might end up being
260
// deleted.
261
func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error {
3✔
262
        return kvdb.Update(store.backend, func(tx kvdb.RwTx) error {
6✔
263
                networkResults, err := tx.CreateTopLevelBucket(
3✔
264
                        networkResultStoreBucketKey,
3✔
265
                )
3✔
266
                if err != nil {
3✔
267
                        return err
×
268
                }
×
269

270
                // Iterate through the bucket, deleting all items not in the
271
                // keep map.
272
                var toClean [][]byte
3✔
273
                if err := networkResults.ForEach(func(k, _ []byte) error {
6✔
274
                        pid := binary.BigEndian.Uint64(k)
3✔
275
                        if _, ok := keep[pid]; ok {
3✔
UNCOV
276
                                return nil
×
UNCOV
277
                        }
×
278

279
                        toClean = append(toClean, k)
3✔
280
                        return nil
3✔
281
                }); err != nil {
×
282
                        return err
×
283
                }
×
284

285
                for _, k := range toClean {
6✔
286
                        err := networkResults.Delete(k)
3✔
287
                        if err != nil {
3✔
288
                                return err
×
289
                        }
×
290
                }
291

292
                if len(toClean) > 0 {
6✔
293
                        log.Infof("Removed %d stale entries from network "+
3✔
294
                                "result store", len(toClean))
3✔
295
                }
3✔
296

297
                return nil
3✔
298
        }, func() {})
3✔
299
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc