• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15736109134

18 Jun 2025 02:46PM UTC coverage: 58.197% (-10.1%) from 68.248%
15736109134

Pull #9752

github

web-flow
Merge d2634a68c into 31c74f20f
Pull Request #9752: routerrpc: reject payment to invoice that don't have payment secret or blinded paths

6 of 13 new or added lines in 2 files covered. (46.15%)

28331 existing lines in 455 files now uncovered.

97860 of 168153 relevant lines covered (58.2%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.5
/routing/control_tower.go
1
package routing
2

3
import (
4
        "sync"
5

6
        "github.com/lightningnetwork/lnd/channeldb"
7
        "github.com/lightningnetwork/lnd/lntypes"
8
        "github.com/lightningnetwork/lnd/multimutex"
9
        "github.com/lightningnetwork/lnd/queue"
10
)
11

12
// DBMPPayment is an interface derived from channeldb.MPPayment that is used by
13
// the payment lifecycle.
14
type DBMPPayment interface {
15
        // GetState returns the current state of the payment.
16
        GetState() *channeldb.MPPaymentState
17

18
        // Terminated returns true if the payment is in a final state.
19
        Terminated() bool
20

21
        // GetStatus returns the current status of the payment.
22
        GetStatus() channeldb.PaymentStatus
23

24
        // NeedWaitAttempts specifies whether the payment needs to wait for the
25
        // outcome of an attempt.
26
        NeedWaitAttempts() (bool, error)
27

28
        // GetHTLCs returns all HTLCs of this payment.
29
        GetHTLCs() []channeldb.HTLCAttempt
30

31
        // InFlightHTLCs returns all HTLCs that are in flight.
32
        InFlightHTLCs() []channeldb.HTLCAttempt
33

34
        // AllowMoreAttempts is used to decide whether we can safely attempt
35
        // more HTLCs for a given payment state. Return an error if the payment
36
        // is in an unexpected state.
37
        AllowMoreAttempts() (bool, error)
38

39
        // TerminalInfo returns the settled HTLC attempt or the payment's
40
        // failure reason.
41
        TerminalInfo() (*channeldb.HTLCAttempt, *channeldb.FailureReason)
42
}
43

44
// ControlTower tracks all outgoing payments made, whose primary purpose is to
45
// prevent duplicate payments to the same payment hash. In production, a
46
// persistent implementation is preferred so that tracking can survive across
47
// restarts. Payments are transitioned through various payment states, and the
48
// ControlTower interface provides access to driving the state transitions.
49
type ControlTower interface {
50
        // This method checks that no succeeded payment exist for this payment
51
        // hash.
52
        InitPayment(lntypes.Hash, *channeldb.PaymentCreationInfo) error
53

54
        // DeleteFailedAttempts removes all failed HTLCs from the db. It should
55
        // be called for a given payment whenever all inflight htlcs are
56
        // completed, and the payment has reached a final settled state.
57
        DeleteFailedAttempts(lntypes.Hash) error
58

59
        // RegisterAttempt atomically records the provided HTLCAttemptInfo.
60
        RegisterAttempt(lntypes.Hash, *channeldb.HTLCAttemptInfo) error
61

62
        // SettleAttempt marks the given attempt settled with the preimage. If
63
        // this is a multi shard payment, this might implicitly mean the the
64
        // full payment succeeded.
65
        //
66
        // After invoking this method, InitPayment should always return an
67
        // error to prevent us from making duplicate payments to the same
68
        // payment hash. The provided preimage is atomically saved to the DB
69
        // for record keeping.
70
        SettleAttempt(lntypes.Hash, uint64, *channeldb.HTLCSettleInfo) (
71
                *channeldb.HTLCAttempt, error)
72

73
        // FailAttempt marks the given payment attempt failed.
74
        FailAttempt(lntypes.Hash, uint64, *channeldb.HTLCFailInfo) (
75
                *channeldb.HTLCAttempt, error)
76

77
        // FetchPayment fetches the payment corresponding to the given payment
78
        // hash.
79
        FetchPayment(paymentHash lntypes.Hash) (DBMPPayment, error)
80

81
        // FailPayment transitions a payment into the Failed state, and records
82
        // the ultimate reason the payment failed. Note that this should only
83
        // be called when all active attempts are already failed. After
84
        // invoking this method, InitPayment should return nil on its next call
85
        // for this payment hash, allowing the user to make a subsequent
86
        // payment.
87
        FailPayment(lntypes.Hash, channeldb.FailureReason) error
88

89
        // FetchInFlightPayments returns all payments with status InFlight.
90
        FetchInFlightPayments() ([]*channeldb.MPPayment, error)
91

92
        // SubscribePayment subscribes to updates for the payment with the given
93
        // hash. A first update with the current state of the payment is always
94
        // sent out immediately.
95
        SubscribePayment(paymentHash lntypes.Hash) (ControlTowerSubscriber,
96
                error)
97

98
        // SubscribeAllPayments subscribes to updates for all payments. A first
99
        // update with the current state of every inflight payment is always
100
        // sent out immediately.
101
        SubscribeAllPayments() (ControlTowerSubscriber, error)
102
}
103

104
// ControlTowerSubscriber contains the state for a payment update subscriber.
105
type ControlTowerSubscriber interface {
106
        // Updates is the channel over which *channeldb.MPPayment updates can be
107
        // received.
108
        Updates() <-chan interface{}
109

110
        // Close signals that the subscriber is no longer interested in updates.
111
        Close()
112
}
113

114
// ControlTowerSubscriberImpl contains the state for a payment update
115
// subscriber.
116
type controlTowerSubscriberImpl struct {
117
        updates <-chan interface{}
118
        queue   *queue.ConcurrentQueue
119
        quit    chan struct{}
120
}
121

122
// newControlTowerSubscriber instantiates a new subscriber state object.
123
func newControlTowerSubscriber() *controlTowerSubscriberImpl {
3✔
124
        // Create a queue for payment updates.
3✔
125
        queue := queue.NewConcurrentQueue(20)
3✔
126
        queue.Start()
3✔
127

3✔
128
        return &controlTowerSubscriberImpl{
3✔
129
                updates: queue.ChanOut(),
3✔
130
                queue:   queue,
3✔
131
                quit:    make(chan struct{}),
3✔
132
        }
3✔
133
}
3✔
134

135
// Close signals that the subscriber is no longer interested in updates.
136
func (s *controlTowerSubscriberImpl) Close() {
3✔
137
        // Close quit channel so that any pending writes to the queue are
3✔
138
        // cancelled.
3✔
139
        close(s.quit)
3✔
140

3✔
141
        // Stop the queue goroutine so that it won't leak.
3✔
142
        s.queue.Stop()
3✔
143
}
3✔
144

145
// Updates is the channel over which *channeldb.MPPayment updates can be
146
// received.
147
func (s *controlTowerSubscriberImpl) Updates() <-chan interface{} {
3✔
148
        return s.updates
3✔
149
}
3✔
150

151
// controlTower is persistent implementation of ControlTower to restrict
152
// double payment sending.
153
type controlTower struct {
154
        db *channeldb.PaymentControl
155

156
        // subscriberIndex is used to provide a unique id for each subscriber
157
        // to all payments. This is used to easily remove the subscriber when
158
        // necessary.
159
        subscriberIndex        uint64
160
        subscribersAllPayments map[uint64]*controlTowerSubscriberImpl
161
        subscribers            map[lntypes.Hash][]*controlTowerSubscriberImpl
162
        subscribersMtx         sync.Mutex
163

164
        // paymentsMtx provides synchronization on the payment level to ensure
165
        // that no race conditions occur in between updating the database and
166
        // sending a notification.
167
        paymentsMtx *multimutex.Mutex[lntypes.Hash]
168
}
169

170
// NewControlTower creates a new instance of the controlTower.
171
func NewControlTower(db *channeldb.PaymentControl) ControlTower {
3✔
172
        return &controlTower{
3✔
173
                db: db,
3✔
174
                subscribersAllPayments: make(
3✔
175
                        map[uint64]*controlTowerSubscriberImpl,
3✔
176
                ),
3✔
177
                subscribers: make(map[lntypes.Hash][]*controlTowerSubscriberImpl),
3✔
178
                paymentsMtx: multimutex.NewMutex[lntypes.Hash](),
3✔
179
        }
3✔
180
}
3✔
181

182
// InitPayment checks or records the given PaymentCreationInfo with the DB,
183
// making sure it does not already exist as an in-flight payment. Then this
184
// method returns successfully, the payment is guaranteed to be in the
185
// Initiated state.
186
func (p *controlTower) InitPayment(paymentHash lntypes.Hash,
187
        info *channeldb.PaymentCreationInfo) error {
3✔
188

3✔
189
        err := p.db.InitPayment(paymentHash, info)
3✔
190
        if err != nil {
6✔
191
                return err
3✔
192
        }
3✔
193

194
        // Take lock before querying the db to prevent missing or duplicating
195
        // an update.
196
        p.paymentsMtx.Lock(paymentHash)
3✔
197
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
198

3✔
199
        payment, err := p.db.FetchPayment(paymentHash)
3✔
200
        if err != nil {
3✔
201
                return err
×
202
        }
×
203

204
        p.notifySubscribers(paymentHash, payment)
3✔
205

3✔
206
        return nil
3✔
207
}
208

209
// DeleteFailedAttempts deletes all failed htlcs if the payment was
210
// successfully settled.
211
func (p *controlTower) DeleteFailedAttempts(paymentHash lntypes.Hash) error {
3✔
212
        return p.db.DeleteFailedAttempts(paymentHash)
3✔
213
}
3✔
214

215
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
216
// DB.
217
func (p *controlTower) RegisterAttempt(paymentHash lntypes.Hash,
218
        attempt *channeldb.HTLCAttemptInfo) error {
3✔
219

3✔
220
        p.paymentsMtx.Lock(paymentHash)
3✔
221
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
222

3✔
223
        payment, err := p.db.RegisterAttempt(paymentHash, attempt)
3✔
224
        if err != nil {
3✔
225
                return err
×
226
        }
×
227

228
        // Notify subscribers of the attempt registration.
229
        p.notifySubscribers(paymentHash, payment)
3✔
230

3✔
231
        return nil
3✔
232
}
233

234
// SettleAttempt marks the given attempt settled with the preimage. If
235
// this is a multi shard payment, this might implicitly mean the the
236
// full payment succeeded.
237
func (p *controlTower) SettleAttempt(paymentHash lntypes.Hash,
238
        attemptID uint64, settleInfo *channeldb.HTLCSettleInfo) (
239
        *channeldb.HTLCAttempt, error) {
3✔
240

3✔
241
        p.paymentsMtx.Lock(paymentHash)
3✔
242
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
243

3✔
244
        payment, err := p.db.SettleAttempt(paymentHash, attemptID, settleInfo)
3✔
245
        if err != nil {
3✔
246
                return nil, err
×
247
        }
×
248

249
        // Notify subscribers of success event.
250
        p.notifySubscribers(paymentHash, payment)
3✔
251

3✔
252
        return payment.GetAttempt(attemptID)
3✔
253
}
254

255
// FailAttempt marks the given payment attempt failed.
256
func (p *controlTower) FailAttempt(paymentHash lntypes.Hash,
257
        attemptID uint64, failInfo *channeldb.HTLCFailInfo) (
258
        *channeldb.HTLCAttempt, error) {
3✔
259

3✔
260
        p.paymentsMtx.Lock(paymentHash)
3✔
261
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
262

3✔
263
        payment, err := p.db.FailAttempt(paymentHash, attemptID, failInfo)
3✔
264
        if err != nil {
3✔
265
                return nil, err
×
266
        }
×
267

268
        // Notify subscribers of failed attempt.
269
        p.notifySubscribers(paymentHash, payment)
3✔
270

3✔
271
        return payment.GetAttempt(attemptID)
3✔
272
}
273

274
// FetchPayment fetches the payment corresponding to the given payment hash.
275
func (p *controlTower) FetchPayment(paymentHash lntypes.Hash) (
276
        DBMPPayment, error) {
3✔
277

3✔
278
        return p.db.FetchPayment(paymentHash)
3✔
279
}
3✔
280

281
// FailPayment transitions a payment into the Failed state, and records the
282
// reason the payment failed. After invoking this method, InitPayment should
283
// return nil on its next call for this payment hash, allowing the switch to
284
// make a subsequent payment.
285
//
286
// NOTE: This method will overwrite the failure reason if the payment is already
287
// failed.
288
func (p *controlTower) FailPayment(paymentHash lntypes.Hash,
289
        reason channeldb.FailureReason) error {
3✔
290

3✔
291
        p.paymentsMtx.Lock(paymentHash)
3✔
292
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
293

3✔
294
        payment, err := p.db.Fail(paymentHash, reason)
3✔
295
        if err != nil {
3✔
296
                return err
×
297
        }
×
298

299
        // Notify subscribers of fail event.
300
        p.notifySubscribers(paymentHash, payment)
3✔
301

3✔
302
        return nil
3✔
303
}
304

305
// FetchInFlightPayments returns all payments with status InFlight.
306
func (p *controlTower) FetchInFlightPayments() ([]*channeldb.MPPayment, error) {
3✔
307
        return p.db.FetchInFlightPayments()
3✔
308
}
3✔
309

310
// SubscribePayment subscribes to updates for the payment with the given hash. A
311
// first update with the current state of the payment is always sent out
312
// immediately.
313
func (p *controlTower) SubscribePayment(paymentHash lntypes.Hash) (
314
        ControlTowerSubscriber, error) {
3✔
315

3✔
316
        // Take lock before querying the db to prevent missing or duplicating an
3✔
317
        // update.
3✔
318
        p.paymentsMtx.Lock(paymentHash)
3✔
319
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
320

3✔
321
        payment, err := p.db.FetchPayment(paymentHash)
3✔
322
        if err != nil {
3✔
UNCOV
323
                return nil, err
×
UNCOV
324
        }
×
325

326
        subscriber := newControlTowerSubscriber()
3✔
327

3✔
328
        // Always write current payment state to the channel.
3✔
329
        subscriber.queue.ChanIn() <- payment
3✔
330

3✔
331
        // Payment is currently in flight. Register this subscriber for further
3✔
332
        // updates. Otherwise this update is the final update and the incoming
3✔
333
        // channel can be closed. This will close the queue's outgoing channel
3✔
334
        // when all updates have been written.
3✔
335
        if !payment.Terminated() {
6✔
336
                p.subscribersMtx.Lock()
3✔
337
                p.subscribers[paymentHash] = append(
3✔
338
                        p.subscribers[paymentHash], subscriber,
3✔
339
                )
3✔
340
                p.subscribersMtx.Unlock()
3✔
341
        } else {
6✔
342
                close(subscriber.queue.ChanIn())
3✔
343
        }
3✔
344

345
        return subscriber, nil
3✔
346
}
347

348
// SubscribeAllPayments subscribes to updates for all inflight payments. A first
349
// update with the current state of every inflight payment is always sent out
350
// immediately.
351
// Note: If payments are in-flight while starting a new subscription, the start
352
// of the payment stream could produce out-of-order and/or duplicate events. In
353
// order to get updates for every in-flight payment attempt make sure to
354
// subscribe to this method before initiating any payments.
355
func (p *controlTower) SubscribeAllPayments() (ControlTowerSubscriber, error) {
3✔
356
        subscriber := newControlTowerSubscriber()
3✔
357

3✔
358
        // Add the subscriber to the list before fetching in-flight payments, so
3✔
359
        // no events are missed. If a payment attempt update occurs after
3✔
360
        // appending and before fetching in-flight payments, an out-of-order
3✔
361
        // duplicate may be produced, because it is then fetched in below call
3✔
362
        // and notified through the subscription.
3✔
363
        p.subscribersMtx.Lock()
3✔
364
        p.subscribersAllPayments[p.subscriberIndex] = subscriber
3✔
365
        p.subscriberIndex++
3✔
366
        p.subscribersMtx.Unlock()
3✔
367

3✔
368
        log.Debugf("Scanning for inflight payments")
3✔
369
        inflightPayments, err := p.db.FetchInFlightPayments()
3✔
370
        if err != nil {
3✔
371
                return nil, err
×
372
        }
×
373
        log.Debugf("Scanning for inflight payments finished",
3✔
374
                len(inflightPayments))
3✔
375

3✔
376
        for index := range inflightPayments {
3✔
UNCOV
377
                // Always write current payment state to the channel.
×
UNCOV
378
                subscriber.queue.ChanIn() <- inflightPayments[index]
×
UNCOV
379
        }
×
380

381
        return subscriber, nil
3✔
382
}
383

384
// notifySubscribers sends a final payment event to all subscribers of this
385
// payment. The channel will be closed after this. Note that this function must
386
// be executed atomically (by means of a lock) with the database update to
387
// guarantee consistency of the notifications.
388
func (p *controlTower) notifySubscribers(paymentHash lntypes.Hash,
389
        event *channeldb.MPPayment) {
3✔
390

3✔
391
        // Get all subscribers for this payment.
3✔
392
        p.subscribersMtx.Lock()
3✔
393

3✔
394
        subscribersPaymentHash, ok := p.subscribers[paymentHash]
3✔
395
        if !ok && len(p.subscribersAllPayments) == 0 {
6✔
396
                p.subscribersMtx.Unlock()
3✔
397
                return
3✔
398
        }
3✔
399

400
        // If the payment reached a terminal state, the subscriber list can be
401
        // cleared. There won't be any more updates.
402
        terminal := event.Terminated()
3✔
403
        if terminal {
6✔
404
                delete(p.subscribers, paymentHash)
3✔
405
        }
3✔
406

407
        // Copy subscribers to all payments locally while holding the lock in
408
        // order to avoid concurrency issues while reading/writing the map.
409
        subscribersAllPayments := make(map[uint64]*controlTowerSubscriberImpl)
3✔
410
        for k, v := range p.subscribersAllPayments {
6✔
411
                subscribersAllPayments[k] = v
3✔
412
        }
3✔
413
        p.subscribersMtx.Unlock()
3✔
414

3✔
415
        // Notify all subscribers that subscribed to the current payment hash.
3✔
416
        for _, subscriber := range subscribersPaymentHash {
6✔
417
                select {
3✔
418
                case subscriber.queue.ChanIn() <- event:
3✔
419
                        // If this event is the last, close the incoming channel
3✔
420
                        // of the queue. This will signal the subscriber that
3✔
421
                        // there won't be any more updates.
3✔
422
                        if terminal {
6✔
423
                                close(subscriber.queue.ChanIn())
3✔
424
                        }
3✔
425

426
                // If subscriber disappeared, skip notification. For further
427
                // notifications, we'll keep skipping over this subscriber.
428
                case <-subscriber.quit:
3✔
429
                }
430
        }
431

432
        // Notify all subscribers that subscribed to all payments.
433
        for key, subscriber := range subscribersAllPayments {
6✔
434
                select {
3✔
435
                case subscriber.queue.ChanIn() <- event:
3✔
436

437
                // If subscriber disappeared, remove it from the subscribers
438
                // list.
UNCOV
439
                case <-subscriber.quit:
×
UNCOV
440
                        p.subscribersMtx.Lock()
×
UNCOV
441
                        delete(p.subscribersAllPayments, key)
×
UNCOV
442
                        p.subscribersMtx.Unlock()
×
443
                }
444
        }
445
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc