• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.13
/watchtower/wtclient/session_negotiator.go
1
package wtclient
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/chaincfg/chainhash"
10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/keychain"
12
        "github.com/lightningnetwork/lnd/lnwire"
13
        "github.com/lightningnetwork/lnd/watchtower/blob"
14
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
15
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
16
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
17
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
18
)
19

20
// SessionNegotiator is an interface for asynchronously requesting new sessions.
21
type SessionNegotiator interface {
22
        // RequestSession signals to the session negotiator that the client
23
        // needs another session. Once the session is negotiated, it should be
24
        // returned via NewSessions.
25
        RequestSession()
26

27
        // NewSessions is a read-only channel where newly negotiated sessions
28
        // will be delivered.
29
        NewSessions() <-chan *ClientSession
30

31
        // Start safely initializes the session negotiator.
32
        Start() error
33

34
        // Stop safely shuts down the session negotiator.
35
        Stop() error
36
}
37

38
// NegotiatorConfig provides access to the resources required by a
39
// SessionNegotiator to faithfully carry out its duties. All nil-able field must
40
// be initialized.
41
type NegotiatorConfig struct {
42
        // DB provides access to a persistent storage medium used by the tower
43
        // to properly allocate session ephemeral keys and record successfully
44
        // negotiated sessions.
45
        DB DB
46

47
        // SecretKeyRing allows the client to derive new session private keys
48
        // when attempting to negotiate session with a tower.
49
        SecretKeyRing ECDHKeyRing
50

51
        // Candidates is an abstract set of tower candidates that the negotiator
52
        // will traverse serially when attempting to negotiate a new session.
53
        Candidates TowerCandidateIterator
54

55
        // Policy defines the session policy that will be proposed to towers
56
        // when attempting to negotiate a new session. This policy will be used
57
        // across all negotiation proposals for the lifetime of the negotiator.
58
        Policy wtpolicy.Policy
59

60
        // Dial initiates an outbound brontide connection to the given address
61
        // using a specified private key. The peer is returned in the event of a
62
        // successful connection.
63
        Dial func(keychain.SingleKeyECDH, *lnwire.NetAddress) (wtserver.Peer,
64
                error)
65

66
        // SendMessage writes a wtwire message to remote peer.
67
        SendMessage func(wtserver.Peer, wtwire.Message) error
68

69
        // ReadMessage reads a message from a remote peer and returns the
70
        // decoded wtwire message.
71
        ReadMessage func(wtserver.Peer) (wtwire.Message, error)
72

73
        // ChainHash the genesis hash identifying the chain for any negotiated
74
        // sessions. Any state updates sent to that session should also
75
        // originate from this chain.
76
        ChainHash chainhash.Hash
77

78
        // MinBackoff defines the initial backoff applied by the session
79
        // negotiator after all tower candidates have been exhausted and
80
        // reattempting negotiation with the same set of candidates. Subsequent
81
        // backoff durations will grow exponentially.
82
        MinBackoff time.Duration
83

84
        // MaxBackoff defines the maximum backoff applied by the session
85
        // negotiator after all tower candidates have been exhausted and
86
        // reattempting negotiation with the same set of candidates. If the
87
        // exponential backoff produces a timeout greater than this value, the
88
        // backoff duration will be clamped to MaxBackoff.
89
        MaxBackoff time.Duration
90

91
        // Log specifies the desired log output, which should be prefixed by the
92
        // client type, e.g. anchor or legacy.
93
        Log btclog.Logger
94
}
95

96
// sessionNegotiator is concrete SessionNegotiator that is able to request new
97
// sessions from a set of candidate towers asynchronously and return successful
98
// sessions to the primary client.
99
type sessionNegotiator struct {
100
        started sync.Once
101
        stopped sync.Once
102

103
        localInit *wtwire.Init
104

105
        cfg *NegotiatorConfig
106
        log btclog.Logger
107

108
        dispatcher             chan struct{}
109
        newSessions            chan *ClientSession
110
        successfulNegotiations chan *ClientSession
111

112
        wg   sync.WaitGroup
113
        quit chan struct{}
114
}
115

116
// Compile-time constraint to ensure a *sessionNegotiator implements the
117
// SessionNegotiator interface.
118
var _ SessionNegotiator = (*sessionNegotiator)(nil)
119

120
// newSessionNegotiator initializes a fresh sessionNegotiator instance.
121
func newSessionNegotiator(cfg *NegotiatorConfig) *sessionNegotiator {
3✔
122
        // Generate the set of features the negotiator will present to the tower
3✔
123
        // upon connection.
3✔
124
        features := cfg.Policy.FeatureBits()
3✔
125

3✔
126
        localInit := wtwire.NewInitMessage(
3✔
127
                lnwire.NewRawFeatureVector(features...),
3✔
128
                cfg.ChainHash,
3✔
129
        )
3✔
130

3✔
131
        return &sessionNegotiator{
3✔
132
                cfg:                    cfg,
3✔
133
                log:                    cfg.Log,
3✔
134
                localInit:              localInit,
3✔
135
                dispatcher:             make(chan struct{}, 1),
3✔
136
                newSessions:            make(chan *ClientSession),
3✔
137
                successfulNegotiations: make(chan *ClientSession),
3✔
138
                quit:                   make(chan struct{}),
3✔
139
        }
3✔
140
}
3✔
141

142
// Start safely starts up the sessionNegotiator.
143
func (n *sessionNegotiator) Start() error {
3✔
144
        n.started.Do(func() {
6✔
145
                n.log.Debugf("Starting session negotiator")
3✔
146

3✔
147
                n.wg.Add(1)
3✔
148
                go n.negotiationDispatcher()
3✔
149
        })
3✔
150

151
        return nil
3✔
152
}
153

154
// Stop safely shuts down the sessionNegotiator.
155
func (n *sessionNegotiator) Stop() error {
3✔
156
        n.stopped.Do(func() {
6✔
157
                n.log.Debugf("Stopping session negotiator")
3✔
158

3✔
159
                close(n.quit)
3✔
160
                n.wg.Wait()
3✔
161
        })
3✔
162

163
        return nil
3✔
164
}
165

166
// NewSessions returns a receive-only channel from which newly negotiated
167
// sessions will be returned.
168
func (n *sessionNegotiator) NewSessions() <-chan *ClientSession {
3✔
169
        return n.newSessions
3✔
170
}
3✔
171

172
// RequestSession sends a request to the sessionNegotiator to begin requesting a
173
// new session. If one is already in the process of being negotiated, the
174
// request will be ignored.
175
func (n *sessionNegotiator) RequestSession() {
3✔
176
        select {
3✔
177
        case n.dispatcher <- struct{}{}:
3✔
178
        default:
×
179
        }
180
}
181

182
// negotiationDispatcher acts as the primary event loop for the
183
// sessionNegotiator, coordinating requests for more sessions and dispatching
184
// attempts to negotiate them from a list of candidates.
185
func (n *sessionNegotiator) negotiationDispatcher() {
3✔
186
        defer n.wg.Done()
3✔
187

3✔
188
        var pendingNegotiations int
3✔
189
        for {
6✔
190
                select {
3✔
191
                case <-n.dispatcher:
3✔
192
                        pendingNegotiations++
3✔
193

3✔
194
                        if pendingNegotiations > 1 {
3✔
195
                                n.log.Debugf("Already negotiating session, " +
×
196
                                        "waiting for existing negotiation to " +
×
197
                                        "complete")
×
198
                                continue
×
199
                        }
200

201
                        // TODO(conner): consider reusing good towers
202

203
                        n.log.Debugf("Dispatching session negotiation")
3✔
204

3✔
205
                        n.wg.Add(1)
3✔
206
                        go n.negotiate()
3✔
207

208
                case session := <-n.successfulNegotiations:
3✔
209
                        select {
3✔
210
                        case n.newSessions <- session:
3✔
211
                                pendingNegotiations--
3✔
212
                        case <-n.quit:
×
213
                                return
×
214
                        }
215

216
                        if pendingNegotiations > 0 {
3✔
217
                                n.log.Debugf("Dispatching pending session " +
×
218
                                        "negotiation")
×
219

×
220
                                n.wg.Add(1)
×
221
                                go n.negotiate()
×
222
                        }
×
223

224
                case <-n.quit:
3✔
225
                        return
3✔
226
                }
227
        }
228
}
229

230
// negotiate handles the process of iterating through potential tower candidates
231
// and attempting to negotiate a new session until a successful negotiation
232
// occurs. If the candidate iterator becomes exhausted because none were
233
// successful, this method will back off exponentially up to the configured max
234
// backoff. This method will continue trying until a negotiation is successful
235
// before returning the negotiated session to the dispatcher via the succeed
236
// channel.
237
//
238
// NOTE: This method MUST be run as a goroutine.
239
func (n *sessionNegotiator) negotiate() {
3✔
240
        defer n.wg.Done()
3✔
241

3✔
242
        // On the first pass, initialize the backoff to our configured min
3✔
243
        // backoff.
3✔
244
        var backoff time.Duration
3✔
245

3✔
246
        // Create a closure to update the backoff upon failure such that it
3✔
247
        // stays within our min and max backoff parameters.
3✔
248
        updateBackoff := func() {
6✔
249
                if backoff == 0 {
6✔
250
                        backoff = n.cfg.MinBackoff
3✔
251
                } else {
3✔
UNCOV
252
                        backoff *= 2
×
UNCOV
253
                        if backoff > n.cfg.MaxBackoff {
×
UNCOV
254
                                backoff = n.cfg.MaxBackoff
×
UNCOV
255
                        }
×
256
                }
257
        }
258

259
retryWithBackoff:
260
        // If we are retrying, wait out the delay before continuing.
261
        if backoff > 0 {
6✔
262
                select {
3✔
263
                case <-time.After(backoff):
3✔
264
                case <-n.quit:
3✔
265
                        return
3✔
266
                }
267
        }
268

269
tryNextCandidate:
3✔
270
        for {
6✔
271
                select {
3✔
272
                case <-n.quit:
×
273
                        return
×
274
                default:
3✔
275
                }
276

277
                // Pull the next candidate from our list of addresses.
278
                tower, err := n.cfg.Candidates.Next()
3✔
279
                if err != nil {
6✔
280
                        // We've run out of addresses, update our backoff.
3✔
281
                        updateBackoff()
3✔
282

3✔
283
                        n.log.Debugf("Unable to get new tower candidate, "+
3✔
284
                                "retrying after %v -- reason: %v", backoff, err)
3✔
285

3✔
286
                        // Only reset the iterator once we've exhausted all
3✔
287
                        // candidates. Doing so allows us to load balance
3✔
288
                        // sessions better amongst all of the tower candidates.
3✔
289
                        if err == ErrTowerCandidatesExhausted {
6✔
290
                                n.cfg.Candidates.Reset()
3✔
291
                        }
3✔
292

293
                        goto retryWithBackoff
3✔
294
                }
295

296
                towerPub := tower.IdentityKey.SerializeCompressed()
3✔
297
                n.log.Debugf("Attempting session negotiation with tower=%x",
3✔
298
                        towerPub)
3✔
299

3✔
300
                var forceNextKey bool
3✔
301
                for {
6✔
302
                        // Before proceeding, we will reserve a session key
3✔
303
                        // index to use with this specific tower. If one is
3✔
304
                        // already reserved, the existing index will be
3✔
305
                        // returned.
3✔
306
                        keyIndex, err := n.cfg.DB.NextSessionKeyIndex(
3✔
307
                                tower.ID, n.cfg.Policy.BlobType, forceNextKey,
3✔
308
                        )
3✔
309
                        if err != nil {
3✔
310
                                n.log.Debugf("Unable to reserve session key "+
×
311
                                        "index for tower=%x: %v", towerPub, err)
×
312

×
313
                                goto tryNextCandidate
×
314
                        }
315

316
                        // We'll now attempt the CreateSession dance with the
317
                        // tower to get a new session, trying all addresses if
318
                        // necessary.
319
                        err = n.createSession(tower, keyIndex)
3✔
320
                        if err == nil {
6✔
321
                                return
3✔
322
                        } else if errors.Is(err, ErrSessionKeyAlreadyUsed) {
3✔
UNCOV
323
                                forceNextKey = true
×
UNCOV
324
                                continue
×
325
                        }
326

327
                        // An unexpected error occurred, update our backoff.
UNCOV
328
                        updateBackoff()
×
UNCOV
329

×
UNCOV
330
                        n.log.Debugf("Session negotiation with tower=%x "+
×
UNCOV
331
                                "failed, trying again -- reason: %v", towerPub,
×
UNCOV
332
                                err)
×
UNCOV
333

×
UNCOV
334
                        goto retryWithBackoff
×
335
                }
336
        }
337
}
338

339
// createSession takes a tower and attempts to negotiate a session using any of
340
// its stored addresses. This method returns after the first successful
341
// negotiation, or after all addresses have failed with ErrFailedNegotiation.
342
func (n *sessionNegotiator) createSession(tower *Tower, keyIndex uint32) error {
3✔
343
        sessionKeyDesc, err := n.cfg.SecretKeyRing.DeriveKey(
3✔
344
                keychain.KeyLocator{
3✔
345
                        Family: keychain.KeyFamilyTowerSession,
3✔
346
                        Index:  keyIndex,
3✔
347
                },
3✔
348
        )
3✔
349
        if err != nil {
3✔
350
                return err
×
351
        }
×
352
        sessionKey := keychain.NewPubKeyECDH(
3✔
353
                sessionKeyDesc, n.cfg.SecretKeyRing,
3✔
354
        )
3✔
355

3✔
356
        addr := tower.Addresses.PeekAndLock()
3✔
357
        for {
6✔
358
                lnAddr := &lnwire.NetAddress{
3✔
359
                        IdentityKey: tower.IdentityKey,
3✔
360
                        Address:     addr,
3✔
361
                }
3✔
362

3✔
363
                err = n.tryAddress(sessionKey, keyIndex, tower, lnAddr)
3✔
364
                tower.Addresses.ReleaseLock(addr)
3✔
365
                switch {
3✔
UNCOV
366
                case errors.Is(err, ErrSessionKeyAlreadyUsed):
×
UNCOV
367
                        return err
×
368

369
                case errors.Is(err, ErrPermanentTowerFailure):
×
370
                        // TODO(conner): report to iterator? can then be reset
×
371
                        // with restart
×
372
                        fallthrough
×
373

UNCOV
374
                case err != nil:
×
UNCOV
375
                        n.log.Debugf("Request for session negotiation with "+
×
UNCOV
376
                                "tower=%s failed, trying again -- reason: "+
×
UNCOV
377
                                "%v", lnAddr, err)
×
UNCOV
378

×
UNCOV
379
                        // Get the next tower address if there is one.
×
UNCOV
380
                        addr, err = tower.Addresses.NextAndLock()
×
UNCOV
381
                        if err == ErrAddressesExhausted {
×
UNCOV
382
                                tower.Addresses.Reset()
×
UNCOV
383

×
UNCOV
384
                                return ErrFailedNegotiation
×
UNCOV
385
                        }
×
386

387
                        continue
×
388

389
                default:
3✔
390
                        return nil
3✔
391
                }
392
        }
393
}
394

395
// tryAddress executes a single create session dance using the given address.
396
// The address should belong to the tower's set of addresses. This method only
397
// returns true if all steps succeed and the new session has been persisted, and
398
// fails otherwise.
399
func (n *sessionNegotiator) tryAddress(sessionKey keychain.SingleKeyECDH,
400
        keyIndex uint32, tower *Tower, lnAddr *lnwire.NetAddress) error {
3✔
401

3✔
402
        // Connect to the tower address using our generated session key.
3✔
403
        conn, err := n.cfg.Dial(sessionKey, lnAddr)
3✔
404
        if err != nil {
3✔
UNCOV
405
                return err
×
UNCOV
406
        }
×
407

408
        // Send local Init message.
409
        err = n.cfg.SendMessage(conn, n.localInit)
3✔
410
        if err != nil {
3✔
UNCOV
411
                return fmt.Errorf("unable to send Init: %w", err)
×
UNCOV
412
        }
×
413

414
        // Receive remote Init message.
415
        remoteMsg, err := n.cfg.ReadMessage(conn)
3✔
416
        if err != nil {
3✔
417
                return fmt.Errorf("unable to read Init: %w", err)
×
418
        }
×
419

420
        // Check that returned message is wtwire.Init.
421
        remoteInit, ok := remoteMsg.(*wtwire.Init)
3✔
422
        if !ok {
3✔
423
                return fmt.Errorf("expected Init, got %T in reply", remoteMsg)
×
424
        }
×
425

426
        // Verify the watchtower's remote Init message against our own.
427
        err = n.localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
3✔
428
        if err != nil {
3✔
429
                return err
×
430
        }
×
431

432
        policy := n.cfg.Policy
3✔
433
        createSession := &wtwire.CreateSession{
3✔
434
                BlobType:     policy.BlobType,
3✔
435
                MaxUpdates:   policy.MaxUpdates,
3✔
436
                RewardBase:   policy.RewardBase,
3✔
437
                RewardRate:   policy.RewardRate,
3✔
438
                SweepFeeRate: policy.SweepFeeRate,
3✔
439
        }
3✔
440

3✔
441
        // Send CreateSession message.
3✔
442
        err = n.cfg.SendMessage(conn, createSession)
3✔
443
        if err != nil {
3✔
444
                return fmt.Errorf("unable to send CreateSession: %w", err)
×
445
        }
×
446

447
        // Receive CreateSessionReply message.
448
        remoteMsg, err = n.cfg.ReadMessage(conn)
3✔
449
        if err != nil {
3✔
UNCOV
450
                return fmt.Errorf("unable to read CreateSessionReply: %w", err)
×
UNCOV
451
        }
×
452

453
        // Check that returned message is wtwire.CreateSessionReply.
454
        createSessionReply, ok := remoteMsg.(*wtwire.CreateSessionReply)
3✔
455
        if !ok {
3✔
456
                return fmt.Errorf("expected CreateSessionReply, got %T in "+
×
457
                        "reply", remoteMsg)
×
458
        }
×
459

460
        switch createSessionReply.Code {
3✔
461
        case wtwire.CodeOK:
3✔
462
                // TODO(conner): validate reward address
3✔
463
                rewardPkScript := createSessionReply.Data
3✔
464

3✔
465
                sessionID := wtdb.NewSessionIDFromPubKey(sessionKey.PubKey())
3✔
466
                dbClientSession := &wtdb.ClientSession{
3✔
467
                        ClientSessionBody: wtdb.ClientSessionBody{
3✔
468
                                TowerID:        tower.ID,
3✔
469
                                KeyIndex:       keyIndex,
3✔
470
                                Policy:         n.cfg.Policy,
3✔
471
                                RewardPkScript: rewardPkScript,
3✔
472
                        },
3✔
473
                        ID: sessionID,
3✔
474
                }
3✔
475

3✔
476
                err = n.cfg.DB.CreateClientSession(dbClientSession)
3✔
477
                if err != nil {
3✔
478
                        return fmt.Errorf("unable to persist ClientSession: %w",
×
479
                                err)
×
480
                }
×
481

482
                n.log.Debugf("New session negotiated with %s, policy: %s",
3✔
483
                        lnAddr, dbClientSession.Policy)
3✔
484

3✔
485
                clientSession := &ClientSession{
3✔
486
                        ID:                sessionID,
3✔
487
                        ClientSessionBody: dbClientSession.ClientSessionBody,
3✔
488
                        Tower:             tower,
3✔
489
                        SessionKeyECDH:    sessionKey,
3✔
490
                }
3✔
491

3✔
492
                // We have a newly negotiated session, return it to the
3✔
493
                // dispatcher so that it can update how many outstanding
3✔
494
                // negotiation requests we have.
3✔
495
                select {
3✔
496
                case n.successfulNegotiations <- clientSession:
3✔
497
                        return nil
3✔
UNCOV
498
                case <-n.quit:
×
UNCOV
499
                        return ErrNegotiatorExiting
×
500
                }
501

UNCOV
502
        case wtwire.CreateSessionCodeAlreadyExists:
×
UNCOV
503
                // TODO(conner): use the last-applied in the create session
×
UNCOV
504
                //  reply to handle case where we lose state, session already
×
UNCOV
505
                //  exists, and we want to possibly resume using the session.
×
UNCOV
506
                //  NOTE that this should not be done until the server code
×
UNCOV
507
                //  has been adapted to first check that the CreateSession
×
UNCOV
508
                //  request is for the same blob-type as the initial session.
×
UNCOV
509

×
UNCOV
510
                return ErrSessionKeyAlreadyUsed
×
511

512
        // TODO(conner): handle error codes properly
513
        case wtwire.CreateSessionCodeRejectBlobType:
×
514
                return fmt.Errorf("tower rejected blob type: %v",
×
515
                        policy.BlobType)
×
516

517
        case wtwire.CreateSessionCodeRejectMaxUpdates:
×
518
                return fmt.Errorf("tower rejected max updates: %v",
×
519
                        policy.MaxUpdates)
×
520

521
        case wtwire.CreateSessionCodeRejectRewardRate:
×
522
                // The tower rejected the session because of the reward rate. If
×
523
                // we didn't request a reward session, we'll treat this as a
×
524
                // permanent tower failure.
×
525
                if !policy.BlobType.Has(blob.FlagReward) {
×
526
                        return ErrPermanentTowerFailure
×
527
                }
×
528

529
                return fmt.Errorf("tower rejected reward rate: %v",
×
530
                        policy.RewardRate)
×
531

532
        case wtwire.CreateSessionCodeRejectSweepFeeRate:
×
533
                return fmt.Errorf("tower rejected sweep fee rate: %v",
×
534
                        policy.SweepFeeRate)
×
535

536
        default:
×
537
                return fmt.Errorf("received unhandled error code: %v",
×
538
                        createSessionReply.Code)
×
539
        }
540
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc