• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15155511119

21 May 2025 06:52AM UTC coverage: 57.389% (-11.6%) from 68.996%
15155511119

Pull #9844

github

web-flow
Merge 8658c8597 into c52a6ddeb
Pull Request #9844: Refactor Payment PR 3

346 of 493 new or added lines in 4 files covered. (70.18%)

30172 existing lines in 456 files now uncovered.

95441 of 166305 relevant lines covered (57.39%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.5
/routing/control_tower.go
1
package routing
2

3
import (
4
        "sync"
5

6
        "github.com/lightningnetwork/lnd/channeldb"
7
        "github.com/lightningnetwork/lnd/lntypes"
8
        "github.com/lightningnetwork/lnd/multimutex"
9
        "github.com/lightningnetwork/lnd/payments"
10
        pymtpkg "github.com/lightningnetwork/lnd/payments"
11
        "github.com/lightningnetwork/lnd/queue"
12
)
13

14
// DBMPPayment is an interface derived from channeldb.MPPayment that is used by
15
// the payment lifecycle.
16
type DBMPPayment interface {
17
        // GetState returns the current state of the payment.
18
        GetState() *channeldb.MPPaymentState
19

20
        // Terminated returns true if the payment is in a final state.
21
        Terminated() bool
22

23
        // GetStatus returns the current status of the payment.
24
        GetStatus() channeldb.PaymentStatus
25

26
        // NeedWaitAttempts specifies whether the payment needs to wait for the
27
        // outcome of an attempt.
28
        NeedWaitAttempts() (bool, error)
29

30
        // GetHTLCs returns all HTLCs of this payment.
31
        GetHTLCs() []channeldb.HTLCAttempt
32

33
        // InFlightHTLCs returns all HTLCs that are in flight.
34
        InFlightHTLCs() []channeldb.HTLCAttempt
35

36
        // AllowMoreAttempts is used to decide whether we can safely attempt
37
        // more HTLCs for a given payment state. Return an error if the payment
38
        // is in an unexpected state.
39
        AllowMoreAttempts() (bool, error)
40

41
        // TerminalInfo returns the settled HTLC attempt or the payment's
42
        // failure reason.
43
        TerminalInfo() (*channeldb.HTLCAttempt, *channeldb.FailureReason)
44
}
45

46
// ControlTower tracks all outgoing payments made, whose primary purpose is to
47
// prevent duplicate payments to the same payment hash. In production, a
48
// persistent implementation is preferred so that tracking can survive across
49
// restarts. Payments are transitioned through various payment states, and the
50
// ControlTower interface provides access to driving the state transitions.
51
type ControlTower interface {
52
        // InitPayment checks that no succeeded payment exists for this payment
53
        // hash.
54
        InitPayment(lntypes.Hash, *channeldb.PaymentCreationInfo) error
55

56
        // DeleteFailedAttempts removes all failed HTLCs from the db. It should
57
        // be called for a given payment whenever all inflight htlcs are
58
        // completed, and the payment has reached a final settled state.
59
        DeleteFailedAttempts(lntypes.Hash) error
60

61
        // RegisterAttempt atomically records the provided HTLCAttemptInfo.
62
        RegisterAttempt(lntypes.Hash, *channeldb.HTLCAttemptInfo) error
63

64
        // SettleAttempt marks the given attempt settled with the preimage. If
65
        // this is a multi shard payment, this might implicitly mean the
66
        // full payment succeeded.
67
        //
68
        // After invoking this method, InitPayment should always return an
69
        // error to prevent us from making duplicate payments to the same
70
        // payment hash. The provided preimage is atomically saved to the DB
71
        // for record keeping.
72
        SettleAttempt(lntypes.Hash, uint64, *channeldb.HTLCSettleInfo) (
73
                *channeldb.HTLCAttempt, error)
74

75
        // FailAttempt marks the given payment attempt failed.
76
        FailAttempt(lntypes.Hash, uint64, *channeldb.HTLCFailInfo) (
77
                *channeldb.HTLCAttempt, error)
78

79
        // FetchPayment fetches the payment corresponding to the given payment
80
        // hash.
81
        FetchPayment(paymentHash lntypes.Hash) (DBMPPayment, error)
82

83
        // FailPayment transitions a payment into the Failed state, and records
84
        // the ultimate reason the payment failed. Note that this should only
85
        // be called when all active attempts are already failed. After
86
        // invoking this method, InitPayment should return nil on its next call
87
        // for this payment hash, allowing the user to make a subsequent
88
        // payment.
89
        FailPayment(lntypes.Hash, channeldb.FailureReason) error
90

91
        // FetchInFlightPayments returns all payments with status InFlight.
92
        FetchInFlightPayments() ([]*channeldb.MPPayment, error)
93

94
        // SubscribePayment subscribes to updates for the payment with the given
95
        // hash. A first update with the current state of the payment is always
96
        // sent out immediately.
97
        SubscribePayment(paymentHash lntypes.Hash) (ControlTowerSubscriber,
98
                error)
99

100
        // SubscribeAllPayments subscribes to updates for all payments. A first
101
        // update with the current state of every inflight payment is always
102
        // sent out immediately.
103
        SubscribeAllPayments() (ControlTowerSubscriber, error)
104
}
105

106
// ControlTowerSubscriber contains the state for a payment update subscriber.
107
type ControlTowerSubscriber interface {
108
        // Updates is the channel over which *channeldb.MPPayment updates can be
109
        // received.
110
        Updates() <-chan interface{}
111

112
        // Close signals that the subscriber is no longer interested in updates.
113
        Close()
114
}
115

116
// controlTowerSubscriberImpl contains the state for a payment update
117
// subscriber.
118
type controlTowerSubscriberImpl struct {
119
        updates <-chan interface{}
120
        queue   *queue.ConcurrentQueue
121
        quit    chan struct{}
122
}
123

124
// newControlTowerSubscriber instantiates a new subscriber state object.
125
func newControlTowerSubscriber() *controlTowerSubscriberImpl {
1✔
126
        // Create a queue for payment updates.
1✔
127
        queue := queue.NewConcurrentQueue(20)
1✔
128
        queue.Start()
1✔
129

1✔
130
        return &controlTowerSubscriberImpl{
1✔
131
                updates: queue.ChanOut(),
1✔
132
                queue:   queue,
1✔
133
                quit:    make(chan struct{}),
1✔
134
        }
1✔
135
}
1✔
136

137
// Close signals that the subscriber is no longer interested in updates.
138
func (s *controlTowerSubscriberImpl) Close() {
1✔
139
        // Close quit channel so that any pending writes to the queue are
1✔
140
        // cancelled.
1✔
141
        close(s.quit)
1✔
142

1✔
143
        // Stop the queue goroutine so that it won't leak.
1✔
144
        s.queue.Stop()
1✔
145
}
1✔
146

147
// Updates is the channel over which *channeldb.MPPayment updates can be
148
// received.
149
func (s *controlTowerSubscriberImpl) Updates() <-chan interface{} {
1✔
150
        return s.updates
1✔
151
}
1✔
152

153
// controlTower is a persistent implementation of ControlTower to restrict
154
// double payment sending.
155
type controlTower struct {
156
        db payments.PaymentDB
157

158
        // subscriberIndex is used to provide a unique id for each subscriber
159
        // to all payments. This is used to easily remove the subscriber when
160
        // necessary.
161
        subscriberIndex        uint64
162
        subscribersAllPayments map[uint64]*controlTowerSubscriberImpl
163
        subscribers            map[lntypes.Hash][]*controlTowerSubscriberImpl
164
        subscribersMtx         sync.Mutex
165

166
        // paymentsMtx provides synchronization on the payment level to ensure
167
        // that no race conditions occur in between updating the database and
168
        // sending a notification.
169
        paymentsMtx *multimutex.Mutex[lntypes.Hash]
170
}
171

172
// NewControlTower creates a new instance of the controlTower.
173
func NewControlTower(db pymtpkg.PaymentDB) ControlTower {
1✔
174
        return &controlTower{
1✔
175
                db: db,
1✔
176
                subscribersAllPayments: make(
1✔
177
                        map[uint64]*controlTowerSubscriberImpl,
1✔
178
                ),
1✔
179
                subscribers: make(map[lntypes.Hash][]*controlTowerSubscriberImpl),
1✔
180
                paymentsMtx: multimutex.NewMutex[lntypes.Hash](),
1✔
181
        }
1✔
182
}
1✔
183

184
// InitPayment checks or records the given PaymentCreationInfo with the DB,
185
// making sure it does not already exist as an in-flight payment. When this
186
// method returns successfully, the payment is guaranteed to be in the
187
// Initiated state.
188
func (p *controlTower) InitPayment(paymentHash lntypes.Hash,
189
        info *channeldb.PaymentCreationInfo) error {
1✔
190

1✔
191
        err := p.db.InitPayment(paymentHash, info)
1✔
192
        if err != nil {
2✔
193
                return err
1✔
194
        }
1✔
195

196
        // Take lock before querying the db to prevent missing or duplicating
197
        // an update.
198
        p.paymentsMtx.Lock(paymentHash)
1✔
199
        defer p.paymentsMtx.Unlock(paymentHash)
1✔
200

1✔
201
        payment, err := p.db.FetchPayment(paymentHash)
1✔
202
        if err != nil {
1✔
203
                return err
×
UNCOV
204
        }
×
205

206
        p.notifySubscribers(paymentHash, payment)
1✔
207

1✔
208
        return nil
1✔
209
}
210

211
// DeleteFailedAttempts deletes all failed htlcs if the payment was
212
// successfully settled.
213
func (p *controlTower) DeleteFailedAttempts(paymentHash lntypes.Hash) error {
1✔
214
        return p.db.DeleteFailedAttempts(paymentHash)
1✔
215
}
1✔
216

217
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
218
// DB.
219
func (p *controlTower) RegisterAttempt(paymentHash lntypes.Hash,
220
        attempt *channeldb.HTLCAttemptInfo) error {
1✔
221

1✔
222
        p.paymentsMtx.Lock(paymentHash)
1✔
223
        defer p.paymentsMtx.Unlock(paymentHash)
1✔
224

1✔
225
        payment, err := p.db.RegisterAttempt(paymentHash, attempt)
1✔
226
        if err != nil {
1✔
227
                return err
×
UNCOV
228
        }
×
229

230
        // Notify subscribers of the attempt registration.
231
        p.notifySubscribers(paymentHash, payment)
1✔
232

1✔
233
        return nil
1✔
234
}
235

236
// SettleAttempt marks the given attempt settled with the preimage. If
237
// this is a multi shard payment, this might implicitly mean the
238
// full payment succeeded.
239
func (p *controlTower) SettleAttempt(paymentHash lntypes.Hash,
240
        attemptID uint64, settleInfo *channeldb.HTLCSettleInfo) (
241
        *channeldb.HTLCAttempt, error) {
1✔
242

1✔
243
        p.paymentsMtx.Lock(paymentHash)
1✔
244
        defer p.paymentsMtx.Unlock(paymentHash)
1✔
245

1✔
246
        payment, err := p.db.SettleAttempt(paymentHash, attemptID, settleInfo)
1✔
247
        if err != nil {
1✔
248
                return nil, err
×
UNCOV
249
        }
×
250

251
        // Notify subscribers of success event.
252
        p.notifySubscribers(paymentHash, payment)
1✔
253

1✔
254
        return payment.GetAttempt(attemptID)
1✔
255
}
256

257
// FailAttempt marks the given payment attempt failed.
258
func (p *controlTower) FailAttempt(paymentHash lntypes.Hash,
259
        attemptID uint64, failInfo *channeldb.HTLCFailInfo) (
260
        *channeldb.HTLCAttempt, error) {
1✔
261

1✔
262
        p.paymentsMtx.Lock(paymentHash)
1✔
263
        defer p.paymentsMtx.Unlock(paymentHash)
1✔
264

1✔
265
        payment, err := p.db.FailAttempt(paymentHash, attemptID, failInfo)
1✔
266
        if err != nil {
1✔
267
                return nil, err
×
UNCOV
268
        }
×
269

270
        // Notify subscribers of failed attempt.
271
        p.notifySubscribers(paymentHash, payment)
1✔
272

1✔
273
        return payment.GetAttempt(attemptID)
1✔
274
}
275

276
// FetchPayment fetches the payment corresponding to the given payment hash.
277
func (p *controlTower) FetchPayment(paymentHash lntypes.Hash) (
278
        DBMPPayment, error) {
1✔
279

1✔
280
        return p.db.FetchPayment(paymentHash)
1✔
281
}
1✔
282

283
// FailPayment transitions a payment into the Failed state, and records the
284
// reason the payment failed. After invoking this method, InitPayment should
285
// return nil on its next call for this payment hash, allowing the switch to
286
// make a subsequent payment.
287
//
288
// NOTE: This method will overwrite the failure reason if the payment is already
289
// failed.
290
func (p *controlTower) FailPayment(paymentHash lntypes.Hash,
291
        reason channeldb.FailureReason) error {
1✔
292

1✔
293
        p.paymentsMtx.Lock(paymentHash)
1✔
294
        defer p.paymentsMtx.Unlock(paymentHash)
1✔
295

1✔
296
        payment, err := p.db.Fail(paymentHash, reason)
1✔
297
        if err != nil {
1✔
298
                return err
×
UNCOV
299
        }
×
300

301
        // Notify subscribers of fail event.
302
        p.notifySubscribers(paymentHash, payment)
1✔
303

1✔
304
        return nil
1✔
305
}
306

307
// FetchInFlightPayments returns all payments with status InFlight.
308
func (p *controlTower) FetchInFlightPayments() ([]*channeldb.MPPayment, error) {
1✔
309
        return p.db.FetchInFlightPayments()
1✔
310
}
1✔
311

312
// SubscribePayment subscribes to updates for the payment with the given hash. A
313
// first update with the current state of the payment is always sent out
314
// immediately.
315
func (p *controlTower) SubscribePayment(paymentHash lntypes.Hash) (
316
        ControlTowerSubscriber, error) {
1✔
317

1✔
318
        // Take lock before querying the db to prevent missing or duplicating an
1✔
319
        // update.
1✔
320
        p.paymentsMtx.Lock(paymentHash)
1✔
321
        defer p.paymentsMtx.Unlock(paymentHash)
1✔
322

1✔
323
        payment, err := p.db.FetchPayment(paymentHash)
1✔
324
        if err != nil {
1✔
UNCOV
325
                return nil, err
×
UNCOV
326
        }
×
327

328
        subscriber := newControlTowerSubscriber()
1✔
329

1✔
330
        // Always write current payment state to the channel.
1✔
331
        subscriber.queue.ChanIn() <- payment
1✔
332

1✔
333
        // Payment is currently in flight. Register this subscriber for further
1✔
334
        // updates. Otherwise this update is the final update and the incoming
1✔
335
        // channel can be closed. This will close the queue's outgoing channel
1✔
336
        // when all updates have been written.
1✔
337
        if !payment.Terminated() {
2✔
338
                p.subscribersMtx.Lock()
1✔
339
                p.subscribers[paymentHash] = append(
1✔
340
                        p.subscribers[paymentHash], subscriber,
1✔
341
                )
1✔
342
                p.subscribersMtx.Unlock()
1✔
343
        } else {
2✔
344
                close(subscriber.queue.ChanIn())
1✔
345
        }
1✔
346

347
        return subscriber, nil
1✔
348
}
349

350
// SubscribeAllPayments subscribes to updates for all inflight payments. A first
351
// update with the current state of every inflight payment is always sent out
352
// immediately.
353
// Note: If payments are in-flight while starting a new subscription, the start
354
// of the payment stream could produce out-of-order and/or duplicate events. In
355
// order to get updates for every in-flight payment attempt make sure to
356
// subscribe to this method before initiating any payments.
357
func (p *controlTower) SubscribeAllPayments() (ControlTowerSubscriber, error) {
1✔
358
        subscriber := newControlTowerSubscriber()
1✔
359

1✔
360
        // Add the subscriber to the list before fetching in-flight payments, so
1✔
361
        // no events are missed. If a payment attempt update occurs after
1✔
362
        // appending and before fetching in-flight payments, an out-of-order
1✔
363
        // duplicate may be produced, because it is then fetched in below call
1✔
364
        // and notified through the subscription.
1✔
365
        p.subscribersMtx.Lock()
1✔
366
        p.subscribersAllPayments[p.subscriberIndex] = subscriber
1✔
367
        p.subscriberIndex++
1✔
368
        p.subscribersMtx.Unlock()
1✔
369

1✔
370
        log.Debugf("Scanning for inflight payments")
1✔
371
        inflightPayments, err := p.db.FetchInFlightPayments()
1✔
372
        if err != nil {
1✔
373
                return nil, err
×
UNCOV
374
        }
×
375
        log.Debugf("Scanning for inflight payments finished",
1✔
376
                len(inflightPayments))
1✔
377

1✔
378
        for index := range inflightPayments {
1✔
UNCOV
379
                // Always write current payment state to the channel.
×
UNCOV
380
                subscriber.queue.ChanIn() <- inflightPayments[index]
×
UNCOV
381
        }
×
382

383
        return subscriber, nil
1✔
384
}
385

386
// notifySubscribers sends a payment event to all subscribers of this payment.
387
// Per-payment channels are closed afterwards if the event is terminal. This
388
// function must be executed atomically (by means of a lock) with the database
389
// update to guarantee consistency of the notifications.
390
func (p *controlTower) notifySubscribers(paymentHash lntypes.Hash,
391
        event *channeldb.MPPayment) {
1✔
392

1✔
393
        // Get all subscribers for this payment.
1✔
394
        p.subscribersMtx.Lock()
1✔
395

1✔
396
        subscribersPaymentHash, ok := p.subscribers[paymentHash]
1✔
397
        if !ok && len(p.subscribersAllPayments) == 0 {
2✔
398
                p.subscribersMtx.Unlock()
1✔
399
                return
1✔
400
        }
1✔
401

402
        // If the payment reached a terminal state, the subscriber list can be
403
        // cleared. There won't be any more updates.
404
        terminal := event.Terminated()
1✔
405
        if terminal {
2✔
406
                delete(p.subscribers, paymentHash)
1✔
407
        }
1✔
408

409
        // Copy subscribers to all payments locally while holding the lock in
410
        // order to avoid concurrency issues while reading/writing the map.
411
        subscribersAllPayments := make(map[uint64]*controlTowerSubscriberImpl)
1✔
412
        for k, v := range p.subscribersAllPayments {
2✔
413
                subscribersAllPayments[k] = v
1✔
414
        }
1✔
415
        p.subscribersMtx.Unlock()
1✔
416

1✔
417
        // Notify all subscribers that subscribed to the current payment hash.
1✔
418
        for _, subscriber := range subscribersPaymentHash {
2✔
419
                select {
1✔
420
                case subscriber.queue.ChanIn() <- event:
1✔
421
                        // If this event is the last, close the incoming channel
1✔
422
                        // of the queue. This will signal the subscriber that
1✔
423
                        // there won't be any more updates.
1✔
424
                        if terminal {
2✔
425
                                close(subscriber.queue.ChanIn())
1✔
426
                        }
1✔
427

428
                // If subscriber disappeared, skip notification. For further
429
                // notifications, we'll keep skipping over this subscriber.
430
                case <-subscriber.quit:
1✔
431
                }
432
        }
433

434
        // Notify all subscribers that subscribed to all payments.
435
        for key, subscriber := range subscribersAllPayments {
2✔
436
                select {
1✔
437
                case subscriber.queue.ChanIn() <- event:
1✔
438

439
                // If subscriber disappeared, remove it from the subscribers
440
                // list.
UNCOV
441
                case <-subscriber.quit:
×
UNCOV
442
                        p.subscribersMtx.Lock()
×
UNCOV
443
                        delete(p.subscribersAllPayments, key)
×
UNCOV
444
                        p.subscribersMtx.Unlock()
×
445
                }
446
        }
447
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc