• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9915780197

13 Jul 2024 12:30AM UTC coverage: 49.268% (-9.1%) from 58.413%
9915780197

push

github

web-flow
Merge pull request #8653 from ProofOfKeags/fn-prim

DynComms [0/n]: `fn` package additions

92837 of 188433 relevant lines covered (49.27%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.08
/htlcswitch/payment_result.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "io"
8
        "sync"
9

10
        "github.com/lightningnetwork/lnd/channeldb"
11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/lnwire"
13
        "github.com/lightningnetwork/lnd/multimutex"
14
)
15

16
var (
17

18
        // networkResultStoreBucketKey is used for the root level bucket that
19
        // stores the network result for each payment ID.
20
        networkResultStoreBucketKey = []byte("network-result-store-bucket")
21

22
        // ErrPaymentIDNotFound is an error returned if the given paymentID is
23
        // not found.
24
        ErrPaymentIDNotFound = errors.New("paymentID not found")
25

26
        // ErrPaymentIDAlreadyExists is returned if we try to write a pending
27
        // payment whose paymentID already exists.
28
        ErrPaymentIDAlreadyExists = errors.New("paymentID already exists")
29
)
30

31
// PaymentResult wraps a decoded result received from the network after a
32
// payment attempt was made. This is what is eventually handed to the router
33
// for processing.
34
type PaymentResult struct {
35
        // Preimage is set by the switch in case a sent HTLC was settled.
36
        Preimage [32]byte
37

38
        // Error is non-nil in case a HTLC send failed, and the HTLC is now
39
        // irrevocably canceled. If the payment failed during forwarding, this
40
        // error will be a *ForwardingError.
41
        Error error
42
}
43

44
// networkResult is the raw result received from the network after a payment
45
// attempt has been made. Since the switch doesn't always have the necessary
46
// data to decode the raw message, we store it together with some meta data,
47
// and decode it when the router query for the final result.
48
type networkResult struct {
49
        // msg is the received result. This should be of type UpdateFulfillHTLC
50
        // or UpdateFailHTLC.
51
        msg lnwire.Message
52

53
        // unencrypted indicates whether the failure encoded in the message is
54
        // unencrypted, and hence doesn't need to be decrypted.
55
        unencrypted bool
56

57
        // isResolution indicates whether this is a resolution message, in
58
        // which the failure reason might not be included.
59
        isResolution bool
60
}
61

62
// serializeNetworkResult serializes the networkResult.
63
func serializeNetworkResult(w io.Writer, n *networkResult) error {
3✔
64
        return channeldb.WriteElements(w, n.msg, n.unencrypted, n.isResolution)
3✔
65
}
3✔
66

67
// deserializeNetworkResult deserializes the networkResult.
68
func deserializeNetworkResult(r io.Reader) (*networkResult, error) {
3✔
69
        n := &networkResult{}
3✔
70

3✔
71
        if err := channeldb.ReadElements(r,
3✔
72
                &n.msg, &n.unencrypted, &n.isResolution,
3✔
73
        ); err != nil {
3✔
74
                return nil, err
×
75
        }
×
76

77
        return n, nil
3✔
78
}
79

80
// networkResultStore is a persistent store that stores any results of HTLCs in
81
// flight on the network. Since payment results are inherently asynchronous, it
82
// is used as a common access point for senders of HTLCs, to know when a result
83
// is back. The Switch will checkpoint any received result to the store, and
84
// the store will keep results and notify the callers about them.
85
type networkResultStore struct {
86
        backend kvdb.Backend
87

88
        // results is a map from paymentIDs to channels where subscribers to
89
        // payment results will be notified.
90
        results    map[uint64][]chan *networkResult
91
        resultsMtx sync.Mutex
92

93
        // paymentIDMtx is a multimutex used to make sure the database and
94
        // result subscribers map is consistent for each payment ID in case of
95
        // concurrent callers.
96
        paymentIDMtx *multimutex.Mutex[uint64]
97
}
98

99
func newNetworkResultStore(db kvdb.Backend) *networkResultStore {
3✔
100
        return &networkResultStore{
3✔
101
                backend:      db,
3✔
102
                results:      make(map[uint64][]chan *networkResult),
3✔
103
                paymentIDMtx: multimutex.NewMutex[uint64](),
3✔
104
        }
3✔
105
}
3✔
106

107
// storeResult stores the networkResult for the given paymentID, and
108
// notifies any subscribers.
109
func (store *networkResultStore) storeResult(paymentID uint64,
110
        result *networkResult) error {
3✔
111

3✔
112
        // We get a mutex for this payment ID. This is needed to ensure
3✔
113
        // consistency between the database state and the subscribers in case
3✔
114
        // of concurrent calls.
3✔
115
        store.paymentIDMtx.Lock(paymentID)
3✔
116
        defer store.paymentIDMtx.Unlock(paymentID)
3✔
117

3✔
118
        log.Debugf("Storing result for paymentID=%v", paymentID)
3✔
119

3✔
120
        // Serialize the payment result.
3✔
121
        var b bytes.Buffer
3✔
122
        if err := serializeNetworkResult(&b, result); err != nil {
3✔
123
                return err
×
124
        }
×
125

126
        var paymentIDBytes [8]byte
3✔
127
        binary.BigEndian.PutUint64(paymentIDBytes[:], paymentID)
3✔
128

3✔
129
        err := kvdb.Batch(store.backend, func(tx kvdb.RwTx) error {
6✔
130
                networkResults, err := tx.CreateTopLevelBucket(
3✔
131
                        networkResultStoreBucketKey,
3✔
132
                )
3✔
133
                if err != nil {
3✔
134
                        return err
×
135
                }
×
136

137
                return networkResults.Put(paymentIDBytes[:], b.Bytes())
3✔
138
        })
139
        if err != nil {
3✔
140
                return err
×
141
        }
×
142

143
        // Now that the result is stored in the database, we can notify any
144
        // active subscribers.
145
        store.resultsMtx.Lock()
3✔
146
        for _, res := range store.results[paymentID] {
6✔
147
                res <- result
3✔
148
        }
3✔
149
        delete(store.results, paymentID)
3✔
150
        store.resultsMtx.Unlock()
3✔
151

3✔
152
        return nil
3✔
153
}
154

155
// subscribeResult is used to get the payment result for the given
156
// payment ID. It returns a channel on which the result will be delivered when
157
// ready.
158
func (store *networkResultStore) subscribeResult(paymentID uint64) (
159
        <-chan *networkResult, error) {
3✔
160

3✔
161
        // We get a mutex for this payment ID. This is needed to ensure
3✔
162
        // consistency between the database state and the subscribers in case
3✔
163
        // of concurrent calls.
3✔
164
        store.paymentIDMtx.Lock(paymentID)
3✔
165
        defer store.paymentIDMtx.Unlock(paymentID)
3✔
166

3✔
167
        log.Debugf("Subscribing to result for paymentID=%v", paymentID)
3✔
168

3✔
169
        var (
3✔
170
                result     *networkResult
3✔
171
                resultChan = make(chan *networkResult, 1)
3✔
172
        )
3✔
173

3✔
174
        err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
6✔
175
                var err error
3✔
176
                result, err = fetchResult(tx, paymentID)
3✔
177
                switch {
3✔
178

179
                // Result not yet available, we will notify once a result is
180
                // available.
181
                case err == ErrPaymentIDNotFound:
3✔
182
                        return nil
3✔
183

184
                case err != nil:
×
185
                        return err
×
186

187
                // The result was found, and will be returned immediately.
188
                default:
3✔
189
                        return nil
3✔
190
                }
191
        }, func() {
3✔
192
                result = nil
3✔
193
        })
3✔
194
        if err != nil {
3✔
195
                return nil, err
×
196
        }
×
197

198
        // If the result was found, we can send it on the result channel
199
        // imemdiately.
200
        if result != nil {
6✔
201
                resultChan <- result
3✔
202
                return resultChan, nil
3✔
203
        }
3✔
204

205
        // Otherwise we store the result channel for when the result is
206
        // available.
207
        store.resultsMtx.Lock()
3✔
208
        store.results[paymentID] = append(
3✔
209
                store.results[paymentID], resultChan,
3✔
210
        )
3✔
211
        store.resultsMtx.Unlock()
3✔
212

3✔
213
        return resultChan, nil
3✔
214
}
215

216
// getResult attempts to immediately fetch the result for the given pid from
217
// the store. If no result is available, ErrPaymentIDNotFound is returned.
218
func (store *networkResultStore) getResult(pid uint64) (
219
        *networkResult, error) {
2✔
220

2✔
221
        var result *networkResult
2✔
222
        err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
4✔
223
                var err error
2✔
224
                result, err = fetchResult(tx, pid)
2✔
225
                return err
2✔
226
        }, func() {
4✔
227
                result = nil
2✔
228
        })
2✔
229
        if err != nil {
2✔
230
                return nil, err
×
231
        }
×
232

233
        return result, nil
2✔
234
}
235

236
func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) {
3✔
237
        var paymentIDBytes [8]byte
3✔
238
        binary.BigEndian.PutUint64(paymentIDBytes[:], pid)
3✔
239

3✔
240
        networkResults := tx.ReadBucket(networkResultStoreBucketKey)
3✔
241
        if networkResults == nil {
3✔
242
                return nil, ErrPaymentIDNotFound
×
243
        }
×
244

245
        // Check whether a result is already available.
246
        resultBytes := networkResults.Get(paymentIDBytes[:])
3✔
247
        if resultBytes == nil {
6✔
248
                return nil, ErrPaymentIDNotFound
3✔
249
        }
3✔
250

251
        // Decode the result we found.
252
        r := bytes.NewReader(resultBytes)
3✔
253

3✔
254
        return deserializeNetworkResult(r)
3✔
255
}
256

257
// cleanStore removes all entries from the store, except the payment IDs given.
258
// NOTE: Since every result not listed in the keep map will be deleted, care
259
// should be taken to ensure no new payment attempts are being made
260
// concurrently while this process is ongoing, as its result might end up being
261
// deleted.
262
func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error {
3✔
263
        return kvdb.Update(store.backend, func(tx kvdb.RwTx) error {
6✔
264
                networkResults, err := tx.CreateTopLevelBucket(
3✔
265
                        networkResultStoreBucketKey,
3✔
266
                )
3✔
267
                if err != nil {
3✔
268
                        return err
×
269
                }
×
270

271
                // Iterate through the bucket, deleting all items not in the
272
                // keep map.
273
                var toClean [][]byte
3✔
274
                if err := networkResults.ForEach(func(k, _ []byte) error {
6✔
275
                        pid := binary.BigEndian.Uint64(k)
3✔
276
                        if _, ok := keep[pid]; ok {
3✔
277
                                return nil
×
278
                        }
×
279

280
                        toClean = append(toClean, k)
3✔
281
                        return nil
3✔
282
                }); err != nil {
×
283
                        return err
×
284
                }
×
285

286
                for _, k := range toClean {
6✔
287
                        err := networkResults.Delete(k)
3✔
288
                        if err != nil {
3✔
289
                                return err
×
290
                        }
×
291
                }
292

293
                if len(toClean) > 0 {
6✔
294
                        log.Infof("Removed %d stale entries from network "+
3✔
295
                                "result store", len(toClean))
3✔
296
                }
3✔
297

298
                return nil
3✔
299
        }, func() {})
3✔
300
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc