• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11170835610

03 Oct 2024 10:41PM UTC coverage: 49.188% (-9.6%) from 58.738%
11170835610

push

github

web-flow
Merge pull request #9154 from ziggie1984/master

multi: bump btcd version.

3 of 6 new or added lines in 6 files covered. (50.0%)

26110 existing lines in 428 files now uncovered.

97359 of 197934 relevant lines covered (49.19%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.13
/watchtower/wtclient/session_negotiator.go
1
package wtclient
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/chaincfg/chainhash"
10
        "github.com/btcsuite/btclog"
11
        "github.com/lightningnetwork/lnd/keychain"
12
        "github.com/lightningnetwork/lnd/lnwire"
13
        "github.com/lightningnetwork/lnd/watchtower/blob"
14
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
15
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
16
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
17
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
18
)
19

20
// SessionNegotiator is an interface for asynchronously requesting new sessions.
21
type SessionNegotiator interface {
22
        // RequestSession signals to the session negotiator that the client
23
        // needs another session. Once the session is negotiated, it should be
24
        // returned via NewSessions.
25
        RequestSession()
26

27
        // NewSessions is a read-only channel where newly negotiated sessions
28
        // will be delivered.
29
        NewSessions() <-chan *ClientSession
30

31
        // Start safely initializes the session negotiator.
32
        Start() error
33

34
        // Stop safely shuts down the session negotiator.
35
        Stop() error
36
}
37

38
// NegotiatorConfig provides access to the resources required by a
39
// SessionNegotiator to faithfully carry out its duties. All nil-able field must
40
// be initialized.
41
type NegotiatorConfig struct {
42
        // DB provides access to a persistent storage medium used by the tower
43
        // to properly allocate session ephemeral keys and record successfully
44
        // negotiated sessions.
45
        DB DB
46

47
        // SecretKeyRing allows the client to derive new session private keys
48
        // when attempting to negotiate session with a tower.
49
        SecretKeyRing ECDHKeyRing
50

51
        // Candidates is an abstract set of tower candidates that the negotiator
52
        // will traverse serially when attempting to negotiate a new session.
53
        Candidates TowerCandidateIterator
54

55
        // Policy defines the session policy that will be proposed to towers
56
        // when attempting to negotiate a new session. This policy will be used
57
        // across all negotiation proposals for the lifetime of the negotiator.
58
        Policy wtpolicy.Policy
59

60
        // Dial initiates an outbound brontide connection to the given address
61
        // using a specified private key. The peer is returned in the event of a
62
        // successful connection.
63
        Dial func(keychain.SingleKeyECDH, *lnwire.NetAddress) (wtserver.Peer,
64
                error)
65

66
        // SendMessage writes a wtwire message to remote peer.
67
        SendMessage func(wtserver.Peer, wtwire.Message) error
68

69
        // ReadMessage reads a message from a remote peer and returns the
70
        // decoded wtwire message.
71
        ReadMessage func(wtserver.Peer) (wtwire.Message, error)
72

73
        // ChainHash the genesis hash identifying the chain for any negotiated
74
        // sessions. Any state updates sent to that session should also
75
        // originate from this chain.
76
        ChainHash chainhash.Hash
77

78
        // MinBackoff defines the initial backoff applied by the session
79
        // negotiator after all tower candidates have been exhausted and
80
        // reattempting negotiation with the same set of candidates. Subsequent
81
        // backoff durations will grow exponentially.
82
        MinBackoff time.Duration
83

84
        // MaxBackoff defines the maximum backoff applied by the session
85
        // negotiator after all tower candidates have been exhausted and
86
        // reattempting negotiation with the same set of candidates. If the
87
        // exponential backoff produces a timeout greater than this value, the
88
        // backoff duration will be clamped to MaxBackoff.
89
        MaxBackoff time.Duration
90

91
        // Log specifies the desired log output, which should be prefixed by the
92
        // client type, e.g. anchor or legacy.
93
        Log btclog.Logger
94
}
95

96
// sessionNegotiator is concrete SessionNegotiator that is able to request new
97
// sessions from a set of candidate towers asynchronously and return successful
98
// sessions to the primary client.
99
type sessionNegotiator struct {
100
        started sync.Once
101
        stopped sync.Once
102

103
        localInit *wtwire.Init
104

105
        cfg *NegotiatorConfig
106
        log btclog.Logger
107

108
        dispatcher             chan struct{}
109
        newSessions            chan *ClientSession
110
        successfulNegotiations chan *ClientSession
111

112
        wg   sync.WaitGroup
113
        quit chan struct{}
114
}
115

116
// Compile-time constraint to ensure a *sessionNegotiator implements the
117
// SessionNegotiator interface.
118
var _ SessionNegotiator = (*sessionNegotiator)(nil)
119

120
// newSessionNegotiator initializes a fresh sessionNegotiator instance.
121
func newSessionNegotiator(cfg *NegotiatorConfig) *sessionNegotiator {
2✔
122
        // Generate the set of features the negotiator will present to the tower
2✔
123
        // upon connection.
2✔
124
        features := cfg.Policy.FeatureBits()
2✔
125

2✔
126
        localInit := wtwire.NewInitMessage(
2✔
127
                lnwire.NewRawFeatureVector(features...),
2✔
128
                cfg.ChainHash,
2✔
129
        )
2✔
130

2✔
131
        return &sessionNegotiator{
2✔
132
                cfg:                    cfg,
2✔
133
                log:                    cfg.Log,
2✔
134
                localInit:              localInit,
2✔
135
                dispatcher:             make(chan struct{}, 1),
2✔
136
                newSessions:            make(chan *ClientSession),
2✔
137
                successfulNegotiations: make(chan *ClientSession),
2✔
138
                quit:                   make(chan struct{}),
2✔
139
        }
2✔
140
}
2✔
141

142
// Start safely starts up the sessionNegotiator.
143
func (n *sessionNegotiator) Start() error {
2✔
144
        n.started.Do(func() {
4✔
145
                n.log.Debugf("Starting session negotiator")
2✔
146

2✔
147
                n.wg.Add(1)
2✔
148
                go n.negotiationDispatcher()
2✔
149
        })
2✔
150

151
        return nil
2✔
152
}
153

154
// Stop safely shuts down the sessionNegotiator.
155
func (n *sessionNegotiator) Stop() error {
2✔
156
        n.stopped.Do(func() {
4✔
157
                n.log.Debugf("Stopping session negotiator")
2✔
158

2✔
159
                close(n.quit)
2✔
160
                n.wg.Wait()
2✔
161
        })
2✔
162

163
        return nil
2✔
164
}
165

166
// NewSessions returns a receive-only channel from which newly negotiated
167
// sessions will be returned.
168
func (n *sessionNegotiator) NewSessions() <-chan *ClientSession {
2✔
169
        return n.newSessions
2✔
170
}
2✔
171

172
// RequestSession sends a request to the sessionNegotiator to begin requesting a
173
// new session. If one is already in the process of being negotiated, the
174
// request will be ignored.
175
func (n *sessionNegotiator) RequestSession() {
2✔
176
        select {
2✔
177
        case n.dispatcher <- struct{}{}:
2✔
178
        default:
×
179
        }
180
}
181

182
// negotiationDispatcher acts as the primary event loop for the
183
// sessionNegotiator, coordinating requests for more sessions and dispatching
184
// attempts to negotiate them from a list of candidates.
185
func (n *sessionNegotiator) negotiationDispatcher() {
2✔
186
        defer n.wg.Done()
2✔
187

2✔
188
        var pendingNegotiations int
2✔
189
        for {
4✔
190
                select {
2✔
191
                case <-n.dispatcher:
2✔
192
                        pendingNegotiations++
2✔
193

2✔
194
                        if pendingNegotiations > 1 {
2✔
195
                                n.log.Debugf("Already negotiating session, " +
×
196
                                        "waiting for existing negotiation to " +
×
197
                                        "complete")
×
198
                                continue
×
199
                        }
200

201
                        // TODO(conner): consider reusing good towers
202

203
                        n.log.Debugf("Dispatching session negotiation")
2✔
204

2✔
205
                        n.wg.Add(1)
2✔
206
                        go n.negotiate()
2✔
207

208
                case session := <-n.successfulNegotiations:
2✔
209
                        select {
2✔
210
                        case n.newSessions <- session:
2✔
211
                                pendingNegotiations--
2✔
212
                        case <-n.quit:
×
213
                                return
×
214
                        }
215

216
                        if pendingNegotiations > 0 {
2✔
217
                                n.log.Debugf("Dispatching pending session " +
×
218
                                        "negotiation")
×
219

×
220
                                n.wg.Add(1)
×
221
                                go n.negotiate()
×
222
                        }
×
223

224
                case <-n.quit:
2✔
225
                        return
2✔
226
                }
227
        }
228
}
229

230
// negotiate handles the process of iterating through potential tower candidates
231
// and attempting to negotiate a new session until a successful negotiation
232
// occurs. If the candidate iterator becomes exhausted because none were
233
// successful, this method will back off exponentially up to the configured max
234
// backoff. This method will continue trying until a negotiation is successful
235
// before returning the negotiated session to the dispatcher via the succeed
236
// channel.
237
//
238
// NOTE: This method MUST be run as a goroutine.
239
func (n *sessionNegotiator) negotiate() {
2✔
240
        defer n.wg.Done()
2✔
241

2✔
242
        // On the first pass, initialize the backoff to our configured min
2✔
243
        // backoff.
2✔
244
        var backoff time.Duration
2✔
245

2✔
246
        // Create a closure to update the backoff upon failure such that it
2✔
247
        // stays within our min and max backoff parameters.
2✔
248
        updateBackoff := func() {
4✔
249
                if backoff == 0 {
4✔
250
                        backoff = n.cfg.MinBackoff
2✔
251
                } else {
2✔
UNCOV
252
                        backoff *= 2
×
UNCOV
253
                        if backoff > n.cfg.MaxBackoff {
×
UNCOV
254
                                backoff = n.cfg.MaxBackoff
×
UNCOV
255
                        }
×
256
                }
257
        }
258

259
retryWithBackoff:
260
        // If we are retrying, wait out the delay before continuing.
261
        if backoff > 0 {
4✔
262
                select {
2✔
263
                case <-time.After(backoff):
2✔
264
                case <-n.quit:
2✔
265
                        return
2✔
266
                }
267
        }
268

269
tryNextCandidate:
2✔
270
        for {
4✔
271
                select {
2✔
272
                case <-n.quit:
×
273
                        return
×
274
                default:
2✔
275
                }
276

277
                // Pull the next candidate from our list of addresses.
278
                tower, err := n.cfg.Candidates.Next()
2✔
279
                if err != nil {
4✔
280
                        // We've run out of addresses, update our backoff.
2✔
281
                        updateBackoff()
2✔
282

2✔
283
                        n.log.Debugf("Unable to get new tower candidate, "+
2✔
284
                                "retrying after %v -- reason: %v", backoff, err)
2✔
285

2✔
286
                        // Only reset the iterator once we've exhausted all
2✔
287
                        // candidates. Doing so allows us to load balance
2✔
288
                        // sessions better amongst all of the tower candidates.
2✔
289
                        if err == ErrTowerCandidatesExhausted {
4✔
290
                                n.cfg.Candidates.Reset()
2✔
291
                        }
2✔
292

293
                        goto retryWithBackoff
2✔
294
                }
295

296
                towerPub := tower.IdentityKey.SerializeCompressed()
2✔
297
                n.log.Debugf("Attempting session negotiation with tower=%x",
2✔
298
                        towerPub)
2✔
299

2✔
300
                var forceNextKey bool
2✔
301
                for {
4✔
302
                        // Before proceeding, we will reserve a session key
2✔
303
                        // index to use with this specific tower. If one is
2✔
304
                        // already reserved, the existing index will be
2✔
305
                        // returned.
2✔
306
                        keyIndex, err := n.cfg.DB.NextSessionKeyIndex(
2✔
307
                                tower.ID, n.cfg.Policy.BlobType, forceNextKey,
2✔
308
                        )
2✔
309
                        if err != nil {
2✔
310
                                n.log.Debugf("Unable to reserve session key "+
×
311
                                        "index for tower=%x: %v", towerPub, err)
×
312

×
313
                                goto tryNextCandidate
×
314
                        }
315

316
                        // We'll now attempt the CreateSession dance with the
317
                        // tower to get a new session, trying all addresses if
318
                        // necessary.
319
                        err = n.createSession(tower, keyIndex)
2✔
320
                        if err == nil {
4✔
321
                                return
2✔
322
                        } else if errors.Is(err, ErrSessionKeyAlreadyUsed) {
2✔
UNCOV
323
                                forceNextKey = true
×
UNCOV
324
                                continue
×
325
                        }
326

327
                        // An unexpected error occurred, update our backoff.
UNCOV
328
                        updateBackoff()
×
UNCOV
329

×
UNCOV
330
                        n.log.Debugf("Session negotiation with tower=%x "+
×
UNCOV
331
                                "failed, trying again -- reason: %v", towerPub,
×
UNCOV
332
                                err)
×
UNCOV
333

×
UNCOV
334
                        goto retryWithBackoff
×
335
                }
336
        }
337
}
338

339
// createSession takes a tower and attempts to negotiate a session using any of
340
// its stored addresses. This method returns after the first successful
341
// negotiation, or after all addresses have failed with ErrFailedNegotiation.
342
func (n *sessionNegotiator) createSession(tower *Tower, keyIndex uint32) error {
2✔
343
        sessionKeyDesc, err := n.cfg.SecretKeyRing.DeriveKey(
2✔
344
                keychain.KeyLocator{
2✔
345
                        Family: keychain.KeyFamilyTowerSession,
2✔
346
                        Index:  keyIndex,
2✔
347
                },
2✔
348
        )
2✔
349
        if err != nil {
2✔
350
                return err
×
351
        }
×
352
        sessionKey := keychain.NewPubKeyECDH(
2✔
353
                sessionKeyDesc, n.cfg.SecretKeyRing,
2✔
354
        )
2✔
355

2✔
356
        addr := tower.Addresses.PeekAndLock()
2✔
357
        for {
4✔
358
                lnAddr := &lnwire.NetAddress{
2✔
359
                        IdentityKey: tower.IdentityKey,
2✔
360
                        Address:     addr,
2✔
361
                }
2✔
362

2✔
363
                err = n.tryAddress(sessionKey, keyIndex, tower, lnAddr)
2✔
364
                tower.Addresses.ReleaseLock(addr)
2✔
365
                switch {
2✔
UNCOV
366
                case errors.Is(err, ErrSessionKeyAlreadyUsed):
×
UNCOV
367
                        return err
×
368

369
                case errors.Is(err, ErrPermanentTowerFailure):
×
370
                        // TODO(conner): report to iterator? can then be reset
×
371
                        // with restart
×
372
                        fallthrough
×
373

UNCOV
374
                case err != nil:
×
UNCOV
375
                        n.log.Debugf("Request for session negotiation with "+
×
UNCOV
376
                                "tower=%s failed, trying again -- reason: "+
×
UNCOV
377
                                "%v", lnAddr, err)
×
UNCOV
378

×
UNCOV
379
                        // Get the next tower address if there is one.
×
UNCOV
380
                        addr, err = tower.Addresses.NextAndLock()
×
UNCOV
381
                        if err == ErrAddressesExhausted {
×
UNCOV
382
                                tower.Addresses.Reset()
×
UNCOV
383

×
UNCOV
384
                                return ErrFailedNegotiation
×
UNCOV
385
                        }
×
386

387
                        continue
×
388

389
                default:
2✔
390
                        return nil
2✔
391
                }
392
        }
393
}
394

395
// tryAddress executes a single create session dance using the given address.
396
// The address should belong to the tower's set of addresses. This method only
397
// returns true if all steps succeed and the new session has been persisted, and
398
// fails otherwise.
399
func (n *sessionNegotiator) tryAddress(sessionKey keychain.SingleKeyECDH,
400
        keyIndex uint32, tower *Tower, lnAddr *lnwire.NetAddress) error {
2✔
401

2✔
402
        // Connect to the tower address using our generated session key.
2✔
403
        conn, err := n.cfg.Dial(sessionKey, lnAddr)
2✔
404
        if err != nil {
2✔
UNCOV
405
                return err
×
UNCOV
406
        }
×
407

408
        // Send local Init message.
409
        err = n.cfg.SendMessage(conn, n.localInit)
2✔
410
        if err != nil {
2✔
UNCOV
411
                return fmt.Errorf("unable to send Init: %w", err)
×
UNCOV
412
        }
×
413

414
        // Receive remote Init message.
415
        remoteMsg, err := n.cfg.ReadMessage(conn)
2✔
416
        if err != nil {
2✔
417
                return fmt.Errorf("unable to read Init: %w", err)
×
418
        }
×
419

420
        // Check that returned message is wtwire.Init.
421
        remoteInit, ok := remoteMsg.(*wtwire.Init)
2✔
422
        if !ok {
2✔
423
                return fmt.Errorf("expected Init, got %T in reply", remoteMsg)
×
424
        }
×
425

426
        // Verify the watchtower's remote Init message against our own.
427
        err = n.localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
2✔
428
        if err != nil {
2✔
429
                return err
×
430
        }
×
431

432
        policy := n.cfg.Policy
2✔
433
        createSession := &wtwire.CreateSession{
2✔
434
                BlobType:     policy.BlobType,
2✔
435
                MaxUpdates:   policy.MaxUpdates,
2✔
436
                RewardBase:   policy.RewardBase,
2✔
437
                RewardRate:   policy.RewardRate,
2✔
438
                SweepFeeRate: policy.SweepFeeRate,
2✔
439
        }
2✔
440

2✔
441
        // Send CreateSession message.
2✔
442
        err = n.cfg.SendMessage(conn, createSession)
2✔
443
        if err != nil {
2✔
444
                return fmt.Errorf("unable to send CreateSession: %w", err)
×
445
        }
×
446

447
        // Receive CreateSessionReply message.
448
        remoteMsg, err = n.cfg.ReadMessage(conn)
2✔
449
        if err != nil {
2✔
UNCOV
450
                return fmt.Errorf("unable to read CreateSessionReply: %w", err)
×
UNCOV
451
        }
×
452

453
        // Check that returned message is wtwire.CreateSessionReply.
454
        createSessionReply, ok := remoteMsg.(*wtwire.CreateSessionReply)
2✔
455
        if !ok {
2✔
456
                return fmt.Errorf("expected CreateSessionReply, got %T in "+
×
457
                        "reply", remoteMsg)
×
458
        }
×
459

460
        switch createSessionReply.Code {
2✔
461
        case wtwire.CodeOK:
2✔
462
                // TODO(conner): validate reward address
2✔
463
                rewardPkScript := createSessionReply.Data
2✔
464

2✔
465
                sessionID := wtdb.NewSessionIDFromPubKey(sessionKey.PubKey())
2✔
466
                dbClientSession := &wtdb.ClientSession{
2✔
467
                        ClientSessionBody: wtdb.ClientSessionBody{
2✔
468
                                TowerID:        tower.ID,
2✔
469
                                KeyIndex:       keyIndex,
2✔
470
                                Policy:         n.cfg.Policy,
2✔
471
                                RewardPkScript: rewardPkScript,
2✔
472
                        },
2✔
473
                        ID: sessionID,
2✔
474
                }
2✔
475

2✔
476
                err = n.cfg.DB.CreateClientSession(dbClientSession)
2✔
477
                if err != nil {
2✔
478
                        return fmt.Errorf("unable to persist ClientSession: %w",
×
479
                                err)
×
480
                }
×
481

482
                n.log.Debugf("New session negotiated with %s, policy: %s",
2✔
483
                        lnAddr, dbClientSession.Policy)
2✔
484

2✔
485
                clientSession := &ClientSession{
2✔
486
                        ID:                sessionID,
2✔
487
                        ClientSessionBody: dbClientSession.ClientSessionBody,
2✔
488
                        Tower:             tower,
2✔
489
                        SessionKeyECDH:    sessionKey,
2✔
490
                }
2✔
491

2✔
492
                // We have a newly negotiated session, return it to the
2✔
493
                // dispatcher so that it can update how many outstanding
2✔
494
                // negotiation requests we have.
2✔
495
                select {
2✔
496
                case n.successfulNegotiations <- clientSession:
2✔
497
                        return nil
2✔
UNCOV
498
                case <-n.quit:
×
UNCOV
499
                        return ErrNegotiatorExiting
×
500
                }
501

UNCOV
502
        case wtwire.CreateSessionCodeAlreadyExists:
×
UNCOV
503
                // TODO(conner): use the last-applied in the create session
×
UNCOV
504
                //  reply to handle case where we lose state, session already
×
UNCOV
505
                //  exists, and we want to possibly resume using the session.
×
UNCOV
506
                //  NOTE that this should not be done until the server code
×
UNCOV
507
                //  has been adapted to first check that the CreateSession
×
UNCOV
508
                //  request is for the same blob-type as the initial session.
×
UNCOV
509

×
UNCOV
510
                return ErrSessionKeyAlreadyUsed
×
511

512
        // TODO(conner): handle error codes properly
513
        case wtwire.CreateSessionCodeRejectBlobType:
×
514
                return fmt.Errorf("tower rejected blob type: %v",
×
515
                        policy.BlobType)
×
516

517
        case wtwire.CreateSessionCodeRejectMaxUpdates:
×
518
                return fmt.Errorf("tower rejected max updates: %v",
×
519
                        policy.MaxUpdates)
×
520

521
        case wtwire.CreateSessionCodeRejectRewardRate:
×
522
                // The tower rejected the session because of the reward rate. If
×
523
                // we didn't request a reward session, we'll treat this as a
×
524
                // permanent tower failure.
×
525
                if !policy.BlobType.Has(blob.FlagReward) {
×
526
                        return ErrPermanentTowerFailure
×
527
                }
×
528

529
                return fmt.Errorf("tower rejected reward rate: %v",
×
530
                        policy.RewardRate)
×
531

532
        case wtwire.CreateSessionCodeRejectSweepFeeRate:
×
533
                return fmt.Errorf("tower rejected sweep fee rate: %v",
×
534
                        policy.SweepFeeRate)
×
535

536
        default:
×
537
                return fmt.Errorf("received unhandled error code: %v",
×
538
                        createSessionReply.Code)
×
539
        }
540
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc