• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17101605539

20 Aug 2025 02:35PM UTC coverage: 57.321% (-9.4%) from 66.68%
17101605539

push

github

web-flow
Merge pull request #10102 from yyforyongyu/fix-UpdatesInHorizon

Catch bad gossip peer and fix `UpdatesInHorizon`

28 of 89 new or added lines in 4 files covered. (31.46%)

29163 existing lines in 459 files now uncovered.

99187 of 173038 relevant lines covered (57.32%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.55
/routing/control_tower.go
1
package routing
2

3
import (
4
        "sync"
5

6
        "github.com/lightningnetwork/lnd/lntypes"
7
        "github.com/lightningnetwork/lnd/multimutex"
8
        paymentsdb "github.com/lightningnetwork/lnd/payments/db"
9
        "github.com/lightningnetwork/lnd/queue"
10
)
11

12
// ControlTower tracks all outgoing payments made, whose primary purpose is to
13
// prevent duplicate payments to the same payment hash. In production, a
14
// persistent implementation is preferred so that tracking can survive across
15
// restarts. Payments are transitioned through various payment states, and the
16
// ControlTower interface provides access to driving the state transitions.
17
type ControlTower interface {
18
        // InitPayment initializes a new payment with the given payment hash and
19
        // also notifies subscribers of the payment creation.
20
        //
21
        // NOTE: Subscribers should be notified by the new state of the payment.
22
        InitPayment(lntypes.Hash, *paymentsdb.PaymentCreationInfo) error
23

24
        // DeleteFailedAttempts removes all failed HTLCs from the db. It should
25
        // be called for a given payment whenever all inflight htlcs are
26
        // completed, and the payment has reached a final settled state.
27
        DeleteFailedAttempts(lntypes.Hash) error
28

29
        // RegisterAttempt atomically records the provided HTLCAttemptInfo.
30
        //
31
        // NOTE: Subscribers should be notified by the new state of the payment.
32
        RegisterAttempt(lntypes.Hash, *paymentsdb.HTLCAttemptInfo) error
33

34
        // SettleAttempt marks the given attempt settled with the preimage. If
35
        // this is a multi shard payment, this might implicitly mean the the
36
        // full payment succeeded.
37
        //
38
        // After invoking this method, InitPayment should always return an
39
        // error to prevent us from making duplicate payments to the same
40
        // payment hash. The provided preimage is atomically saved to the DB
41
        // for record keeping.
42
        //
43
        // NOTE: Subscribers should be notified by the new state of the payment.
44
        SettleAttempt(lntypes.Hash, uint64, *paymentsdb.HTLCSettleInfo) (
45
                *paymentsdb.HTLCAttempt, error)
46

47
        // FailAttempt marks the given payment attempt failed.
48
        //
49
        // NOTE: Subscribers should be notified by the new state of the payment.
50
        FailAttempt(lntypes.Hash, uint64, *paymentsdb.HTLCFailInfo) (
51
                *paymentsdb.HTLCAttempt, error)
52

53
        // FetchPayment fetches the payment corresponding to the given payment
54
        // hash.
55
        FetchPayment(paymentHash lntypes.Hash) (paymentsdb.DBMPPayment, error)
56

57
        // FailPayment transitions a payment into the Failed state, and records
58
        // the ultimate reason the payment failed. Note that this should only
59
        // be called when all active attempts are already failed. After
60
        // invoking this method, InitPayment should return nil on its next call
61
        // for this payment hash, allowing the user to make a subsequent
62
        // payment.
63
        //
64
        // NOTE: Subscribers should be notified by the new state of the payment.
65
        FailPayment(lntypes.Hash, paymentsdb.FailureReason) error
66

67
        // FetchInFlightPayments returns all payments with status InFlight.
68
        FetchInFlightPayments() ([]*paymentsdb.MPPayment, error)
69

70
        // SubscribePayment subscribes to updates for the payment with the given
71
        // hash. A first update with the current state of the payment is always
72
        // sent out immediately.
73
        SubscribePayment(paymentHash lntypes.Hash) (ControlTowerSubscriber,
74
                error)
75

76
        // SubscribeAllPayments subscribes to updates for all payments. A first
77
        // update with the current state of every inflight payment is always
78
        // sent out immediately.
79
        SubscribeAllPayments() (ControlTowerSubscriber, error)
80
}
81

82
// ControlTowerSubscriber contains the state for a payment update subscriber.
83
type ControlTowerSubscriber interface {
84
        // Updates is the channel over which *channeldb.MPPayment updates can be
85
        // received.
86
        Updates() <-chan interface{}
87

88
        // Close signals that the subscriber is no longer interested in updates.
89
        Close()
90
}
91

92
// ControlTowerSubscriberImpl contains the state for a payment update
93
// subscriber.
94
type controlTowerSubscriberImpl struct {
95
        updates <-chan interface{}
96
        queue   *queue.ConcurrentQueue
97
        quit    chan struct{}
98
}
99

100
// newControlTowerSubscriber instantiates a new subscriber state object.
101
func newControlTowerSubscriber() *controlTowerSubscriberImpl {
3✔
102
        // Create a queue for payment updates.
3✔
103
        queue := queue.NewConcurrentQueue(20)
3✔
104
        queue.Start()
3✔
105

3✔
106
        return &controlTowerSubscriberImpl{
3✔
107
                updates: queue.ChanOut(),
3✔
108
                queue:   queue,
3✔
109
                quit:    make(chan struct{}),
3✔
110
        }
3✔
111
}
3✔
112

113
// Close signals that the subscriber is no longer interested in updates.
114
func (s *controlTowerSubscriberImpl) Close() {
3✔
115
        // Close quit channel so that any pending writes to the queue are
3✔
116
        // cancelled.
3✔
117
        close(s.quit)
3✔
118

3✔
119
        // Stop the queue goroutine so that it won't leak.
3✔
120
        s.queue.Stop()
3✔
121
}
3✔
122

123
// Updates is the channel over which *channeldb.MPPayment updates can be
124
// received.
125
func (s *controlTowerSubscriberImpl) Updates() <-chan interface{} {
3✔
126
        return s.updates
3✔
127
}
3✔
128

129
// controlTower is persistent implementation of ControlTower to restrict
130
// double payment sending.
131
type controlTower struct {
132
        db paymentsdb.DB
133

134
        // subscriberIndex is used to provide a unique id for each subscriber
135
        // to all payments. This is used to easily remove the subscriber when
136
        // necessary.
137
        subscriberIndex        uint64
138
        subscribersAllPayments map[uint64]*controlTowerSubscriberImpl
139
        subscribers            map[lntypes.Hash][]*controlTowerSubscriberImpl
140
        subscribersMtx         sync.Mutex
141

142
        // paymentsMtx provides synchronization on the payment level to ensure
143
        // that no race conditions occur in between updating the database and
144
        // sending a notification.
145
        paymentsMtx *multimutex.Mutex[lntypes.Hash]
146
}
147

148
// NewControlTower creates a new instance of the controlTower.
149
func NewControlTower(db paymentsdb.DB) ControlTower {
3✔
150
        return &controlTower{
3✔
151
                db: db,
3✔
152
                subscribersAllPayments: make(
3✔
153
                        map[uint64]*controlTowerSubscriberImpl,
3✔
154
                ),
3✔
155
                subscribers: make(map[lntypes.Hash][]*controlTowerSubscriberImpl),
3✔
156
                paymentsMtx: multimutex.NewMutex[lntypes.Hash](),
3✔
157
        }
3✔
158
}
3✔
159

160
// InitPayment checks or records the given PaymentCreationInfo with the DB,
161
// making sure it does not already exist as an in-flight payment. Then this
162
// method returns successfully, the payment is guaranteed to be in the
163
// Initiated state.
164
func (p *controlTower) InitPayment(paymentHash lntypes.Hash,
165
        info *paymentsdb.PaymentCreationInfo) error {
3✔
166

3✔
167
        err := p.db.InitPayment(paymentHash, info)
3✔
168
        if err != nil {
6✔
169
                return err
3✔
170
        }
3✔
171

172
        // Take lock before querying the db to prevent missing or duplicating
173
        // an update.
174
        p.paymentsMtx.Lock(paymentHash)
3✔
175
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
176

3✔
177
        payment, err := p.db.FetchPayment(paymentHash)
3✔
178
        if err != nil {
3✔
UNCOV
179
                return err
×
UNCOV
180
        }
×
181

182
        p.notifySubscribers(paymentHash, payment)
3✔
183

3✔
184
        return nil
3✔
185
}
186

187
// DeleteFailedAttempts deletes all failed htlcs if the payment was
188
// successfully settled.
189
func (p *controlTower) DeleteFailedAttempts(paymentHash lntypes.Hash) error {
3✔
190
        return p.db.DeleteFailedAttempts(paymentHash)
3✔
191
}
3✔
192

193
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
194
// DB.
195
func (p *controlTower) RegisterAttempt(paymentHash lntypes.Hash,
196
        attempt *paymentsdb.HTLCAttemptInfo) error {
3✔
197

3✔
198
        p.paymentsMtx.Lock(paymentHash)
3✔
199
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
200

3✔
201
        payment, err := p.db.RegisterAttempt(paymentHash, attempt)
3✔
202
        if err != nil {
3✔
UNCOV
203
                return err
×
UNCOV
204
        }
×
205

206
        // Notify subscribers of the attempt registration.
207
        p.notifySubscribers(paymentHash, payment)
3✔
208

3✔
209
        return nil
3✔
210
}
211

212
// SettleAttempt marks the given attempt settled with the preimage. If
213
// this is a multi shard payment, this might implicitly mean the the
214
// full payment succeeded.
215
func (p *controlTower) SettleAttempt(paymentHash lntypes.Hash,
216
        attemptID uint64, settleInfo *paymentsdb.HTLCSettleInfo) (
217
        *paymentsdb.HTLCAttempt, error) {
3✔
218

3✔
219
        p.paymentsMtx.Lock(paymentHash)
3✔
220
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
221

3✔
222
        payment, err := p.db.SettleAttempt(paymentHash, attemptID, settleInfo)
3✔
223
        if err != nil {
3✔
UNCOV
224
                return nil, err
×
225
        }
×
226

227
        // Notify subscribers of success event.
228
        p.notifySubscribers(paymentHash, payment)
3✔
229

3✔
230
        return payment.GetAttempt(attemptID)
3✔
231
}
232

233
// FailAttempt marks the given payment attempt failed.
234
func (p *controlTower) FailAttempt(paymentHash lntypes.Hash,
235
        attemptID uint64, failInfo *paymentsdb.HTLCFailInfo) (
236
        *paymentsdb.HTLCAttempt, error) {
3✔
237

3✔
238
        p.paymentsMtx.Lock(paymentHash)
3✔
239
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
240

3✔
241
        payment, err := p.db.FailAttempt(paymentHash, attemptID, failInfo)
3✔
242
        if err != nil {
3✔
UNCOV
243
                return nil, err
×
UNCOV
244
        }
×
245

246
        // Notify subscribers of failed attempt.
247
        p.notifySubscribers(paymentHash, payment)
3✔
248

3✔
249
        return payment.GetAttempt(attemptID)
3✔
250
}
251

252
// FetchPayment fetches the payment corresponding to the given payment hash.
253
func (p *controlTower) FetchPayment(paymentHash lntypes.Hash) (
254
        paymentsdb.DBMPPayment, error) {
3✔
255

3✔
256
        return p.db.FetchPayment(paymentHash)
3✔
257
}
3✔
258

259
// FailPayment transitions a payment into the Failed state, and records the
260
// reason the payment failed. After invoking this method, InitPayment should
261
// return nil on its next call for this payment hash, allowing the switch to
262
// make a subsequent payment.
263
//
264
// NOTE: This method will overwrite the failure reason if the payment is already
265
// failed.
266
func (p *controlTower) FailPayment(paymentHash lntypes.Hash,
267
        reason paymentsdb.FailureReason) error {
3✔
268

3✔
269
        p.paymentsMtx.Lock(paymentHash)
3✔
270
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
271

3✔
272
        payment, err := p.db.Fail(paymentHash, reason)
3✔
273
        if err != nil {
3✔
UNCOV
274
                return err
×
UNCOV
275
        }
×
276

277
        // Notify subscribers of fail event.
278
        p.notifySubscribers(paymentHash, payment)
3✔
279

3✔
280
        return nil
3✔
281
}
282

283
// FetchInFlightPayments returns all payments with status InFlight.
284
func (p *controlTower) FetchInFlightPayments() ([]*paymentsdb.MPPayment,
285
        error) {
3✔
286

3✔
287
        return p.db.FetchInFlightPayments()
3✔
288
}
3✔
289

290
// SubscribePayment subscribes to updates for the payment with the given hash. A
291
// first update with the current state of the payment is always sent out
292
// immediately.
293
func (p *controlTower) SubscribePayment(paymentHash lntypes.Hash) (
294
        ControlTowerSubscriber, error) {
3✔
295

3✔
296
        // Take lock before querying the db to prevent missing or duplicating an
3✔
297
        // update.
3✔
298
        p.paymentsMtx.Lock(paymentHash)
3✔
299
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
300

3✔
301
        payment, err := p.db.FetchPayment(paymentHash)
3✔
302
        if err != nil {
3✔
UNCOV
303
                return nil, err
×
UNCOV
304
        }
×
305

306
        subscriber := newControlTowerSubscriber()
3✔
307

3✔
308
        // Always write current payment state to the channel.
3✔
309
        subscriber.queue.ChanIn() <- payment
3✔
310

3✔
311
        // Payment is currently in flight. Register this subscriber for further
3✔
312
        // updates. Otherwise this update is the final update and the incoming
3✔
313
        // channel can be closed. This will close the queue's outgoing channel
3✔
314
        // when all updates have been written.
3✔
315
        if !payment.Terminated() {
6✔
316
                p.subscribersMtx.Lock()
3✔
317
                p.subscribers[paymentHash] = append(
3✔
318
                        p.subscribers[paymentHash], subscriber,
3✔
319
                )
3✔
320
                p.subscribersMtx.Unlock()
3✔
321
        } else {
6✔
322
                close(subscriber.queue.ChanIn())
3✔
323
        }
3✔
324

325
        return subscriber, nil
3✔
326
}
327

328
// SubscribeAllPayments subscribes to updates for all inflight payments. A first
329
// update with the current state of every inflight payment is always sent out
330
// immediately.
331
// Note: If payments are in-flight while starting a new subscription, the start
332
// of the payment stream could produce out-of-order and/or duplicate events. In
333
// order to get updates for every in-flight payment attempt make sure to
334
// subscribe to this method before initiating any payments.
335
func (p *controlTower) SubscribeAllPayments() (ControlTowerSubscriber, error) {
3✔
336
        subscriber := newControlTowerSubscriber()
3✔
337

3✔
338
        // Add the subscriber to the list before fetching in-flight payments, so
3✔
339
        // no events are missed. If a payment attempt update occurs after
3✔
340
        // appending and before fetching in-flight payments, an out-of-order
3✔
341
        // duplicate may be produced, because it is then fetched in below call
3✔
342
        // and notified through the subscription.
3✔
343
        p.subscribersMtx.Lock()
3✔
344
        p.subscribersAllPayments[p.subscriberIndex] = subscriber
3✔
345
        p.subscriberIndex++
3✔
346
        p.subscribersMtx.Unlock()
3✔
347

3✔
348
        log.Debugf("Scanning for inflight payments")
3✔
349
        inflightPayments, err := p.db.FetchInFlightPayments()
3✔
350
        if err != nil {
3✔
UNCOV
351
                return nil, err
×
UNCOV
352
        }
×
353
        log.Debugf("Scanning for inflight payments finished",
3✔
354
                len(inflightPayments))
3✔
355

3✔
356
        for index := range inflightPayments {
3✔
UNCOV
357
                // Always write current payment state to the channel.
×
UNCOV
358
                subscriber.queue.ChanIn() <- inflightPayments[index]
×
UNCOV
359
        }
×
360

361
        return subscriber, nil
3✔
362
}
363

364
// notifySubscribers sends a final payment event to all subscribers of this
365
// payment. The channel will be closed after this. Note that this function must
366
// be executed atomically (by means of a lock) with the database update to
367
// guarantee consistency of the notifications.
368
func (p *controlTower) notifySubscribers(paymentHash lntypes.Hash,
369
        event *paymentsdb.MPPayment) {
3✔
370

3✔
371
        // Get all subscribers for this payment.
3✔
372
        p.subscribersMtx.Lock()
3✔
373

3✔
374
        subscribersPaymentHash, ok := p.subscribers[paymentHash]
3✔
375
        if !ok && len(p.subscribersAllPayments) == 0 {
6✔
376
                p.subscribersMtx.Unlock()
3✔
377
                return
3✔
378
        }
3✔
379

380
        // If the payment reached a terminal state, the subscriber list can be
381
        // cleared. There won't be any more updates.
382
        terminal := event.Terminated()
3✔
383
        if terminal {
6✔
384
                delete(p.subscribers, paymentHash)
3✔
385
        }
3✔
386

387
        // Copy subscribers to all payments locally while holding the lock in
388
        // order to avoid concurrency issues while reading/writing the map.
389
        subscribersAllPayments := make(map[uint64]*controlTowerSubscriberImpl)
3✔
390
        for k, v := range p.subscribersAllPayments {
6✔
391
                subscribersAllPayments[k] = v
3✔
392
        }
3✔
393
        p.subscribersMtx.Unlock()
3✔
394

3✔
395
        // Notify all subscribers that subscribed to the current payment hash.
3✔
396
        for _, subscriber := range subscribersPaymentHash {
6✔
397
                select {
3✔
398
                case subscriber.queue.ChanIn() <- event:
3✔
399
                        // If this event is the last, close the incoming channel
3✔
400
                        // of the queue. This will signal the subscriber that
3✔
401
                        // there won't be any more updates.
3✔
402
                        if terminal {
6✔
403
                                close(subscriber.queue.ChanIn())
3✔
404
                        }
3✔
405

406
                // If subscriber disappeared, skip notification. For further
407
                // notifications, we'll keep skipping over this subscriber.
408
                case <-subscriber.quit:
3✔
409
                }
410
        }
411

412
        // Notify all subscribers that subscribed to all payments.
413
        for key, subscriber := range subscribersAllPayments {
6✔
414
                select {
3✔
415
                case subscriber.queue.ChanIn() <- event:
3✔
416

417
                // If subscriber disappeared, remove it from the subscribers
418
                // list.
UNCOV
419
                case <-subscriber.quit:
×
UNCOV
420
                        p.subscribersMtx.Lock()
×
UNCOV
421
                        delete(p.subscribersAllPayments, key)
×
UNCOV
422
                        p.subscribersMtx.Unlock()
×
423
                }
424
        }
425
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc