• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17257443467

27 Aug 2025 04:37AM UTC coverage: 57.349% (+0.03%) from 57.321%
17257443467

Pull #10049

github

web-flow
Merge cd958d383 into 0c2f045f5
Pull Request #10049: multi: prevent duplicates for locally dispatched HTLC attempts

88 of 123 new or added lines in 4 files covered. (71.54%)

62 existing lines in 14 files now uncovered.

99293 of 173139 relevant lines covered (57.35%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.65
/htlcswitch/payment_result.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "io"
8
        "sync"
9

10
        "github.com/lightningnetwork/lnd/channeldb"
11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/lnwire"
13
        "github.com/lightningnetwork/lnd/multimutex"
14
)
15

16
var (
	// networkResultStoreBucketKey names the top-level bucket in which the
	// network result of every payment ID is persisted.
	networkResultStoreBucketKey = []byte("network-result-store-bucket")

	// ErrPaymentIDNotFound signals that no entry exists for the requested
	// paymentID.
	ErrPaymentIDNotFound = errors.New("paymentID not found")

	// ErrPaymentIDAlreadyExists signals an attempt to write a pending
	// payment under a paymentID that is already present.
	ErrPaymentIDAlreadyExists = errors.New("paymentID already exists")
)
30

31
// PaymentResult is the decoded outcome of a payment attempt as reported by
// the network. It is the value that is ultimately passed to the router for
// processing.
type PaymentResult struct {
	// Preimage is populated by the switch when the sent HTLC was settled.
	Preimage [32]byte

	// Error is set when sending the HTLC failed and the HTLC has been
	// irrevocably canceled. For failures that happened during forwarding
	// it holds a *ForwardingError.
	Error error
}
43

44
// networkResult is the raw result received from the network after a payment
45
// attempt has been made. Since the switch doesn't always have the necessary
46
// data to decode the raw message, we store it together with some meta data,
47
// and decode it when the router query for the final result.
48
type networkResult struct {
49
        // msg is the received result. This should be of type UpdateFulfillHTLC
50
        // or UpdateFailHTLC.
51
        msg lnwire.Message
52

53
        // unencrypted indicates whether the failure encoded in the message is
54
        // unencrypted, and hence doesn't need to be decrypted.
55
        unencrypted bool
56

57
        // isResolution indicates whether this is a resolution message, in
58
        // which the failure reason might not be included.
59
        isResolution bool
60
}
61

62
// serializeNetworkResult serializes the networkResult.
63
func serializeNetworkResult(w io.Writer, n *networkResult) error {
3✔
64
        return channeldb.WriteElements(w, n.msg, n.unencrypted, n.isResolution)
3✔
65
}
3✔
66

67
// deserializeNetworkResult deserializes the networkResult.
68
func deserializeNetworkResult(r io.Reader) (*networkResult, error) {
3✔
69
        n := &networkResult{}
3✔
70

3✔
71
        if err := channeldb.ReadElements(r,
3✔
72
                &n.msg, &n.unencrypted, &n.isResolution,
3✔
73
        ); err != nil {
3✔
74
                return nil, err
×
75
        }
×
76

77
        return n, nil
3✔
78
}
79

80
// networkResultStore is a persistent store that stores any results of HTLCs in
81
// flight on the network. Since payment results are inherently asynchronous, it
82
// is used as a common access point for senders of HTLCs, to know when a result
83
// is back. The Switch will checkpoint any received result to the store, and
84
// the store will keep results and notify the callers about them.
85
type networkResultStore struct {
86
        backend kvdb.Backend
87

88
        // results is a map from paymentIDs to channels where subscribers to
89
        // payment results will be notified.
90
        results    map[uint64][]chan *networkResult
91
        resultsMtx sync.Mutex
92

93
        // attemptIDMtx is a multimutex used to make sure the database and
94
        // result subscribers map is consistent for each attempt ID in case of
95
        // concurrent callers.
96
        attemptIDMtx *multimutex.Mutex[uint64]
97
}
98

99
func newNetworkResultStore(db kvdb.Backend) *networkResultStore {
3✔
100
        return &networkResultStore{
3✔
101
                backend:      db,
3✔
102
                results:      make(map[uint64][]chan *networkResult),
3✔
103
                attemptIDMtx: multimutex.NewMutex[uint64](),
3✔
104
        }
3✔
105
}
3✔
106

107
// InitAttempt initializes the payment attempt with the given attemptID.
108
//
109
// If any record (even a pending result placeholder) already exists in the
110
// store, this method returns ErrPaymentIDAlreadyExists. This guarantees that
111
// only one HTLC will be initialized and dispatched for a given attempt ID until
112
// the ID is explicitly cleaned from attempt store.
113
//
114
// NOTE: Subscribed clients do not receive notice of this initialization.
115
func (store *networkResultStore) InitAttempt(attemptID uint64) error {
3✔
116

3✔
117
        // We get a mutex for this attempt ID. This is needed to ensure
3✔
118
        // consistency between the database state and the subscribers in case
3✔
119
        // of concurrent calls.
3✔
120
        store.attemptIDMtx.Lock(attemptID)
3✔
121
        defer store.attemptIDMtx.Unlock(attemptID)
3✔
122

3✔
123
        // Check if any attempt by this ID is already initialized or whether a
3✔
124
        // a result for the attempt exists in the store.
3✔
125
        var existingResult *networkResult
3✔
126
        err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
6✔
127
                var err error
3✔
128
                existingResult, err = fetchResult(tx, attemptID)
3✔
129
                if err != nil && !errors.Is(err, ErrPaymentIDNotFound) {
3✔
NEW
130
                        return err
×
NEW
131
                }
×
132
                return nil
3✔
133
        }, func() {
3✔
134
                existingResult = nil
3✔
135
        })
3✔
136
        if err != nil {
3✔
NEW
137
                return err
×
NEW
138
        }
×
139

140
        // If the result is already in-progress, return an error indicating that
141
        // the attempt already exists.
142
        if existingResult != nil {
3✔
NEW
143
                log.Warnf("Already initialized attempt for ID=%v", attemptID)
×
NEW
144

×
NEW
145
                return ErrPaymentIDAlreadyExists
×
NEW
146
        }
×
147

148
        // Create an empty networkResult to serve as place holder until a result
149
        // from the network is received.
150
        inProgressResult := &networkResult{
3✔
151
                msg:          &lnwire.PendingNetworkResult{},
3✔
152
                unencrypted:  true,
3✔
153
                isResolution: false,
3✔
154
        }
3✔
155

3✔
156
        var b bytes.Buffer
3✔
157
        if err := serializeNetworkResult(&b, inProgressResult); err != nil {
3✔
NEW
158
                return err
×
NEW
159
        }
×
160

161
        var attemptIDBytes [8]byte
3✔
162
        binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID)
3✔
163

3✔
164
        // Mark an attempt with this ID as having been seen. No network result
3✔
165
        // is available yet.
3✔
166
        //
3✔
167
        // NOTE: Subscribed clients expecting to block until a network result is
3✔
168
        // available must not be notified of this initialization.
3✔
169
        err = kvdb.Batch(store.backend, func(tx kvdb.RwTx) error {
6✔
170
                networkResults, err := tx.CreateTopLevelBucket(
3✔
171
                        networkResultStoreBucketKey,
3✔
172
                )
3✔
173
                if err != nil {
3✔
NEW
174
                        return err
×
NEW
175
                }
×
176

177
                // Store the in-progress result.
178
                return networkResults.Put(attemptIDBytes[:], b.Bytes())
3✔
179
        })
180
        if err != nil {
3✔
NEW
181
                return err
×
NEW
182
        }
×
183

184
        log.Debugf("Initialized attempt for local payment with attemptID=%v",
3✔
185
                attemptID)
3✔
186

3✔
187
        return nil
3✔
188
}
189

190
// storeResult stores the networkResult for the given attemptID, and notifies
191
// any subscribers.
192
func (store *networkResultStore) StoreResult(attemptID uint64,
193
        result *networkResult) error {
3✔
194

3✔
195
        // We get a mutex for this attempt ID. This is needed to ensure
3✔
196
        // consistency between the database state and the subscribers in case
3✔
197
        // of concurrent calls.
3✔
198
        store.attemptIDMtx.Lock(attemptID)
3✔
199
        defer store.attemptIDMtx.Unlock(attemptID)
3✔
200

3✔
201
        log.Debugf("Storing result for attemptID=%v", attemptID)
3✔
202

3✔
203
        // Handle finalized result (success or failure).
3✔
204
        var b bytes.Buffer
3✔
205
        if err := serializeNetworkResult(&b, result); err != nil {
3✔
206
                return err
×
207
        }
×
208

209
        var attemptIDBytes [8]byte
3✔
210
        binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID)
3✔
211

3✔
212
        err := kvdb.Batch(store.backend, func(tx kvdb.RwTx) error {
6✔
213
                networkResults, err := tx.CreateTopLevelBucket(
3✔
214
                        networkResultStoreBucketKey,
3✔
215
                )
3✔
216
                if err != nil {
3✔
217
                        return err
×
218
                }
×
219

220
                return networkResults.Put(attemptIDBytes[:], b.Bytes())
3✔
221
        })
222
        if err != nil {
3✔
223
                return err
×
224
        }
×
225

226
        // Now that the result is stored in the database, we can notify any
227
        // active subscribers - but only if this isn't an initialized attempt
228
        // awaiting a settle/fail result from the network.
229
        if result.msg.MsgType() != lnwire.MsgPendingNetworkResult {
6✔
230
                store.resultsMtx.Lock()
3✔
231
                for _, res := range store.results[attemptID] {
6✔
232
                        res <- result
3✔
233
                }
3✔
234
                delete(store.results, attemptID)
3✔
235
                store.resultsMtx.Unlock()
3✔
236
        }
237

238
        return nil
3✔
239
}
240

241
// subscribeResult is used to get the HTLC attempt result for the given attempt
242
// ID.  It returns a channel on which the result will be delivered when ready.
243
func (store *networkResultStore) SubscribeResult(attemptID uint64) (
244
        <-chan *networkResult, error) {
3✔
245

3✔
246
        // We get a mutex for this payment ID. This is needed to ensure
3✔
247
        // consistency between the database state and the subscribers in case
3✔
248
        // of concurrent calls.
3✔
249
        store.attemptIDMtx.Lock(attemptID)
3✔
250
        defer store.attemptIDMtx.Unlock(attemptID)
3✔
251

3✔
252
        log.Debugf("Subscribing to result for attemptID=%v", attemptID)
3✔
253

3✔
254
        var (
3✔
255
                result     *networkResult
3✔
256
                resultChan = make(chan *networkResult, 1)
3✔
257
        )
3✔
258

3✔
259
        err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
6✔
260
                var err error
3✔
261
                result, err = fetchResult(tx, attemptID)
3✔
262
                switch {
3✔
263

264
                // Result not yet available, we will notify once a result is
265
                // available.
UNCOV
266
                case err == ErrPaymentIDNotFound:
×
UNCOV
267
                        return nil
×
268

269
                case err != nil:
×
270
                        return err
×
271

272
                // The result was found, and will be returned immediately.
273
                default:
3✔
274
                        return nil
3✔
275
                }
276
        }, func() {
3✔
277
                result = nil
3✔
278
        })
3✔
279
        if err != nil {
3✔
280
                return nil, err
×
281
        }
×
282

283
        // If a result is back from the network, we can send it on the result
284
        // channel immemdiately. If the result is still our initialized place
285
        // holder, then treat it as not yet available.
286
        if result != nil {
6✔
287
                if result.msg.MsgType() != lnwire.MsgPendingNetworkResult {
3✔
NEW
288
                        log.Debugf("Obtained full result for attemptID=%v",
×
NEW
289
                                attemptID)
×
NEW
290

×
NEW
291
                        resultChan <- result
×
NEW
292
                        return resultChan, nil
×
NEW
293
                }
×
294

295
                log.Debugf("Awaiting result (settle/fail) for attemptID=%v",
3✔
296
                        attemptID)
3✔
297
        }
298

299
        // Otherwise we store the result channel for when the result is
300
        // available.
301
        store.resultsMtx.Lock()
3✔
302
        store.results[attemptID] = append(
3✔
303
                store.results[attemptID], resultChan,
3✔
304
        )
3✔
305
        store.resultsMtx.Unlock()
3✔
306

3✔
307
        return resultChan, nil
3✔
308
}
309

310
// GetResult attempts to immediately fetch the *final* network result for the
311
// given attempt ID from the store. If no result is available, or if the only
312
// entry is an initialization placeholder (e.g. created via InitAttempt),
313
// ErrPaymentIDNotFound is returned to signal that the result is not yet
314
// available.
315
//
316
// NOTE: This method does not currently acquire the result subscription mutex.
317
func (store *networkResultStore) GetResult(pid uint64) (
318
        *networkResult, error) {
3✔
319

3✔
320
        var result *networkResult
3✔
321
        err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
6✔
322
                var err error
3✔
323
                result, err = fetchResult(tx, pid)
3✔
324
                if err != nil {
3✔
NEW
325
                        return err
×
NEW
326
                }
×
327

328
                // If the attempt is still in-flight, treat it as not yet
329
                // available to preserve existing expectation for the behavior
330
                // of this method.
331
                if result.msg.MsgType() == lnwire.MsgPendingNetworkResult {
6✔
332
                        return ErrPaymentIDNotFound
3✔
333
                }
3✔
334

NEW
335
                return nil
×
336
        }, func() {
3✔
337
                result = nil
3✔
338
        })
3✔
339
        if err != nil {
6✔
340
                return nil, err
3✔
341
        }
3✔
342

343
        return result, nil
×
344
}
345

346
func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) {
3✔
347
        var attemptIDBytes [8]byte
3✔
348
        binary.BigEndian.PutUint64(attemptIDBytes[:], pid)
3✔
349

3✔
350
        networkResults := tx.ReadBucket(networkResultStoreBucketKey)
3✔
351
        if networkResults == nil {
3✔
352
                return nil, ErrPaymentIDNotFound
×
353
        }
×
354

355
        // Check whether a result is already available.
356
        resultBytes := networkResults.Get(attemptIDBytes[:])
3✔
357
        if resultBytes == nil {
6✔
358
                return nil, ErrPaymentIDNotFound
3✔
359
        }
3✔
360

361
        // Decode the result we found.
362
        r := bytes.NewReader(resultBytes)
3✔
363

3✔
364
        return deserializeNetworkResult(r)
3✔
365
}
366

367
// cleanStore removes all entries from the store, except the payment IDs given.
368
// NOTE: Since every result not listed in the keep map will be deleted, care
369
// should be taken to ensure no new payment attempts are being made
370
// concurrently while this process is ongoing, as its result might end up being
371
// deleted.
372
func (store *networkResultStore) CleanStore(keep map[uint64]struct{}) error {
3✔
373
        return kvdb.Update(store.backend, func(tx kvdb.RwTx) error {
6✔
374
                networkResults, err := tx.CreateTopLevelBucket(
3✔
375
                        networkResultStoreBucketKey,
3✔
376
                )
3✔
377
                if err != nil {
3✔
378
                        return err
×
379
                }
×
380

381
                // Iterate through the bucket, deleting all items not in the
382
                // keep map.
383
                var toClean [][]byte
3✔
384
                if err := networkResults.ForEach(func(k, _ []byte) error {
6✔
385
                        pid := binary.BigEndian.Uint64(k)
3✔
386
                        if _, ok := keep[pid]; ok {
6✔
387
                                return nil
3✔
388
                        }
3✔
389

390
                        toClean = append(toClean, k)
3✔
391
                        return nil
3✔
392
                }); err != nil {
×
393
                        return err
×
394
                }
×
395

396
                for _, k := range toClean {
6✔
397
                        err := networkResults.Delete(k)
3✔
398
                        if err != nil {
3✔
399
                                return err
×
400
                        }
×
401
                }
402

403
                if len(toClean) > 0 {
6✔
404
                        log.Infof("Removed %d stale entries from network "+
3✔
405
                                "result store", len(toClean))
3✔
406
                }
3✔
407

408
                return nil
3✔
409
        }, func() {})
3✔
410
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc