• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16811814134

07 Aug 2025 05:46PM UTC coverage: 57.463% (-9.5%) from 66.947%
16811814134

Pull #9844

github

web-flow
Merge 4b08ee16d into 2269859d9
Pull Request #9844: Refactor Payment PR 3

434 of 645 new or added lines in 17 files covered. (67.29%)

28260 existing lines in 457 files now uncovered.

99053 of 172378 relevant lines covered (57.46%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.5
/routing/control_tower.go
1
package routing
2

3
import (
4
        "sync"
5

6
        "github.com/lightningnetwork/lnd/channeldb"
7
        "github.com/lightningnetwork/lnd/lntypes"
8
        "github.com/lightningnetwork/lnd/multimutex"
9
        paymentsdb "github.com/lightningnetwork/lnd/payments/db"
10
        "github.com/lightningnetwork/lnd/queue"
11
)
12

13
// DBMPPayment is an interface derived from paymentsdb.MPPayment that is used by
14
// the payment lifecycle.
15
type DBMPPayment interface {
16
        // GetState returns the current state of the payment.
17
        GetState() *paymentsdb.MPPaymentState
18

19
        // Terminated returns true if the payment is in a final state.
20
        Terminated() bool
21

22
        // GetStatus returns the current status of the payment.
23
        GetStatus() paymentsdb.PaymentStatus
24

25
        // NeedWaitAttempts specifies whether the payment needs to wait for the
26
        // outcome of an attempt.
27
        NeedWaitAttempts() (bool, error)
28

29
        // GetHTLCs returns all HTLCs of this payment.
30
        GetHTLCs() []paymentsdb.HTLCAttempt
31

32
        // InFlightHTLCs returns all HTLCs that are in flight.
33
        InFlightHTLCs() []paymentsdb.HTLCAttempt
34

35
        // AllowMoreAttempts is used to decide whether we can safely attempt
36
        // more HTLCs for a given payment state. Return an error if the payment
37
        // is in an unexpected state.
38
        AllowMoreAttempts() (bool, error)
39

40
        // TerminalInfo returns the settled HTLC attempt or the payment's
41
        // failure reason.
42
        TerminalInfo() (*paymentsdb.HTLCAttempt, *paymentsdb.FailureReason)
43
}
44

45
// ControlTower tracks all outgoing payments made, whose primary purpose is to
46
// prevent duplicate payments to the same payment hash. In production, a
47
// persistent implementation is preferred so that tracking can survive across
48
// restarts. Payments are transitioned through various payment states, and the
49
// ControlTower interface provides access to driving the state transitions.
50
type ControlTower interface {
51
        // This method checks that no succeeded payment exist for this payment
52
        // hash.
53
        InitPayment(lntypes.Hash, *paymentsdb.PaymentCreationInfo) error
54

55
        // DeleteFailedAttempts removes all failed HTLCs from the db. It should
56
        // be called for a given payment whenever all inflight htlcs are
57
        // completed, and the payment has reached a final settled state.
58
        DeleteFailedAttempts(lntypes.Hash) error
59

60
        // RegisterAttempt atomically records the provided HTLCAttemptInfo.
61
        RegisterAttempt(lntypes.Hash, *paymentsdb.HTLCAttemptInfo) error
62

63
        // SettleAttempt marks the given attempt settled with the preimage. If
64
        // this is a multi shard payment, this might implicitly mean the the
65
        // full payment succeeded.
66
        //
67
        // After invoking this method, InitPayment should always return an
68
        // error to prevent us from making duplicate payments to the same
69
        // payment hash. The provided preimage is atomically saved to the DB
70
        // for record keeping.
71
        SettleAttempt(lntypes.Hash, uint64, *paymentsdb.HTLCSettleInfo) (
72
                *paymentsdb.HTLCAttempt, error)
73

74
        // FailAttempt marks the given payment attempt failed.
75
        FailAttempt(lntypes.Hash, uint64, *paymentsdb.HTLCFailInfo) (
76
                *paymentsdb.HTLCAttempt, error)
77

78
        // FetchPayment fetches the payment corresponding to the given payment
79
        // hash.
80
        FetchPayment(paymentHash lntypes.Hash) (DBMPPayment, error)
81

82
        // FailPayment transitions a payment into the Failed state, and records
83
        // the ultimate reason the payment failed. Note that this should only
84
        // be called when all active attempts are already failed. After
85
        // invoking this method, InitPayment should return nil on its next call
86
        // for this payment hash, allowing the user to make a subsequent
87
        // payment.
88
        FailPayment(lntypes.Hash, paymentsdb.FailureReason) error
89

90
        // FetchInFlightPayments returns all payments with status InFlight.
91
        FetchInFlightPayments() ([]*paymentsdb.MPPayment, error)
92

93
        // SubscribePayment subscribes to updates for the payment with the given
94
        // hash. A first update with the current state of the payment is always
95
        // sent out immediately.
96
        SubscribePayment(paymentHash lntypes.Hash) (ControlTowerSubscriber,
97
                error)
98

99
        // SubscribeAllPayments subscribes to updates for all payments. A first
100
        // update with the current state of every inflight payment is always
101
        // sent out immediately.
102
        SubscribeAllPayments() (ControlTowerSubscriber, error)
103
}
104

105
// ControlTowerSubscriber contains the state for a payment update subscriber.
106
type ControlTowerSubscriber interface {
107
        // Updates is the channel over which *paymentsdb.MPPayment updates can be
108
        // received.
109
        Updates() <-chan interface{}
110

111
        // Close signals that the subscriber is no longer interested in updates.
112
        Close()
113
}
114

115
// ControlTowerSubscriberImpl contains the state for a payment update
116
// subscriber.
117
type controlTowerSubscriberImpl struct {
118
        updates <-chan interface{}
119
        queue   *queue.ConcurrentQueue
120
        quit    chan struct{}
121
}
122

123
// newControlTowerSubscriber instantiates a new subscriber state object.
124
func newControlTowerSubscriber() *controlTowerSubscriberImpl {
3✔
125
        // Create a queue for payment updates.
3✔
126
        queue := queue.NewConcurrentQueue(20)
3✔
127
        queue.Start()
3✔
128

3✔
129
        return &controlTowerSubscriberImpl{
3✔
130
                updates: queue.ChanOut(),
3✔
131
                queue:   queue,
3✔
132
                quit:    make(chan struct{}),
3✔
133
        }
3✔
134
}
3✔
135

136
// Close signals that the subscriber is no longer interested in updates.
137
func (s *controlTowerSubscriberImpl) Close() {
3✔
138
        // Close quit channel so that any pending writes to the queue are
3✔
139
        // cancelled.
3✔
140
        close(s.quit)
3✔
141

3✔
142
        // Stop the queue goroutine so that it won't leak.
3✔
143
        s.queue.Stop()
3✔
144
}
3✔
145

146
// Updates is the channel over which *paymentsdb.MPPayment updates can be
147
// received.
148
func (s *controlTowerSubscriberImpl) Updates() <-chan interface{} {
3✔
149
        return s.updates
3✔
150
}
3✔
151

152
// controlTower is persistent implementation of ControlTower to restrict
153
// double payment sending.
154
type controlTower struct {
155
        db *channeldb.KVPaymentsDB
156

157
        // subscriberIndex is used to provide a unique id for each subscriber
158
        // to all payments. This is used to easily remove the subscriber when
159
        // necessary.
160
        subscriberIndex        uint64
161
        subscribersAllPayments map[uint64]*controlTowerSubscriberImpl
162
        subscribers            map[lntypes.Hash][]*controlTowerSubscriberImpl
163
        subscribersMtx         sync.Mutex
164

165
        // paymentsMtx provides synchronization on the payment level to ensure
166
        // that no race conditions occur in between updating the database and
167
        // sending a notification.
168
        paymentsMtx *multimutex.Mutex[lntypes.Hash]
169
}
170

171
// NewControlTower creates a new instance of the controlTower.
172
func NewControlTower(db *channeldb.KVPaymentsDB) ControlTower {
3✔
173
        return &controlTower{
3✔
174
                db: db,
3✔
175
                subscribersAllPayments: make(
3✔
176
                        map[uint64]*controlTowerSubscriberImpl,
3✔
177
                ),
3✔
178
                subscribers: make(map[lntypes.Hash][]*controlTowerSubscriberImpl),
3✔
179
                paymentsMtx: multimutex.NewMutex[lntypes.Hash](),
3✔
180
        }
3✔
181
}
3✔
182

183
// InitPayment checks or records the given PaymentCreationInfo with the DB,
184
// making sure it does not already exist as an in-flight payment. Then this
185
// method returns successfully, the payment is guaranteed to be in the
186
// Initiated state.
187
func (p *controlTower) InitPayment(paymentHash lntypes.Hash,
188
        info *paymentsdb.PaymentCreationInfo) error {
3✔
189

3✔
190
        err := p.db.InitPayment(paymentHash, info)
3✔
191
        if err != nil {
6✔
192
                return err
3✔
193
        }
3✔
194

195
        // Take lock before querying the db to prevent missing or duplicating
196
        // an update.
197
        p.paymentsMtx.Lock(paymentHash)
3✔
198
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
199

3✔
200
        payment, err := p.db.FetchPayment(paymentHash)
3✔
201
        if err != nil {
3✔
202
                return err
×
203
        }
×
204

205
        p.notifySubscribers(paymentHash, payment)
3✔
206

3✔
207
        return nil
3✔
208
}
209

210
// DeleteFailedAttempts deletes all failed htlcs if the payment was
211
// successfully settled.
212
func (p *controlTower) DeleteFailedAttempts(paymentHash lntypes.Hash) error {
3✔
213
        return p.db.DeleteFailedAttempts(paymentHash)
3✔
214
}
3✔
215

216
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
217
// DB.
218
func (p *controlTower) RegisterAttempt(paymentHash lntypes.Hash,
219
        attempt *paymentsdb.HTLCAttemptInfo) error {
3✔
220

3✔
221
        p.paymentsMtx.Lock(paymentHash)
3✔
222
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
223

3✔
224
        payment, err := p.db.RegisterAttempt(paymentHash, attempt)
3✔
225
        if err != nil {
3✔
226
                return err
×
227
        }
×
228

229
        // Notify subscribers of the attempt registration.
230
        p.notifySubscribers(paymentHash, payment)
3✔
231

3✔
232
        return nil
3✔
233
}
234

235
// SettleAttempt marks the given attempt settled with the preimage. If
236
// this is a multi shard payment, this might implicitly mean the the
237
// full payment succeeded.
238
func (p *controlTower) SettleAttempt(paymentHash lntypes.Hash,
239
        attemptID uint64, settleInfo *paymentsdb.HTLCSettleInfo) (
240
        *paymentsdb.HTLCAttempt, error) {
3✔
241

3✔
242
        p.paymentsMtx.Lock(paymentHash)
3✔
243
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
244

3✔
245
        payment, err := p.db.SettleAttempt(paymentHash, attemptID, settleInfo)
3✔
246
        if err != nil {
3✔
247
                return nil, err
×
248
        }
×
249

250
        // Notify subscribers of success event.
251
        p.notifySubscribers(paymentHash, payment)
3✔
252

3✔
253
        return payment.GetAttempt(attemptID)
3✔
254
}
255

256
// FailAttempt marks the given payment attempt failed.
257
func (p *controlTower) FailAttempt(paymentHash lntypes.Hash,
258
        attemptID uint64, failInfo *paymentsdb.HTLCFailInfo) (
259
        *paymentsdb.HTLCAttempt, error) {
3✔
260

3✔
261
        p.paymentsMtx.Lock(paymentHash)
3✔
262
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
263

3✔
264
        payment, err := p.db.FailAttempt(paymentHash, attemptID, failInfo)
3✔
265
        if err != nil {
3✔
266
                return nil, err
×
267
        }
×
268

269
        // Notify subscribers of failed attempt.
270
        p.notifySubscribers(paymentHash, payment)
3✔
271

3✔
272
        return payment.GetAttempt(attemptID)
3✔
273
}
274

275
// FetchPayment fetches the payment corresponding to the given payment hash.
276
func (p *controlTower) FetchPayment(paymentHash lntypes.Hash) (
277
        DBMPPayment, error) {
3✔
278

3✔
279
        return p.db.FetchPayment(paymentHash)
3✔
280
}
3✔
281

282
// FailPayment transitions a payment into the Failed state, and records the
283
// reason the payment failed. After invoking this method, InitPayment should
284
// return nil on its next call for this payment hash, allowing the switch to
285
// make a subsequent payment.
286
//
287
// NOTE: This method will overwrite the failure reason if the payment is already
288
// failed.
289
func (p *controlTower) FailPayment(paymentHash lntypes.Hash,
290
        reason paymentsdb.FailureReason) error {
3✔
291

3✔
292
        p.paymentsMtx.Lock(paymentHash)
3✔
293
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
294

3✔
295
        payment, err := p.db.Fail(paymentHash, reason)
3✔
296
        if err != nil {
3✔
297
                return err
×
298
        }
×
299

300
        // Notify subscribers of fail event.
301
        p.notifySubscribers(paymentHash, payment)
3✔
302

3✔
303
        return nil
3✔
304
}
305

306
// FetchInFlightPayments returns all payments with status InFlight.
307
func (p *controlTower) FetchInFlightPayments() ([]*paymentsdb.MPPayment, error) {
3✔
308
        return p.db.FetchInFlightPayments()
3✔
309
}
3✔
310

311
// SubscribePayment subscribes to updates for the payment with the given hash. A
312
// first update with the current state of the payment is always sent out
313
// immediately.
314
func (p *controlTower) SubscribePayment(paymentHash lntypes.Hash) (
315
        ControlTowerSubscriber, error) {
3✔
316

3✔
317
        // Take lock before querying the db to prevent missing or duplicating an
3✔
318
        // update.
3✔
319
        p.paymentsMtx.Lock(paymentHash)
3✔
320
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
321

3✔
322
        payment, err := p.db.FetchPayment(paymentHash)
3✔
323
        if err != nil {
3✔
UNCOV
324
                return nil, err
×
UNCOV
325
        }
×
326

327
        subscriber := newControlTowerSubscriber()
3✔
328

3✔
329
        // Always write current payment state to the channel.
3✔
330
        subscriber.queue.ChanIn() <- payment
3✔
331

3✔
332
        // Payment is currently in flight. Register this subscriber for further
3✔
333
        // updates. Otherwise this update is the final update and the incoming
3✔
334
        // channel can be closed. This will close the queue's outgoing channel
3✔
335
        // when all updates have been written.
3✔
336
        if !payment.Terminated() {
6✔
337
                p.subscribersMtx.Lock()
3✔
338
                p.subscribers[paymentHash] = append(
3✔
339
                        p.subscribers[paymentHash], subscriber,
3✔
340
                )
3✔
341
                p.subscribersMtx.Unlock()
3✔
342
        } else {
6✔
343
                close(subscriber.queue.ChanIn())
3✔
344
        }
3✔
345

346
        return subscriber, nil
3✔
347
}
348

349
// SubscribeAllPayments subscribes to updates for all inflight payments. A first
350
// update with the current state of every inflight payment is always sent out
351
// immediately.
352
// Note: If payments are in-flight while starting a new subscription, the start
353
// of the payment stream could produce out-of-order and/or duplicate events. In
354
// order to get updates for every in-flight payment attempt make sure to
355
// subscribe to this method before initiating any payments.
356
func (p *controlTower) SubscribeAllPayments() (ControlTowerSubscriber, error) {
3✔
357
        subscriber := newControlTowerSubscriber()
3✔
358

3✔
359
        // Add the subscriber to the list before fetching in-flight payments, so
3✔
360
        // no events are missed. If a payment attempt update occurs after
3✔
361
        // appending and before fetching in-flight payments, an out-of-order
3✔
362
        // duplicate may be produced, because it is then fetched in below call
3✔
363
        // and notified through the subscription.
3✔
364
        p.subscribersMtx.Lock()
3✔
365
        p.subscribersAllPayments[p.subscriberIndex] = subscriber
3✔
366
        p.subscriberIndex++
3✔
367
        p.subscribersMtx.Unlock()
3✔
368

3✔
369
        log.Debugf("Scanning for inflight payments")
3✔
370
        inflightPayments, err := p.db.FetchInFlightPayments()
3✔
371
        if err != nil {
3✔
372
                return nil, err
×
373
        }
×
374
        log.Debugf("Scanning for inflight payments finished",
3✔
375
                len(inflightPayments))
3✔
376

3✔
377
        for index := range inflightPayments {
3✔
UNCOV
378
                // Always write current payment state to the channel.
×
UNCOV
379
                subscriber.queue.ChanIn() <- inflightPayments[index]
×
UNCOV
380
        }
×
381

382
        return subscriber, nil
3✔
383
}
384

385
// notifySubscribers sends a final payment event to all subscribers of this
386
// payment. The channel will be closed after this. Note that this function must
387
// be executed atomically (by means of a lock) with the database update to
388
// guarantee consistency of the notifications.
389
func (p *controlTower) notifySubscribers(paymentHash lntypes.Hash,
390
        event *paymentsdb.MPPayment) {
3✔
391

3✔
392
        // Get all subscribers for this payment.
3✔
393
        p.subscribersMtx.Lock()
3✔
394

3✔
395
        subscribersPaymentHash, ok := p.subscribers[paymentHash]
3✔
396
        if !ok && len(p.subscribersAllPayments) == 0 {
6✔
397
                p.subscribersMtx.Unlock()
3✔
398
                return
3✔
399
        }
3✔
400

401
        // If the payment reached a terminal state, the subscriber list can be
402
        // cleared. There won't be any more updates.
403
        terminal := event.Terminated()
3✔
404
        if terminal {
6✔
405
                delete(p.subscribers, paymentHash)
3✔
406
        }
3✔
407

408
        // Copy subscribers to all payments locally while holding the lock in
409
        // order to avoid concurrency issues while reading/writing the map.
410
        subscribersAllPayments := make(map[uint64]*controlTowerSubscriberImpl)
3✔
411
        for k, v := range p.subscribersAllPayments {
6✔
412
                subscribersAllPayments[k] = v
3✔
413
        }
3✔
414
        p.subscribersMtx.Unlock()
3✔
415

3✔
416
        // Notify all subscribers that subscribed to the current payment hash.
3✔
417
        for _, subscriber := range subscribersPaymentHash {
6✔
418
                select {
3✔
419
                case subscriber.queue.ChanIn() <- event:
3✔
420
                        // If this event is the last, close the incoming channel
3✔
421
                        // of the queue. This will signal the subscriber that
3✔
422
                        // there won't be any more updates.
3✔
423
                        if terminal {
6✔
424
                                close(subscriber.queue.ChanIn())
3✔
425
                        }
3✔
426

427
                // If subscriber disappeared, skip notification. For further
428
                // notifications, we'll keep skipping over this subscriber.
429
                case <-subscriber.quit:
3✔
430
                }
431
        }
432

433
        // Notify all subscribers that subscribed to all payments.
434
        for key, subscriber := range subscribersAllPayments {
6✔
435
                select {
3✔
436
                case subscriber.queue.ChanIn() <- event:
3✔
437

438
                // If subscriber disappeared, remove it from the subscribers
439
                // list.
UNCOV
440
                case <-subscriber.quit:
×
UNCOV
441
                        p.subscribersMtx.Lock()
×
UNCOV
442
                        delete(p.subscribersAllPayments, key)
×
UNCOV
443
                        p.subscribersMtx.Unlock()
×
444
                }
445
        }
446
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc