• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.29
/routing/control_tower.go
1
package routing
2

3
import (
4
        "sync"
5

6
        "github.com/lightningnetwork/lnd/channeldb"
7
        "github.com/lightningnetwork/lnd/lntypes"
8
        "github.com/lightningnetwork/lnd/multimutex"
9
        "github.com/lightningnetwork/lnd/queue"
10
)
11

12
// DBMPPayment is an interface derived from channeldb.MPPayment that is used by
13
// the payment lifecycle.
14
type DBMPPayment interface {
15
        // GetState returns the current state of the payment.
16
        GetState() *channeldb.MPPaymentState
17

18
        // Terminated returns true if the payment is in a final state.
19
        Terminated() bool
20

21
        // GetStatus returns the current status of the payment.
22
        GetStatus() channeldb.PaymentStatus
23

24
        // NeedWaitAttempts specifies whether the payment needs to wait for the
25
        // outcome of an attempt.
26
        NeedWaitAttempts() (bool, error)
27

28
        // GetHTLCs returns all HTLCs of this payment.
29
        GetHTLCs() []channeldb.HTLCAttempt
30

31
        // InFlightHTLCs returns all HTLCs that are in flight.
32
        InFlightHTLCs() []channeldb.HTLCAttempt
33

34
        // AllowMoreAttempts is used to decide whether we can safely attempt
35
        // more HTLCs for a given payment state. Return an error if the payment
36
        // is in an unexpected state.
37
        AllowMoreAttempts() (bool, error)
38

39
        // TerminalInfo returns the settled HTLC attempt or the payment's
40
        // failure reason.
41
        TerminalInfo() (*channeldb.HTLCAttempt, *channeldb.FailureReason)
42
}
43

44
// ControlTower tracks all outgoing payments made, whose primary purpose is to
45
// prevent duplicate payments to the same payment hash. In production, a
46
// persistent implementation is preferred so that tracking can survive across
47
// restarts. Payments are transitioned through various payment states, and the
48
// ControlTower interface provides access to driving the state transitions.
49
type ControlTower interface {
50
        // This method checks that no succeeded payment exist for this payment
51
        // hash.
52
        InitPayment(lntypes.Hash, *channeldb.PaymentCreationInfo) error
53

54
        // DeleteFailedAttempts removes all failed HTLCs from the db. It should
55
        // be called for a given payment whenever all inflight htlcs are
56
        // completed, and the payment has reached a final settled state.
57
        DeleteFailedAttempts(lntypes.Hash) error
58

59
        // RegisterAttempt atomically records the provided HTLCAttemptInfo.
60
        RegisterAttempt(lntypes.Hash, *channeldb.HTLCAttemptInfo) error
61

62
        // SettleAttempt marks the given attempt settled with the preimage. If
63
        // this is a multi shard payment, this might implicitly mean the the
64
        // full payment succeeded.
65
        //
66
        // After invoking this method, InitPayment should always return an
67
        // error to prevent us from making duplicate payments to the same
68
        // payment hash. The provided preimage is atomically saved to the DB
69
        // for record keeping.
70
        SettleAttempt(lntypes.Hash, uint64, *channeldb.HTLCSettleInfo) (
71
                *channeldb.HTLCAttempt, error)
72

73
        // FailAttempt marks the given payment attempt failed.
74
        FailAttempt(lntypes.Hash, uint64, *channeldb.HTLCFailInfo) (
75
                *channeldb.HTLCAttempt, error)
76

77
        // FetchPayment fetches the payment corresponding to the given payment
78
        // hash.
79
        FetchPayment(paymentHash lntypes.Hash) (DBMPPayment, error)
80

81
        // FailPayment transitions a payment into the Failed state, and records
82
        // the ultimate reason the payment failed. Note that this should only
83
        // be called when all active attempts are already failed. After
84
        // invoking this method, InitPayment should return nil on its next call
85
        // for this payment hash, allowing the user to make a subsequent
86
        // payment.
87
        FailPayment(lntypes.Hash, channeldb.FailureReason) error
88

89
        // FetchInFlightPayments returns all payments with status InFlight.
90
        FetchInFlightPayments() ([]*channeldb.MPPayment, error)
91

92
        // SubscribePayment subscribes to updates for the payment with the given
93
        // hash. A first update with the current state of the payment is always
94
        // sent out immediately.
95
        SubscribePayment(paymentHash lntypes.Hash) (ControlTowerSubscriber,
96
                error)
97

98
        // SubscribeAllPayments subscribes to updates for all payments. A first
99
        // update with the current state of every inflight payment is always
100
        // sent out immediately.
101
        SubscribeAllPayments() (ControlTowerSubscriber, error)
102
}
103

104
// ControlTowerSubscriber contains the state for a payment update subscriber.
105
type ControlTowerSubscriber interface {
106
        // Updates is the channel over which *channeldb.MPPayment updates can be
107
        // received.
108
        Updates() <-chan interface{}
109

110
        // Close signals that the subscriber is no longer interested in updates.
111
        Close()
112
}
113

114
// ControlTowerSubscriberImpl contains the state for a payment update
115
// subscriber.
116
type controlTowerSubscriberImpl struct {
117
        updates <-chan interface{}
118
        queue   *queue.ConcurrentQueue
119
        quit    chan struct{}
120
}
121

122
// newControlTowerSubscriber instantiates a new subscriber state object.
123
func newControlTowerSubscriber() *controlTowerSubscriberImpl {
3✔
124
        // Create a queue for payment updates.
3✔
125
        queue := queue.NewConcurrentQueue(20)
3✔
126
        queue.Start()
3✔
127

3✔
128
        return &controlTowerSubscriberImpl{
3✔
129
                updates: queue.ChanOut(),
3✔
130
                queue:   queue,
3✔
131
                quit:    make(chan struct{}),
3✔
132
        }
3✔
133
}
3✔
134

135
// Close signals that the subscriber is no longer interested in updates.
136
func (s *controlTowerSubscriberImpl) Close() {
3✔
137
        // Close quit channel so that any pending writes to the queue are
3✔
138
        // cancelled.
3✔
139
        close(s.quit)
3✔
140

3✔
141
        // Stop the queue goroutine so that it won't leak.
3✔
142
        s.queue.Stop()
3✔
143
}
3✔
144

145
// Updates is the channel over which *channeldb.MPPayment updates can be
146
// received.
147
func (s *controlTowerSubscriberImpl) Updates() <-chan interface{} {
3✔
148
        return s.updates
3✔
149
}
3✔
150

151
// controlTower is persistent implementation of ControlTower to restrict
152
// double payment sending.
153
type controlTower struct {
154
        db *channeldb.PaymentControl
155

156
        // subscriberIndex is used to provide a unique id for each subscriber
157
        // to all payments. This is used to easily remove the subscriber when
158
        // necessary.
159
        subscriberIndex        uint64
160
        subscribersAllPayments map[uint64]*controlTowerSubscriberImpl
161
        subscribers            map[lntypes.Hash][]*controlTowerSubscriberImpl
162
        subscribersMtx         sync.Mutex
163

164
        // paymentsMtx provides synchronization on the payment level to ensure
165
        // that no race conditions occur in between updating the database and
166
        // sending a notification.
167
        paymentsMtx *multimutex.Mutex[lntypes.Hash]
168
}
169

170
// NewControlTower creates a new instance of the controlTower.
171
func NewControlTower(db *channeldb.PaymentControl) ControlTower {
3✔
172
        return &controlTower{
3✔
173
                db: db,
3✔
174
                subscribersAllPayments: make(
3✔
175
                        map[uint64]*controlTowerSubscriberImpl,
3✔
176
                ),
3✔
177
                subscribers: make(map[lntypes.Hash][]*controlTowerSubscriberImpl),
3✔
178
                paymentsMtx: multimutex.NewMutex[lntypes.Hash](),
3✔
179
        }
3✔
180
}
3✔
181

182
// InitPayment checks or records the given PaymentCreationInfo with the DB,
183
// making sure it does not already exist as an in-flight payment. Then this
184
// method returns successfully, the payment is guaranteed to be in the
185
// Initiated state.
186
func (p *controlTower) InitPayment(paymentHash lntypes.Hash,
187
        info *channeldb.PaymentCreationInfo) error {
3✔
188

3✔
189
        err := p.db.InitPayment(paymentHash, info)
3✔
190
        if err != nil {
6✔
191
                return err
3✔
192
        }
3✔
193

194
        // Take lock before querying the db to prevent missing or duplicating
195
        // an update.
196
        p.paymentsMtx.Lock(paymentHash)
3✔
197
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
198

3✔
199
        payment, err := p.db.FetchPayment(paymentHash)
3✔
200
        if err != nil {
3✔
201
                return err
×
202
        }
×
203

204
        p.notifySubscribers(paymentHash, payment)
3✔
205

3✔
206
        return nil
3✔
207
}
208

209
// DeleteFailedAttempts deletes all failed htlcs if the payment was
210
// successfully settled.
211
func (p *controlTower) DeleteFailedAttempts(paymentHash lntypes.Hash) error {
3✔
212
        return p.db.DeleteFailedAttempts(paymentHash)
3✔
213
}
3✔
214

215
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
216
// DB.
217
func (p *controlTower) RegisterAttempt(paymentHash lntypes.Hash,
218
        attempt *channeldb.HTLCAttemptInfo) error {
3✔
219

3✔
220
        p.paymentsMtx.Lock(paymentHash)
3✔
221
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
222

3✔
223
        payment, err := p.db.RegisterAttempt(paymentHash, attempt)
3✔
224
        if err != nil {
3✔
225
                return err
×
226
        }
×
227

228
        // Notify subscribers of the attempt registration.
229
        p.notifySubscribers(paymentHash, payment)
3✔
230

3✔
231
        return nil
3✔
232
}
233

234
// SettleAttempt marks the given attempt settled with the preimage. If
235
// this is a multi shard payment, this might implicitly mean the the
236
// full payment succeeded.
237
func (p *controlTower) SettleAttempt(paymentHash lntypes.Hash,
238
        attemptID uint64, settleInfo *channeldb.HTLCSettleInfo) (
239
        *channeldb.HTLCAttempt, error) {
3✔
240

3✔
241
        p.paymentsMtx.Lock(paymentHash)
3✔
242
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
243

3✔
244
        payment, err := p.db.SettleAttempt(paymentHash, attemptID, settleInfo)
3✔
245
        if err != nil {
3✔
246
                return nil, err
×
247
        }
×
248

249
        // Notify subscribers of success event.
250
        p.notifySubscribers(paymentHash, payment)
3✔
251

3✔
252
        return payment.GetAttempt(attemptID)
3✔
253
}
254

255
// FailAttempt marks the given payment attempt failed.
256
func (p *controlTower) FailAttempt(paymentHash lntypes.Hash,
257
        attemptID uint64, failInfo *channeldb.HTLCFailInfo) (
258
        *channeldb.HTLCAttempt, error) {
3✔
259

3✔
260
        p.paymentsMtx.Lock(paymentHash)
3✔
261
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
262

3✔
263
        payment, err := p.db.FailAttempt(paymentHash, attemptID, failInfo)
3✔
264
        if err != nil {
3✔
265
                return nil, err
×
266
        }
×
267

268
        // Notify subscribers of failed attempt.
269
        p.notifySubscribers(paymentHash, payment)
3✔
270

3✔
271
        return payment.GetAttempt(attemptID)
3✔
272
}
273

274
// FetchPayment fetches the payment corresponding to the given payment hash.
275
func (p *controlTower) FetchPayment(paymentHash lntypes.Hash) (
276
        DBMPPayment, error) {
3✔
277

3✔
278
        return p.db.FetchPayment(paymentHash)
3✔
279
}
3✔
280

281
// FailPayment transitions a payment into the Failed state, and records the
282
// reason the payment failed. After invoking this method, InitPayment should
283
// return nil on its next call for this payment hash, allowing the switch to
284
// make a subsequent payment.
285
func (p *controlTower) FailPayment(paymentHash lntypes.Hash,
286
        reason channeldb.FailureReason) error {
3✔
287

3✔
288
        p.paymentsMtx.Lock(paymentHash)
3✔
289
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
290

3✔
291
        payment, err := p.db.Fail(paymentHash, reason)
3✔
292
        if err != nil {
3✔
293
                return err
×
294
        }
×
295

296
        // Notify subscribers of fail event.
297
        p.notifySubscribers(paymentHash, payment)
3✔
298

3✔
299
        return nil
3✔
300
}
301

302
// FetchInFlightPayments returns all payments with status InFlight.
303
func (p *controlTower) FetchInFlightPayments() ([]*channeldb.MPPayment, error) {
3✔
304
        return p.db.FetchInFlightPayments()
3✔
305
}
3✔
306

307
// SubscribePayment subscribes to updates for the payment with the given hash. A
308
// first update with the current state of the payment is always sent out
309
// immediately.
310
func (p *controlTower) SubscribePayment(paymentHash lntypes.Hash) (
311
        ControlTowerSubscriber, error) {
3✔
312

3✔
313
        // Take lock before querying the db to prevent missing or duplicating an
3✔
314
        // update.
3✔
315
        p.paymentsMtx.Lock(paymentHash)
3✔
316
        defer p.paymentsMtx.Unlock(paymentHash)
3✔
317

3✔
318
        payment, err := p.db.FetchPayment(paymentHash)
3✔
319
        if err != nil {
3✔
UNCOV
320
                return nil, err
×
UNCOV
321
        }
×
322

323
        subscriber := newControlTowerSubscriber()
3✔
324

3✔
325
        // Always write current payment state to the channel.
3✔
326
        subscriber.queue.ChanIn() <- payment
3✔
327

3✔
328
        // Payment is currently in flight. Register this subscriber for further
3✔
329
        // updates. Otherwise this update is the final update and the incoming
3✔
330
        // channel can be closed. This will close the queue's outgoing channel
3✔
331
        // when all updates have been written.
3✔
332
        if !payment.Terminated() {
6✔
333
                p.subscribersMtx.Lock()
3✔
334
                p.subscribers[paymentHash] = append(
3✔
335
                        p.subscribers[paymentHash], subscriber,
3✔
336
                )
3✔
337
                p.subscribersMtx.Unlock()
3✔
338
        } else {
6✔
339
                close(subscriber.queue.ChanIn())
3✔
340
        }
3✔
341

342
        return subscriber, nil
3✔
343
}
344

345
// SubscribeAllPayments subscribes to updates for all inflight payments. A first
346
// update with the current state of every inflight payment is always sent out
347
// immediately.
348
// Note: If payments are in-flight while starting a new subscription, the start
349
// of the payment stream could produce out-of-order and/or duplicate events. In
350
// order to get updates for every in-flight payment attempt make sure to
351
// subscribe to this method before initiating any payments.
352
func (p *controlTower) SubscribeAllPayments() (ControlTowerSubscriber, error) {
3✔
353
        subscriber := newControlTowerSubscriber()
3✔
354

3✔
355
        // Add the subscriber to the list before fetching in-flight payments, so
3✔
356
        // no events are missed. If a payment attempt update occurs after
3✔
357
        // appending and before fetching in-flight payments, an out-of-order
3✔
358
        // duplicate may be produced, because it is then fetched in below call
3✔
359
        // and notified through the subscription.
3✔
360
        p.subscribersMtx.Lock()
3✔
361
        p.subscribersAllPayments[p.subscriberIndex] = subscriber
3✔
362
        p.subscriberIndex++
3✔
363
        p.subscribersMtx.Unlock()
3✔
364

3✔
365
        inflightPayments, err := p.db.FetchInFlightPayments()
3✔
366
        if err != nil {
3✔
367
                return nil, err
×
368
        }
×
369

370
        for index := range inflightPayments {
3✔
UNCOV
371
                // Always write current payment state to the channel.
×
UNCOV
372
                subscriber.queue.ChanIn() <- inflightPayments[index]
×
UNCOV
373
        }
×
374

375
        return subscriber, nil
3✔
376
}
377

378
// notifySubscribers sends a final payment event to all subscribers of this
379
// payment. The channel will be closed after this. Note that this function must
380
// be executed atomically (by means of a lock) with the database update to
381
// guarantee consistency of the notifications.
382
func (p *controlTower) notifySubscribers(paymentHash lntypes.Hash,
383
        event *channeldb.MPPayment) {
3✔
384

3✔
385
        // Get all subscribers for this payment.
3✔
386
        p.subscribersMtx.Lock()
3✔
387

3✔
388
        subscribersPaymentHash, ok := p.subscribers[paymentHash]
3✔
389
        if !ok && len(p.subscribersAllPayments) == 0 {
6✔
390
                p.subscribersMtx.Unlock()
3✔
391
                return
3✔
392
        }
3✔
393

394
        // If the payment reached a terminal state, the subscriber list can be
395
        // cleared. There won't be any more updates.
396
        terminal := event.Terminated()
3✔
397
        if terminal {
6✔
398
                delete(p.subscribers, paymentHash)
3✔
399
        }
3✔
400

401
        // Copy subscribers to all payments locally while holding the lock in
402
        // order to avoid concurrency issues while reading/writing the map.
403
        subscribersAllPayments := make(map[uint64]*controlTowerSubscriberImpl)
3✔
404
        for k, v := range p.subscribersAllPayments {
6✔
405
                subscribersAllPayments[k] = v
3✔
406
        }
3✔
407
        p.subscribersMtx.Unlock()
3✔
408

3✔
409
        // Notify all subscribers that subscribed to the current payment hash.
3✔
410
        for _, subscriber := range subscribersPaymentHash {
6✔
411
                select {
3✔
412
                case subscriber.queue.ChanIn() <- event:
3✔
413
                        // If this event is the last, close the incoming channel
3✔
414
                        // of the queue. This will signal the subscriber that
3✔
415
                        // there won't be any more updates.
3✔
416
                        if terminal {
6✔
417
                                close(subscriber.queue.ChanIn())
3✔
418
                        }
3✔
419

420
                // If subscriber disappeared, skip notification. For further
421
                // notifications, we'll keep skipping over this subscriber.
422
                case <-subscriber.quit:
3✔
423
                }
424
        }
425

426
        // Notify all subscribers that subscribed to all payments.
427
        for key, subscriber := range subscribersAllPayments {
6✔
428
                select {
3✔
429
                case subscriber.queue.ChanIn() <- event:
3✔
430

431
                // If subscriber disappeared, remove it from the subscribers
432
                // list.
UNCOV
433
                case <-subscriber.quit:
×
UNCOV
434
                        p.subscribersMtx.Lock()
×
UNCOV
435
                        delete(p.subscribersAllPayments, key)
×
UNCOV
436
                        p.subscribersMtx.Unlock()
×
437
                }
438
        }
439
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc