• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13035292482

29 Jan 2025 03:59PM UTC coverage: 49.3% (-9.5%) from 58.777%
13035292482

Pull #9456

github

mohamedawnallah
docs: update release-notes-0.19.0.md

In this commit, we warn users about the removal
of RPCs `SendToRoute`, `SendToRouteSync`, `SendPayment`,
and `SendPaymentSync` in the next release 0.20.
Pull Request #9456: lnrpc+docs: deprecate warning `SendToRoute`, `SendToRouteSync`, `SendPayment`, and `SendPaymentSync` in Release 0.19

100634 of 204126 relevant lines covered (49.3%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.6
/watchtower/wtclient/client.go
1
package wtclient
2

3
import (
4
        "bytes"
5
        "crypto/rand"
6
        "errors"
7
        "fmt"
8
        "math/big"
9
        "net"
10
        "sync"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcec/v2"
14
        "github.com/btcsuite/btclog/v2"
15
        "github.com/lightningnetwork/lnd/channeldb"
16
        "github.com/lightningnetwork/lnd/keychain"
17
        "github.com/lightningnetwork/lnd/lnwallet"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
20
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
21
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
22
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
23
)
24

25
// Default values used by the watchtower client.
const (
	// DefaultReadTimeout specifies the default duration we will wait during
	// a read before breaking out of a blocking read.
	DefaultReadTimeout = 15 * time.Second

	// DefaultWriteTimeout specifies the default duration we will wait
	// during a write before breaking out of a blocking write.
	DefaultWriteTimeout = 15 * time.Second

	// DefaultStatInterval specifies the default interval between logging
	// metrics about the client's operation. It is the period of the stat
	// ticker created by newClient.
	DefaultStatInterval = time.Minute

	// DefaultSessionCloseRange is the range, expressed in blocks, over
	// which we will generate a random number of blocks to delay closing a
	// session after its last channel has been closed.
	DefaultSessionCloseRange = 288

	// DefaultMaxTasksInMemQueue is the maximum number of items to be held
	// in the in-memory queue.
	DefaultMaxTasksInMemQueue = 2000
)
47

48
// genSessionFilter constructs a filter that can be used to select sessions only
49
// if they match the policy of the client (namely anchor vs legacy). If
50
// activeOnly is set, then only active sessions will be returned.
51
func (c *client) genSessionFilter(
52
        activeOnly bool) wtdb.ClientSessionFilterFn {
3✔
53

3✔
54
        return func(session *wtdb.ClientSession) bool {
6✔
55
                if c.cfg.Policy.TxPolicy != session.Policy.TxPolicy {
6✔
56
                        return false
3✔
57
                }
3✔
58

59
                if !activeOnly {
6✔
60
                        return true
3✔
61
                }
3✔
62

63
                return session.Status == wtdb.CSessionActive
3✔
64
        }
65
}
66

67
// ExhaustedSessionFilter constructs a wtdb.ClientSessionFilterFn filter
68
// function that will filter out any sessions that have been exhausted. A
69
// session is considered exhausted only if it has no un-acked updates and the
70
// sequence number of the session is equal to the max updates of the session
71
// policy.
72
func ExhaustedSessionFilter() wtdb.ClientSessWithNumCommittedUpdatesFilterFn {
3✔
73
        return func(session *wtdb.ClientSession, numUnAcked uint16) bool {
6✔
74
                return session.SeqNum < session.Policy.MaxUpdates ||
3✔
75
                        numUnAcked > 0
3✔
76
        }
3✔
77
}
78

79
// RegisteredTower encompasses information about a registered watchtower with
// the client. It embeds the database representation of the tower.
type RegisteredTower struct {
	*wtdb.Tower

	// Sessions is the set of sessions corresponding to the watchtower,
	// keyed by session ID.
	Sessions map[wtdb.SessionID]*wtdb.ClientSession

	// ActiveSessionCandidate determines whether the watchtower is currently
	// being considered for new sessions.
	ActiveSessionCandidate bool
}
91

92
// BreachRetributionBuilder is a function that can be used to construct a
// BreachRetribution from a channel ID and a commitment height. Along with the
// retribution it returns the channeldb.ChannelType, or an error if the
// retribution could not be built.
type BreachRetributionBuilder func(id lnwire.ChannelID,
	commitHeight uint64) (*lnwallet.BreachRetribution,
	channeldb.ChannelType, error)
97

98
// newTowerMsg is an internal message we'll use within the client to signal
// that a new tower can be considered. It is delivered over the client's
// newTowers channel.
type newTowerMsg struct {
	// tower holds the info about the new Tower or new tower address
	// required to connect to it.
	tower *Tower

	// errChan is the channel through which we'll send a response back to
	// the caller when handling their request.
	//
	// NOTE: This channel must be buffered.
	errChan chan error
}
111

112
// staleTowerMsg is an internal message we'll use within the client to
// signal that a tower should no longer be considered. It is delivered over the
// client's staleTowers channel.
type staleTowerMsg struct {
	// id is the unique database identifier for the tower.
	id wtdb.TowerID

	// pubKey is the identifying public key of the watchtower.
	pubKey *btcec.PublicKey

	// addr is an optional field that when set signals that the address
	// should be removed from the watchtower's set of addresses, indicating
	// that it is stale. If it's not set, then the watchtower should no
	// longer be considered for new sessions.
	addr net.Addr

	// errChan is the channel through which we'll send a response back to
	// the caller when handling their request.
	//
	// NOTE: This channel must be buffered.
	errChan chan error
}
133

134
// deactivateTowerMsg is an internal message we'll use within the TowerClient
// to signal that a tower should be marked as inactive. It is delivered over
// the client's deactivateTowers channel.
type deactivateTowerMsg struct {
	// id is the unique database identifier for the tower.
	id wtdb.TowerID

	// pubKey is the identifying public key of the watchtower.
	pubKey *btcec.PublicKey

	// errChan is the channel through which we'll send a response back to
	// the caller when handling their request.
	//
	// NOTE: This channel must be buffered.
	errChan chan error
}
149

150
// terminateSessMsg is an internal message we'll use within the TowerClient to
// signal that a session should be terminated. It is delivered over the
// client's terminateSessions channel.
type terminateSessMsg struct {
	// id is the session identifier.
	id wtdb.SessionID

	// errChan is the channel through which we'll send a response back to
	// the caller when handling their request.
	//
	// NOTE: This channel must be buffered.
	errChan chan error
}
162

163
// clientCfg holds the configuration values required by a client. It embeds
// the shared Config and adds the values that are specific to a single client.
type clientCfg struct {
	*Config

	// Policy is the session policy the client will propose when creating
	// new sessions with the tower. If the policy differs from any active
	// sessions recorded in the database, those sessions will be ignored and
	// new sessions will be requested immediately.
	Policy wtpolicy.Policy

	// getSweepScript takes a channel ID and returns a byte slice and a
	// bool.
	//
	// NOTE(review): presumably the byte slice is the sweep script
	// registered for the channel and the bool reports whether one was
	// found - confirm against the caller that populates this field.
	getSweepScript func(lnwire.ChannelID) ([]byte, bool)
}
175

176
// client manages backing up revoked states for all states that fall under a
// specific policy type.
type client struct {
	// cfg is the configuration the client was created with.
	cfg *clientCfg

	// log is the client's logger. newClient prefixes it with the
	// identifier of the policy's blob type.
	log btclog.Logger

	// pipeline is the queue that backup tasks are submitted to via
	// backupState.
	pipeline *DiskOverflowQueue[*wtdb.BackupID]

	// negotiator is used to request new sessions and to receive them once
	// they have been negotiated.
	negotiator SessionNegotiator

	// candidateTowers is the iterator over the towers loaded from the DB
	// in newClient. It is also handed to the negotiator.
	candidateTowers TowerCandidateIterator

	// candidateSessions holds the sessions loaded from the DB in newClient
	// as well as newly negotiated ones. nextSessionQueue removes entries
	// from this map as it selects them.
	candidateSessions map[wtdb.SessionID]*ClientSession

	// activeSessions is the set of session queues that stop shuts down.
	activeSessions *sessionQueueSet

	// sessionQueue is the session queue currently used by the backup
	// dispatcher. When it is nil and no candidate sessions remain, the
	// dispatcher requests a new session.
	sessionQueue *sessionQueue

	// prevTask, when non-nil at shutdown, is replayed onto the pipeline by
	// stop.
	prevTask *wtdb.BackupID

	// statTicker fires every DefaultStatInterval and triggers logging of
	// the client stats in the backup dispatcher.
	statTicker *time.Ticker

	// stats holds the metrics about the client's operation.
	stats *clientStats

	// newTowers carries requests to consider a new tower.
	newTowers chan *newTowerMsg

	// staleTowers carries requests to stop considering a tower or one of
	// its addresses.
	staleTowers chan *staleTowerMsg

	// deactivateTowers carries requests to mark a tower as inactive.
	deactivateTowers chan *deactivateTowerMsg

	// terminateSessions carries requests to terminate a session.
	terminateSessions chan *terminateSessMsg

	// wg tracks the backup dispatcher goroutine so that stop can wait for
	// it to exit.
	wg sync.WaitGroup

	// quit is closed by stop to signal shutdown.
	quit chan struct{}
}
204

205
// newClient initializes a new client from the provided clientCfg. An error is
206
// returned if the client could not be initialized.
207
func newClient(cfg *clientCfg) (*client, error) {
3✔
208
        identifier, err := cfg.Policy.BlobType.Identifier()
3✔
209
        if err != nil {
3✔
210
                return nil, err
×
211
        }
×
212
        plog := log.WithPrefix(fmt.Sprintf("(%s)", identifier))
3✔
213

3✔
214
        queueDB := cfg.DB.GetDBQueue([]byte(identifier))
3✔
215
        queue, err := NewDiskOverflowQueue[*wtdb.BackupID](
3✔
216
                queueDB, cfg.MaxTasksInMemQueue, plog,
3✔
217
        )
3✔
218
        if err != nil {
3✔
219
                return nil, err
×
220
        }
×
221

222
        c := &client{
3✔
223
                cfg:               cfg,
3✔
224
                log:               plog,
3✔
225
                pipeline:          queue,
3✔
226
                activeSessions:    newSessionQueueSet(),
3✔
227
                statTicker:        time.NewTicker(DefaultStatInterval),
3✔
228
                stats:             new(clientStats),
3✔
229
                newTowers:         make(chan *newTowerMsg),
3✔
230
                staleTowers:       make(chan *staleTowerMsg),
3✔
231
                deactivateTowers:  make(chan *deactivateTowerMsg),
3✔
232
                terminateSessions: make(chan *terminateSessMsg),
3✔
233
                quit:              make(chan struct{}),
3✔
234
        }
3✔
235

3✔
236
        candidateTowers := newTowerListIterator()
3✔
237
        perActiveTower := func(tower *Tower) {
6✔
238
                // If the tower has already been marked as active, then there is
3✔
239
                // no need to add it to the iterator again.
3✔
240
                if candidateTowers.IsActive(tower.ID) {
3✔
241
                        return
×
242
                }
×
243

244
                c.log.Infof("Using private watchtower %x, offering policy %s",
3✔
245
                        tower.IdentityKey.SerializeCompressed(), cfg.Policy)
3✔
246

3✔
247
                // Add the tower to the set of candidate towers.
3✔
248
                candidateTowers.AddCandidate(tower)
3✔
249
        }
250

251
        // Load all candidate sessions and towers from the database into the
252
        // client. We will use any of these sessions if their policies match the
253
        // current policy of the client, otherwise they will be ignored and new
254
        // sessions will be requested.
255
        candidateSessions, err := getTowerAndSessionCandidates(
3✔
256
                cfg.DB, cfg.SecretKeyRing, perActiveTower,
3✔
257
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
3✔
258
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
3✔
259
        )
3✔
260
        if err != nil {
3✔
261
                return nil, err
×
262
        }
×
263

264
        c.candidateTowers = candidateTowers
3✔
265
        c.candidateSessions = candidateSessions
3✔
266

3✔
267
        c.negotiator = newSessionNegotiator(&NegotiatorConfig{
3✔
268
                DB:            cfg.DB,
3✔
269
                SecretKeyRing: cfg.SecretKeyRing,
3✔
270
                Policy:        cfg.Policy,
3✔
271
                ChainHash:     cfg.ChainHash,
3✔
272
                SendMessage:   c.sendMessage,
3✔
273
                ReadMessage:   c.readMessage,
3✔
274
                Dial:          c.dial,
3✔
275
                Candidates:    c.candidateTowers,
3✔
276
                MinBackoff:    cfg.MinBackoff,
3✔
277
                MaxBackoff:    cfg.MaxBackoff,
3✔
278
                Log:           plog,
3✔
279
        })
3✔
280

3✔
281
        return c, nil
3✔
282
}
283

284
// getTowerAndSessionCandidates loads all the towers from the DB and then
285
// fetches the sessions for each of tower. Sessions are only collected if they
286
// pass the sessionFilter check. If a tower has a session that does pass the
287
// sessionFilter check then the perActiveTower call-back will be called on that
288
// tower.
289
func getTowerAndSessionCandidates(db DB, keyRing ECDHKeyRing,
290
        perActiveTower func(tower *Tower),
291
        opts ...wtdb.ClientSessionListOption) (
292
        map[wtdb.SessionID]*ClientSession, error) {
3✔
293

3✔
294
        // Fetch all active towers from the DB.
3✔
295
        towers, err := db.ListTowers(func(tower *wtdb.Tower) bool {
6✔
296
                return tower.Status == wtdb.TowerStatusActive
3✔
297
        })
3✔
298
        if err != nil {
3✔
299
                return nil, err
×
300
        }
×
301

302
        candidateSessions := make(map[wtdb.SessionID]*ClientSession)
3✔
303
        for _, dbTower := range towers {
6✔
304
                tower, err := NewTowerFromDBTower(dbTower)
3✔
305
                if err != nil {
3✔
306
                        return nil, err
×
307
                }
×
308

309
                sessions, err := db.ListClientSessions(&tower.ID, opts...)
3✔
310
                if err != nil {
3✔
311
                        return nil, err
×
312
                }
×
313

314
                for _, s := range sessions {
6✔
315
                        cs, err := NewClientSessionFromDBSession(
3✔
316
                                s, tower, keyRing,
3✔
317
                        )
3✔
318
                        if err != nil {
3✔
319
                                return nil, err
×
320
                        }
×
321

322
                        // Add the session to the set of candidate sessions.
323
                        candidateSessions[s.ID] = cs
3✔
324
                }
325

326
                perActiveTower(tower)
3✔
327
        }
328

329
        return candidateSessions, nil
3✔
330
}
331

332
// getClientSessions retrieves the client sessions for a particular tower if
333
// specified, otherwise all client sessions for all towers are retrieved. An
334
// optional filter can be provided to filter out any undesired client sessions.
335
//
336
// NOTE: This method should only be used when deserialization of a
337
// ClientSession's SessionPrivKey field is desired, otherwise, the existing
338
// ListClientSessions method should be used.
339
func getClientSessions(db DB, keyRing ECDHKeyRing, forTower *wtdb.TowerID,
340
        opts ...wtdb.ClientSessionListOption) (
341
        map[wtdb.SessionID]*ClientSession, error) {
3✔
342

3✔
343
        dbSessions, err := db.ListClientSessions(forTower, opts...)
3✔
344
        if err != nil {
3✔
345
                return nil, err
×
346
        }
×
347

348
        // Reload the tower from disk using the tower ID contained in each
349
        // candidate session. We will also rederive any session keys needed to
350
        // be able to communicate with the towers and authenticate session
351
        // requests. This prevents us from having to store the private keys on
352
        // disk.
353
        sessions := make(map[wtdb.SessionID]*ClientSession)
3✔
354
        for _, s := range dbSessions {
6✔
355
                dbTower, err := db.LoadTowerByID(s.TowerID)
3✔
356
                if err != nil {
3✔
357
                        return nil, err
×
358
                }
×
359

360
                towerKeyDesc, err := keyRing.DeriveKey(keychain.KeyLocator{
3✔
361
                        Family: keychain.KeyFamilyTowerSession,
3✔
362
                        Index:  s.KeyIndex,
3✔
363
                })
3✔
364
                if err != nil {
3✔
365
                        return nil, err
×
366
                }
×
367

368
                sessionKeyECDH := keychain.NewPubKeyECDH(towerKeyDesc, keyRing)
3✔
369

3✔
370
                tower, err := NewTowerFromDBTower(dbTower)
3✔
371
                if err != nil {
3✔
372
                        return nil, err
×
373
                }
×
374

375
                sessions[s.ID] = &ClientSession{
3✔
376
                        ID:                s.ID,
3✔
377
                        ClientSessionBody: s.ClientSessionBody,
3✔
378
                        Tower:             tower,
3✔
379
                        SessionKeyECDH:    sessionKeyECDH,
3✔
380
                }
3✔
381
        }
382

383
        return sessions, nil
3✔
384
}
385

386
// start initializes the watchtower client by loading or negotiating an active
387
// session and then begins processing backup tasks from the request pipeline.
388
func (c *client) start() error {
3✔
389
        c.log.Infof("Watchtower client starting")
3✔
390

3✔
391
        // First, restart a session queue for any sessions that have
3✔
392
        // committed but unacked state updates. This ensures that these
3✔
393
        // sessions will be able to flush the committed updates after a
3✔
394
        // restart.
3✔
395
        fetchCommittedUpdates := c.cfg.DB.FetchSessionCommittedUpdates
3✔
396
        for _, session := range c.candidateSessions {
6✔
397
                committedUpdates, err := fetchCommittedUpdates(
3✔
398
                        &session.ID,
3✔
399
                )
3✔
400
                if err != nil {
3✔
401
                        return err
×
402
                }
×
403

404
                if len(committedUpdates) > 0 {
3✔
405
                        c.log.Infof("Starting session=%s to process "+
×
406
                                "%d committed backups", session.ID,
×
407
                                len(committedUpdates))
×
408

×
409
                        c.initActiveQueue(session, committedUpdates)
×
410
                }
×
411
        }
412

413
        // Now start the session negotiator, which will allow us to request new
414
        // session as soon as the backupDispatcher starts up.
415
        err := c.negotiator.Start()
3✔
416
        if err != nil {
3✔
417
                return err
×
418
        }
×
419

420
        // Start the task pipeline to which new backup tasks will be
421
        // submitted from active links.
422
        err = c.pipeline.Start()
3✔
423
        if err != nil {
3✔
424
                return err
×
425
        }
×
426

427
        c.wg.Add(1)
3✔
428
        go c.backupDispatcher()
3✔
429

3✔
430
        c.log.Infof("Watchtower client started successfully")
3✔
431

3✔
432
        return nil
3✔
433
}
434

435
// stop idempotently initiates a graceful shutdown of the watchtower client.
436
func (c *client) stop() error {
3✔
437
        var returnErr error
3✔
438
        c.log.Debugf("Stopping watchtower client")
3✔
439

3✔
440
        // 1. Stop the session negotiator.
3✔
441
        err := c.negotiator.Stop()
3✔
442
        if err != nil {
3✔
443
                returnErr = err
×
444
        }
×
445

446
        // 2. Stop the backup dispatcher and any other goroutines.
447
        close(c.quit)
3✔
448
        c.wg.Wait()
3✔
449

3✔
450
        // 3. If there was a left over 'prevTask' from the backup
3✔
451
        // dispatcher, replay that onto the pipeline.
3✔
452
        if c.prevTask != nil {
3✔
453
                err = c.pipeline.QueueBackupID(c.prevTask)
×
454
                if err != nil {
×
455
                        returnErr = err
×
456
                }
×
457
        }
458

459
        // 4. Shutdown all active session queues in parallel. These will
460
        // exit once all unhandled updates have been replayed to the
461
        // task pipeline.
462
        c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() {
6✔
463
                return func() {
6✔
464
                        err := s.Stop(false)
3✔
465
                        if err != nil {
3✔
466
                                c.log.Errorf("could not stop session "+
×
467
                                        "queue: %s: %v", s.ID(), err)
×
468

×
469
                                returnErr = err
×
470
                        }
×
471
                }
472
        })
473

474
        // 5. Shutdown the backup queue, which will prevent any further
475
        // updates from being accepted.
476
        if err = c.pipeline.Stop(); err != nil {
3✔
477
                returnErr = err
×
478
        }
×
479

480
        c.log.Debugf("Client successfully stopped, stats: %s", c.stats)
3✔
481

3✔
482
        return returnErr
3✔
483
}
484

485
// backupState initiates a request to back up a particular revoked state. If the
486
// method returns nil, the backup is guaranteed to be successful unless the:
487
//   - justice transaction would create dust outputs when trying to abide by the
488
//     negotiated policy, or
489
//   - breached outputs contain too little value to sweep at the target sweep
490
//     fee rate.
491
func (c *client) backupState(chanID *lnwire.ChannelID,
492
        stateNum uint64) error {
3✔
493

3✔
494
        id := &wtdb.BackupID{
3✔
495
                ChanID:       *chanID,
3✔
496
                CommitHeight: stateNum,
3✔
497
        }
3✔
498

3✔
499
        return c.pipeline.QueueBackupID(id)
3✔
500
}
3✔
501

502
// nextSessionQueue attempts to fetch an active session from our set of
503
// candidate sessions. Candidate sessions with a differing policy from the
504
// active client's advertised policy will be ignored, but may be resumed if the
505
// client is restarted with a matching policy. If no candidates were found, nil
506
// is returned to signal that we need to request a new policy.
507
func (c *client) nextSessionQueue() (*sessionQueue, error) {
3✔
508
        // Select any candidate session at random, and remove it from the set of
3✔
509
        // candidate sessions.
3✔
510
        var candidateSession *ClientSession
3✔
511
        for id, sessionInfo := range c.candidateSessions {
6✔
512
                delete(c.candidateSessions, id)
3✔
513

3✔
514
                // Skip any sessions with policies that don't match the current
3✔
515
                // TxPolicy, as they would result in different justice
3✔
516
                // transactions from what is requested. These can be used again
3✔
517
                // if the client changes their configuration and restarting.
3✔
518
                if sessionInfo.Policy.TxPolicy != c.cfg.Policy.TxPolicy {
3✔
519
                        continue
×
520
                }
521

522
                candidateSession = sessionInfo
3✔
523
                break
3✔
524
        }
525

526
        // If none of the sessions could be used or none were found, we'll
527
        // return nil to signal that we need another session to be negotiated.
528
        if candidateSession == nil {
3✔
529
                return nil, nil
×
530
        }
×
531

532
        updates, err := c.cfg.DB.FetchSessionCommittedUpdates(
3✔
533
                &candidateSession.ID,
3✔
534
        )
3✔
535
        if err != nil {
3✔
536
                return nil, err
×
537
        }
×
538

539
        // Initialize the session queue and spin it up, so it can begin handling
540
        // updates. If the queue was already made active on startup, this will
541
        // simply return the existing session queue from the set.
542
        return c.getOrInitActiveQueue(candidateSession, updates), nil
3✔
543
}
544

545
// stopAndRemoveSession stops the session with the given ID and removes it from
546
// the in-memory active sessions set.
547
func (c *client) stopAndRemoveSession(id wtdb.SessionID, final bool) error {
3✔
548
        return c.activeSessions.StopAndRemove(id, final)
3✔
549
}
3✔
550

551
// deleteSessionFromTower dials the tower that we created the session with and
552
// attempts to send the tower the DeleteSession message.
553
func (c *client) deleteSessionFromTower(sess *wtdb.ClientSession) error {
3✔
554
        // First, we check if we have already loaded this tower in our
3✔
555
        // candidate towers iterator.
3✔
556
        tower, err := c.candidateTowers.GetTower(sess.TowerID)
3✔
557
        if errors.Is(err, ErrTowerNotInIterator) {
3✔
558
                // If not, then we attempt to load it from the DB.
×
559
                dbTower, err := c.cfg.DB.LoadTowerByID(sess.TowerID)
×
560
                if err != nil {
×
561
                        return err
×
562
                }
×
563

564
                tower, err = NewTowerFromDBTower(dbTower)
×
565
                if err != nil {
×
566
                        return err
×
567
                }
×
568
        } else if err != nil {
3✔
569
                return err
×
570
        }
×
571

572
        session, err := NewClientSessionFromDBSession(
3✔
573
                sess, tower, c.cfg.SecretKeyRing,
3✔
574
        )
3✔
575
        if err != nil {
3✔
576
                return err
×
577
        }
×
578

579
        localInit := wtwire.NewInitMessage(
3✔
580
                lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
3✔
581
                c.cfg.ChainHash,
3✔
582
        )
3✔
583

3✔
584
        var (
3✔
585
                conn wtserver.Peer
3✔
586

3✔
587
                // addrIterator is a copy of the tower's address iterator.
3✔
588
                // We use this copy so that iterating through the addresses does
3✔
589
                // not affect any other threads using this iterator.
3✔
590
                addrIterator = tower.Addresses.Copy()
3✔
591
                towerAddr    = addrIterator.Peek()
3✔
592
        )
3✔
593
        // Attempt to dial the tower with its available addresses.
3✔
594
        for {
6✔
595
                conn, err = c.dial(
3✔
596
                        session.SessionKeyECDH, &lnwire.NetAddress{
3✔
597
                                IdentityKey: tower.IdentityKey,
3✔
598
                                Address:     towerAddr,
3✔
599
                        },
3✔
600
                )
3✔
601
                if err != nil {
3✔
602
                        // If there are more addrs available, immediately try
×
603
                        // those.
×
604
                        nextAddr, iteratorErr := addrIterator.Next()
×
605
                        if iteratorErr == nil {
×
606
                                towerAddr = nextAddr
×
607
                                continue
×
608
                        }
609

610
                        // Otherwise, if we have exhausted the address list,
611
                        // exit.
612
                        addrIterator.Reset()
×
613

×
614
                        return fmt.Errorf("failed to dial tower(%x) at any "+
×
615
                                "available addresses",
×
616
                                tower.IdentityKey.SerializeCompressed())
×
617
                }
618

619
                break
3✔
620
        }
621
        defer conn.Close()
3✔
622

3✔
623
        // Send Init to tower.
3✔
624
        err = c.sendMessage(conn, localInit)
3✔
625
        if err != nil {
3✔
626
                return err
×
627
        }
×
628

629
        // Receive Init from tower.
630
        remoteMsg, err := c.readMessage(conn)
3✔
631
        if err != nil {
3✔
632
                return err
×
633
        }
×
634

635
        remoteInit, ok := remoteMsg.(*wtwire.Init)
3✔
636
        if !ok {
3✔
637
                return fmt.Errorf("watchtower %s responded with %T to Init",
×
638
                        towerAddr, remoteMsg)
×
639
        }
×
640

641
        // Validate Init.
642
        err = localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
3✔
643
        if err != nil {
3✔
644
                return err
×
645
        }
×
646

647
        // Send DeleteSession to tower.
648
        err = c.sendMessage(conn, &wtwire.DeleteSession{})
3✔
649
        if err != nil {
3✔
650
                return err
×
651
        }
×
652

653
        // Receive DeleteSessionReply from tower.
654
        remoteMsg, err = c.readMessage(conn)
3✔
655
        if err != nil {
3✔
656
                return err
×
657
        }
×
658

659
        deleteSessionReply, ok := remoteMsg.(*wtwire.DeleteSessionReply)
3✔
660
        if !ok {
3✔
661
                return fmt.Errorf("watchtower %s responded with %T to "+
×
662
                        "DeleteSession", towerAddr, remoteMsg)
×
663
        }
×
664

665
        switch deleteSessionReply.Code {
3✔
666
        case wtwire.CodeOK, wtwire.DeleteSessionCodeNotFound:
3✔
667
                return nil
3✔
668
        default:
×
669
                return fmt.Errorf("received error code %v in "+
×
670
                        "DeleteSessionReply when attempting to delete "+
×
671
                        "session from tower", deleteSessionReply.Code)
×
672
        }
673
}
674

675
// backupDispatcher processes events coming from the taskPipeline and is
676
// responsible for detecting when the client needs to renegotiate a session to
677
// fulfill continuing demand. The event loop exits if the client is quit.
678
//
679
// NOTE: This method MUST be run as a goroutine.
680
func (c *client) backupDispatcher() {
3✔
681
        defer c.wg.Done()
3✔
682

3✔
683
        c.log.Tracef("Starting backup dispatcher")
3✔
684
        defer c.log.Tracef("Stopping backup dispatcher")
3✔
685

3✔
686
        for {
6✔
687
                switch {
3✔
688

689
                // No active session queue and no additional sessions.
690
                case c.sessionQueue == nil && len(c.candidateSessions) == 0:
3✔
691
                        c.log.Infof("Requesting new session.")
3✔
692

3✔
693
                        // Immediately request a new session.
3✔
694
                        c.negotiator.RequestSession()
3✔
695

3✔
696
                        // Wait until we receive the newly negotiated session.
3✔
697
                        // All backups sent in the meantime are queued in the
3✔
698
                        // revoke queue, as we cannot process them.
3✔
699
                awaitSession:
3✔
700
                        select {
3✔
701
                        case session := <-c.negotiator.NewSessions():
3✔
702
                                c.log.Infof("Acquired new session with id=%s",
3✔
703
                                        session.ID)
3✔
704
                                c.candidateSessions[session.ID] = session
3✔
705
                                c.stats.sessionAcquired()
3✔
706

3✔
707
                                // We'll continue to choose the newly negotiated
3✔
708
                                // session as our active session queue.
3✔
709
                                continue
3✔
710

711
                        case <-c.statTicker.C:
×
712
                                c.log.Infof("Client stats: %s", c.stats)
×
713

714
                        // A new tower has been requested to be added. We'll
715
                        // update our persisted and in-memory state and consider
716
                        // its corresponding sessions, if any, as new
717
                        // candidates.
718
                        case msg := <-c.newTowers:
3✔
719
                                msg.errChan <- c.handleNewTower(msg.tower)
3✔
720

721
                        // A tower has been requested to be removed. We'll
722
                        // only allow removal of it if the address in question
723
                        // is not currently being used for session negotiation.
724
                        case msg := <-c.staleTowers:
3✔
725
                                msg.errChan <- c.handleStaleTower(msg)
3✔
726

727
                        // A tower has been requested to be de-activated. We'll
728
                        // only allow this if the tower is not currently being
729
                        // used for session negotiation.
730
                        case msg := <-c.deactivateTowers:
×
731
                                msg.errChan <- c.handleDeactivateTower(msg)
×
732

733
                        // A request has come through to terminate a session.
734
                        case msg := <-c.terminateSessions:
×
735
                                msg.errChan <- c.handleTerminateSession(msg)
×
736

737
                        case <-c.quit:
3✔
738
                                return
3✔
739
                        }
740

741
                        // Instead of looping, we'll jump back into the select
742
                        // case and await the delivery of the session to prevent
743
                        // us from re-requesting additional sessions.
744
                        goto awaitSession
3✔
745

746
                // No active session queue but have additional sessions.
747
                case c.sessionQueue == nil && len(c.candidateSessions) > 0:
3✔
748
                        // We've exhausted the prior session, we'll pop another
3✔
749
                        // from the remaining sessions and continue processing
3✔
750
                        // backup tasks.
3✔
751
                        var err error
3✔
752
                        c.sessionQueue, err = c.nextSessionQueue()
3✔
753
                        if err != nil {
3✔
754
                                c.log.Errorf("error fetching next session "+
×
755
                                        "queue: %v", err)
×
756
                        }
×
757

758
                        if c.sessionQueue != nil {
6✔
759
                                c.log.Debugf("Loaded next candidate session "+
3✔
760
                                        "queue id=%s", c.sessionQueue.ID())
3✔
761
                        }
3✔
762

763
                // Have active session queue, process backups.
764
                case c.sessionQueue != nil:
3✔
765
                        if c.prevTask != nil {
3✔
766
                                c.processTask(c.prevTask)
×
767

×
768
                                // Continue to ensure the sessionQueue is
×
769
                                // properly initialized before attempting to
×
770
                                // process more tasks from the pipeline.
×
771
                                continue
×
772
                        }
773

774
                        // Normal operation where new tasks are read from the
775
                        // pipeline.
776
                        select {
3✔
777

778
                        // If any sessions are negotiated while we have an
779
                        // active session queue, queue them for future use.
780
                        // This shouldn't happen with the current design, so
781
                        // it doesn't hurt to select here just in case. In the
782
                        // future, we will likely allow more asynchrony so that
783
                        // we can request new sessions before the session is
784
                        // fully empty, which this case would handle.
785
                        case session := <-c.negotiator.NewSessions():
×
786
                                c.log.Warnf("Acquired new session with id=%s "+
×
787
                                        "while processing tasks", session.ID)
×
788
                                c.candidateSessions[session.ID] = session
×
789
                                c.stats.sessionAcquired()
×
790

791
                        case <-c.statTicker.C:
×
792
                                c.log.Infof("Client stats: %s", c.stats)
×
793

794
                        // Process each backup task serially from the queue of
795
                        // revoked states.
796
                        case task, ok := <-c.pipeline.NextBackupID():
3✔
797
                                // All backups in the pipeline have been
3✔
798
                                // processed, it is now safe to exit.
3✔
799
                                if !ok {
3✔
800
                                        return
×
801
                                }
×
802

803
                                c.log.Debugf("Processing %v", task)
3✔
804

3✔
805
                                c.stats.taskReceived()
3✔
806
                                c.processTask(task)
3✔
807

808
                        // A new tower has been requested to be added. We'll
809
                        // update our persisted and in-memory state and consider
810
                        // its corresponding sessions, if any, as new
811
                        // candidates.
812
                        case msg := <-c.newTowers:
3✔
813
                                msg.errChan <- c.handleNewTower(msg.tower)
3✔
814

815
                        // A tower has been removed, so we'll remove certain
816
                        // information that's persisted and also in our
817
                        // in-memory state depending on the request, and set any
818
                        // of its corresponding candidate sessions as inactive.
819
                        case msg := <-c.staleTowers:
3✔
820
                                msg.errChan <- c.handleStaleTower(msg)
3✔
821

822
                        // A tower has been requested to be de-activated.
823
                        case msg := <-c.deactivateTowers:
3✔
824
                                msg.errChan <- c.handleDeactivateTower(msg)
3✔
825

826
                        // A request has come through to terminate a session.
827
                        case msg := <-c.terminateSessions:
3✔
828
                                msg.errChan <- c.handleTerminateSession(msg)
3✔
829

830
                        case <-c.quit:
3✔
831
                                return
3✔
832
                        }
833
                }
834
        }
835
}
836

837
// processTask attempts to schedule the given backupTask on the active
838
// sessionQueue. The task will either be accepted or rejected, after which the
839
// appropriate modifications to the client's state machine will be made. After
840
// every invocation of processTask, the caller should ensure that the
841
// sessionQueue hasn't been exhausted before proceeding to the next task. Tasks
842
// that are rejected because the active sessionQueue is full will be cached as
843
// the prevTask, and should be reprocessed after obtaining a new sessionQueue.
844
func (c *client) processTask(task *wtdb.BackupID) {
3✔
845
        script, ok := c.cfg.getSweepScript(task.ChanID)
3✔
846
        if !ok {
3✔
847
                log.Infof("not processing task for unregistered channel: %s",
×
848
                        task.ChanID)
×
849

×
850
                return
×
851
        }
×
852

853
        backupTask := newBackupTask(*task, script)
3✔
854

3✔
855
        status, accepted := c.sessionQueue.AcceptTask(backupTask)
3✔
856
        if accepted {
6✔
857
                c.taskAccepted(task, status)
3✔
858
        } else {
3✔
859
                c.taskRejected(task, status)
×
860
        }
×
861
}
862

863
// taskAccepted processes the acceptance of a task by a sessionQueue depending
864
// on the state the sessionQueue is in *after* the task is added. The client's
865
// prevTask is always removed as a result of this call. The client's
866
// sessionQueue will be removed if accepting the task left the sessionQueue in
867
// an exhausted state.
868
func (c *client) taskAccepted(task *wtdb.BackupID,
869
        newStatus sessionQueueStatus) {
3✔
870

3✔
871
        c.log.Infof("Queued %v successfully for session %v", task,
3✔
872
                c.sessionQueue.ID())
3✔
873

3✔
874
        c.stats.taskAccepted()
3✔
875

3✔
876
        // If this task was accepted, we discard anything held in the prevTask.
3✔
877
        // Either it was nil before, or is the task which was just accepted.
3✔
878
        c.prevTask = nil
3✔
879

3✔
880
        switch newStatus {
3✔
881

882
        // The sessionQueue still has capacity after accepting this task.
883
        case sessionQueueAvailable:
3✔
884

885
        // The sessionQueue is full after accepting this task, so we will need
886
        // to request a new one before proceeding.
887
        case sessionQueueExhausted:
3✔
888
                c.stats.sessionExhausted()
3✔
889

3✔
890
                c.log.Debugf("Session %s exhausted", c.sessionQueue.ID())
3✔
891

3✔
892
                // This task left the session exhausted, set it to nil and
3✔
893
                // proceed to the next loop, so we can consume another
3✔
894
                // pre-negotiated session or request another.
3✔
895
                c.sessionQueue = nil
3✔
896
        }
897
}
898

899
// taskRejected process the rejection of a task by a sessionQueue depending on
900
// the state the was in *before* the task was rejected. The client's prevTask
901
// will cache the task if the sessionQueue was exhausted beforehand, and nil
902
// the sessionQueue to find a new session. If the sessionQueue was not
903
// exhausted and not shutting down, the client marks the task as ineligible, as
904
// this implies we couldn't construct a valid justice transaction given the
905
// session's policy.
906
func (c *client) taskRejected(task *wtdb.BackupID,
907
        curStatus sessionQueueStatus) {
×
908

×
909
        switch curStatus {
×
910

911
        // The sessionQueue has available capacity but the task was rejected,
912
        // this indicates that the task was ineligible for backup.
913
        case sessionQueueAvailable:
×
914
                c.stats.taskIneligible()
×
915

×
916
                c.log.Infof("Ignoring ineligible %v", task)
×
917

×
918
                err := c.cfg.DB.MarkBackupIneligible(
×
919
                        task.ChanID, task.CommitHeight,
×
920
                )
×
921
                if err != nil {
×
922
                        c.log.Errorf("Unable to mark %v ineligible: %v",
×
923
                                task, err)
×
924

×
925
                        // It is safe to not handle this error, even if we could
×
926
                        // not persist the result. At worst, this task may be
×
927
                        // reprocessed on a subsequent start up, and will either
×
928
                        // succeed do a change in session parameters or fail in
×
929
                        // the same manner.
×
930
                }
×
931

932
                // If this task was rejected *and* the session had available
933
                // capacity, we discard anything held in the prevTask. Either it
934
                // was nil before, or is the task which was just rejected.
935
                c.prevTask = nil
×
936

937
        // The sessionQueue rejected the task because it is full, we will stash
938
        // this task and try to add it to the next available sessionQueue.
939
        case sessionQueueExhausted:
×
940
                c.stats.sessionExhausted()
×
941

×
942
                c.log.Debugf("Session %v exhausted, %v queued for next session",
×
943
                        c.sessionQueue.ID(), task)
×
944

×
945
                // Cache the task that we pulled off, so that we can process it
×
946
                // once a new session queue is available.
×
947
                c.sessionQueue = nil
×
948
                c.prevTask = task
×
949

950
        // The sessionQueue rejected the task because it is shutting down. We
951
        // will stash this task and try to add it to the next available
952
        // sessionQueue.
953
        case sessionQueueShuttingDown:
×
954
                c.log.Debugf("Session %v is shutting down, %v queued for "+
×
955
                        "next session", c.sessionQueue.ID(), task)
×
956

×
957
                // Cache the task that we pulled off, so that we can process it
×
958
                // once a new session queue is available.
×
959
                c.sessionQueue = nil
×
960
                c.prevTask = task
×
961
        }
962
}
963

964
// dial connects the peer at addr using privKey as our secret key for the
965
// connection. The connection will use the configured Net's resolver to resolve
966
// the address for either Tor or clear net connections.
967
func (c *client) dial(localKey keychain.SingleKeyECDH,
968
        addr *lnwire.NetAddress) (wtserver.Peer, error) {
3✔
969

3✔
970
        return c.cfg.AuthDial(localKey, addr, c.cfg.Dial)
3✔
971
}
3✔
972

973
// readMessage receives and parses the next message from the given Peer. An
974
// error is returned if a message is not received before the server's read
975
// timeout, the read off the wire failed, or the message could not be
976
// deserialized.
977
func (c *client) readMessage(peer wtserver.Peer) (wtwire.Message, error) {
3✔
978
        // Set a read timeout to ensure we drop the connection if nothing is
3✔
979
        // received in a timely manner.
3✔
980
        err := peer.SetReadDeadline(time.Now().Add(c.cfg.ReadTimeout))
3✔
981
        if err != nil {
3✔
982
                err = fmt.Errorf("unable to set read deadline: %w", err)
×
983
                c.log.Errorf("Unable to read msg: %v", err)
×
984
                return nil, err
×
985
        }
×
986

987
        // Pull the next message off the wire,
988
        rawMsg, err := peer.ReadNextMessage()
3✔
989
        if err != nil {
3✔
990
                err = fmt.Errorf("unable to read message: %w", err)
×
991
                c.log.Errorf("Unable to read msg: %v", err)
×
992
                return nil, err
×
993
        }
×
994

995
        // Parse the received message according to the watchtower wire
996
        // specification.
997
        msgReader := bytes.NewReader(rawMsg)
3✔
998
        msg, err := wtwire.ReadMessage(msgReader, 0)
3✔
999
        if err != nil {
3✔
1000
                err = fmt.Errorf("unable to parse message: %w", err)
×
1001
                c.log.Errorf("Unable to read msg: %v", err)
×
1002
                return nil, err
×
1003
        }
×
1004

1005
        c.logMessage(peer, msg, true)
3✔
1006

3✔
1007
        return msg, nil
3✔
1008
}
1009

1010
// sendMessage sends a watchtower wire message to the target peer.
1011
func (c *client) sendMessage(peer wtserver.Peer,
1012
        msg wtwire.Message) error {
3✔
1013

3✔
1014
        // Encode the next wire message into the buffer.
3✔
1015
        // TODO(conner): use buffer pool
3✔
1016
        var b bytes.Buffer
3✔
1017
        _, err := wtwire.WriteMessage(&b, msg, 0)
3✔
1018
        if err != nil {
3✔
1019
                err = fmt.Errorf("unable to encode msg: %w", err)
×
1020
                c.log.Errorf("Unable to send msg: %v", err)
×
1021
                return err
×
1022
        }
×
1023

1024
        // Set the write deadline for the connection, ensuring we drop the
1025
        // connection if nothing is sent in a timely manner.
1026
        err = peer.SetWriteDeadline(time.Now().Add(c.cfg.WriteTimeout))
3✔
1027
        if err != nil {
3✔
1028
                err = fmt.Errorf("unable to set write deadline: %w", err)
×
1029
                c.log.Errorf("Unable to send msg: %v", err)
×
1030
                return err
×
1031
        }
×
1032

1033
        c.logMessage(peer, msg, false)
3✔
1034

3✔
1035
        // Write out the full message to the remote peer.
3✔
1036
        _, err = peer.Write(b.Bytes())
3✔
1037
        if err != nil {
3✔
1038
                c.log.Errorf("Unable to send msg: %v", err)
×
1039
        }
×
1040
        return err
3✔
1041
}
1042

1043
// newSessionQueue creates a sessionQueue from a ClientSession loaded from the
1044
// database and supplying it with the resources needed by the client.
1045
func (c *client) newSessionQueue(s *ClientSession,
1046
        updates []wtdb.CommittedUpdate) *sessionQueue {
3✔
1047

3✔
1048
        return newSessionQueue(&sessionQueueConfig{
3✔
1049
                ClientSession:          s,
3✔
1050
                ChainHash:              c.cfg.ChainHash,
3✔
1051
                Dial:                   c.dial,
3✔
1052
                ReadMessage:            c.readMessage,
3✔
1053
                SendMessage:            c.sendMessage,
3✔
1054
                Signer:                 c.cfg.Signer,
3✔
1055
                DB:                     c.cfg.DB,
3✔
1056
                MinBackoff:             c.cfg.MinBackoff,
3✔
1057
                MaxBackoff:             c.cfg.MaxBackoff,
3✔
1058
                Log:                    c.log,
3✔
1059
                BuildBreachRetribution: c.cfg.BuildBreachRetribution,
3✔
1060
                TaskPipeline:           c.pipeline,
3✔
1061
        }, updates)
3✔
1062
}
3✔
1063

1064
// getOrInitActiveQueue checks the activeSessions set for a sessionQueue for the
1065
// passed ClientSession. If it exists, the active sessionQueue is returned.
1066
// Otherwise, a new sessionQueue is initialized and added to the set.
1067
func (c *client) getOrInitActiveQueue(s *ClientSession,
1068
        updates []wtdb.CommittedUpdate) *sessionQueue {
3✔
1069

3✔
1070
        if sq, ok := c.activeSessions.Get(s.ID); ok {
3✔
1071
                return sq
×
1072
        }
×
1073

1074
        return c.initActiveQueue(s, updates)
3✔
1075
}
1076

1077
// initActiveQueue creates a new sessionQueue from the passed ClientSession,
1078
// adds the sessionQueue to the activeSessions set, and starts the sessionQueue
1079
// so that it can deliver any committed updates or begin accepting newly
1080
// assigned tasks.
1081
func (c *client) initActiveQueue(s *ClientSession,
1082
        updates []wtdb.CommittedUpdate) *sessionQueue {
3✔
1083

3✔
1084
        // Initialize the session queue, providing it with all the resources it
3✔
1085
        // requires from the client instance.
3✔
1086
        sq := c.newSessionQueue(s, updates)
3✔
1087

3✔
1088
        // Add the session queue as an active session so that we remember to
3✔
1089
        // stop it on shutdown. This method will also start the queue so that it
3✔
1090
        // can be active in processing newly assigned tasks or to upload
3✔
1091
        // previously committed updates.
3✔
1092
        c.activeSessions.AddAndStart(sq)
3✔
1093

3✔
1094
        return sq
3✔
1095
}
3✔
1096

1097
// terminateSession sets the given session's status to CSessionTerminal meaning
1098
// that it will not be used again.
1099
func (c *client) terminateSession(id wtdb.SessionID) error {
3✔
1100
        errChan := make(chan error, 1)
3✔
1101

3✔
1102
        select {
3✔
1103
        case c.terminateSessions <- &terminateSessMsg{
1104
                id:      id,
1105
                errChan: errChan,
1106
        }:
3✔
1107
        case <-c.pipeline.quit:
×
1108
                return ErrClientExiting
×
1109
        }
1110

1111
        select {
3✔
1112
        case err := <-errChan:
3✔
1113
                return err
3✔
1114
        case <-c.pipeline.quit:
×
1115
                return ErrClientExiting
×
1116
        }
1117
}
1118

1119
// handleTerminateSession handles a request to terminate a session. It will
1120
// first shut down the session if it is part of the active session set, then
1121
// it will ensure that the active session queue is set reset if it is using the
1122
// session in question. Finally, the session's status in the DB will be updated.
1123
func (c *client) handleTerminateSession(msg *terminateSessMsg) error {
3✔
1124
        id := msg.id
3✔
1125

3✔
1126
        delete(c.candidateSessions, id)
3✔
1127

3✔
1128
        err := c.activeSessions.StopAndRemove(id, true)
3✔
1129
        if err != nil {
3✔
1130
                return fmt.Errorf("could not stop session %s: %w", id, err)
×
1131
        }
×
1132

1133
        // If our active session queue corresponds to the session being
1134
        // terminated, then we'll proceed to negotiate a new one.
1135
        if c.sessionQueue != nil {
6✔
1136
                if bytes.Equal(c.sessionQueue.ID()[:], id[:]) {
6✔
1137
                        c.sessionQueue = nil
3✔
1138
                }
3✔
1139
        }
1140

1141
        return nil
3✔
1142
}
1143

1144
// deactivateTower sends a tower deactivation request to the backupDispatcher
1145
// where it will be handled synchronously. The request should result in all the
1146
// sessions that we have with the given tower being shutdown and removed from
1147
// our in-memory set of active sessions.
1148
func (c *client) deactivateTower(id wtdb.TowerID,
1149
        pubKey *btcec.PublicKey) error {
3✔
1150

3✔
1151
        errChan := make(chan error, 1)
3✔
1152

3✔
1153
        select {
3✔
1154
        case c.deactivateTowers <- &deactivateTowerMsg{
1155
                id:      id,
1156
                pubKey:  pubKey,
1157
                errChan: errChan,
1158
        }:
3✔
1159
        case <-c.pipeline.quit:
×
1160
                return ErrClientExiting
×
1161
        }
1162

1163
        select {
3✔
1164
        case err := <-errChan:
3✔
1165
                return err
3✔
1166
        case <-c.pipeline.quit:
×
1167
                return ErrClientExiting
×
1168
        }
1169
}
1170

1171
// handleDeactivateTower handles a request to deactivate a tower. We will remove
1172
// it from the in-memory candidate set, and we will also stop any active
1173
// sessions we have with this tower.
1174
func (c *client) handleDeactivateTower(msg *deactivateTowerMsg) error {
3✔
1175
        // Remove the tower from our in-memory candidate set so that it is not
3✔
1176
        // used for any new session negotiations.
3✔
1177
        err := c.candidateTowers.RemoveCandidate(msg.id, nil)
3✔
1178
        if err != nil {
3✔
1179
                return err
×
1180
        }
×
1181

1182
        pubKey := msg.pubKey.SerializeCompressed()
3✔
1183
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
3✔
1184
        if err != nil {
3✔
1185
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1186
                        "%v", pubKey, err)
×
1187
        }
×
1188

1189
        // Iterate over all the sessions we have for this tower and remove them
1190
        // from our candidate set and also from our set of started, active
1191
        // sessions.
1192
        for sessionID := range sessions {
6✔
1193
                delete(c.candidateSessions, sessionID)
3✔
1194

3✔
1195
                err = c.activeSessions.StopAndRemove(sessionID, false)
3✔
1196
                if err != nil {
3✔
1197
                        return fmt.Errorf("could not stop session %s: %w",
×
1198
                                sessionID, err)
×
1199
                }
×
1200
        }
1201

1202
        // If our active session queue corresponds to the stale tower, we'll
1203
        // proceed to negotiate a new one.
1204
        if c.sessionQueue != nil {
6✔
1205
                towerKey := c.sessionQueue.tower.IdentityKey
3✔
1206

3✔
1207
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
6✔
1208
                        c.sessionQueue = nil
3✔
1209
                }
3✔
1210
        }
1211

1212
        return nil
3✔
1213
}
1214

1215
// addTower adds a new watchtower reachable at the given address and considers
1216
// it for new sessions. If the watchtower already exists, then any new addresses
1217
// included will be considered when dialing it for session negotiations and
1218
// backups.
1219
func (c *client) addTower(tower *Tower) error {
3✔
1220
        errChan := make(chan error, 1)
3✔
1221

3✔
1222
        select {
3✔
1223
        case c.newTowers <- &newTowerMsg{
1224
                tower:   tower,
1225
                errChan: errChan,
1226
        }:
3✔
1227
        case <-c.pipeline.quit:
×
1228
                return ErrClientExiting
×
1229
        }
1230

1231
        select {
3✔
1232
        case err := <-errChan:
3✔
1233
                return err
3✔
1234
        case <-c.pipeline.quit:
×
1235
                return ErrClientExiting
×
1236
        }
1237
}
1238

1239
// handleNewTower handles a request for a new tower to be added. If the tower
1240
// already exists, then its corresponding sessions, if any, will be set
1241
// considered as candidates.
1242
func (c *client) handleNewTower(tower *Tower) error {
3✔
1243
        c.candidateTowers.AddCandidate(tower)
3✔
1244

3✔
1245
        // Include all of its corresponding sessions to our set of candidates.
3✔
1246
        sessions, err := getClientSessions(
3✔
1247
                c.cfg.DB, c.cfg.SecretKeyRing, &tower.ID,
3✔
1248
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
3✔
1249
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
3✔
1250
        )
3✔
1251
        if err != nil {
3✔
1252
                return fmt.Errorf("unable to determine sessions for tower %x: "+
×
1253
                        "%v", tower.IdentityKey.SerializeCompressed(), err)
×
1254
        }
×
1255
        for id, session := range sessions {
6✔
1256
                c.candidateSessions[id] = session
3✔
1257
        }
3✔
1258

1259
        return nil
3✔
1260
}
1261

1262
// removeTower removes a watchtower from being considered for future session
1263
// negotiations and from being used for any subsequent backups until it's added
1264
// again. If an address is provided, then this call only serves as a way of
1265
// removing the address from the watchtower instead.
1266
func (c *client) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey,
1267
        addr net.Addr) error {
3✔
1268

3✔
1269
        errChan := make(chan error, 1)
3✔
1270

3✔
1271
        select {
3✔
1272
        case c.staleTowers <- &staleTowerMsg{
1273
                id:      id,
1274
                pubKey:  pubKey,
1275
                addr:    addr,
1276
                errChan: errChan,
1277
        }:
3✔
1278
        case <-c.pipeline.quit:
×
1279
                return ErrClientExiting
×
1280
        }
1281

1282
        select {
3✔
1283
        case err := <-errChan:
3✔
1284
                return err
3✔
1285
        case <-c.pipeline.quit:
×
1286
                return ErrClientExiting
×
1287
        }
1288
}
1289

1290
// handleStaleTower handles a request for an existing tower to be removed. If
1291
// none of the tower's sessions have pending updates, then they will become
1292
// inactive and removed as candidates. If the active session queue corresponds
1293
// to any of these sessions, a new one will be negotiated.
1294
func (c *client) handleStaleTower(msg *staleTowerMsg) error {
3✔
1295
        // We'll first update our in-memory state.
3✔
1296
        err := c.candidateTowers.RemoveCandidate(msg.id, msg.addr)
3✔
1297
        if err != nil {
3✔
1298
                return err
×
1299
        }
×
1300

1301
        // If an address was provided, then we're only meant to remove the
1302
        // address from the tower.
1303
        if msg.addr != nil {
3✔
1304
                return nil
×
1305
        }
×
1306

1307
        // Otherwise, the tower should no longer be used for future session
1308
        // negotiations and backups.
1309

1310
        pubKey := msg.pubKey.SerializeCompressed()
3✔
1311
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
3✔
1312
        if err != nil {
3✔
1313
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1314
                        "%v", pubKey, err)
×
1315
        }
×
1316
        for sessionID := range sessions {
6✔
1317
                delete(c.candidateSessions, sessionID)
3✔
1318

3✔
1319
                // Shutdown the session so that any pending updates are
3✔
1320
                // replayed back onto the main task pipeline.
3✔
1321
                err = c.activeSessions.StopAndRemove(sessionID, true)
3✔
1322
                if err != nil {
3✔
1323
                        c.log.Errorf("could not stop session %s: %w", sessionID,
×
1324
                                err)
×
1325
                }
×
1326
        }
1327

1328
        // If our active session queue corresponds to the stale tower, we'll
1329
        // proceed to negotiate a new one.
1330
        if c.sessionQueue != nil {
6✔
1331
                towerKey := c.sessionQueue.tower.IdentityKey
3✔
1332

3✔
1333
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
6✔
1334
                        c.sessionQueue = nil
3✔
1335
                }
3✔
1336
        }
1337

1338
        return nil
3✔
1339
}
1340

1341
// registeredTowers retrieves the list of watchtowers registered with the
1342
// client.
1343
func (c *client) registeredTowers(towers []*wtdb.Tower,
1344
        opts ...wtdb.ClientSessionListOption) ([]*RegisteredTower, error) {
×
1345

×
1346
        // Generate a filter that will fetch all the client's sessions
×
1347
        // regardless of if they are active or not.
×
1348
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
×
1349

×
1350
        clientSessions, err := c.cfg.DB.ListClientSessions(nil, opts...)
×
1351
        if err != nil {
×
1352
                return nil, err
×
1353
        }
×
1354

1355
        // Construct a lookup map that coalesces all the sessions for a specific
1356
        // watchtower.
1357
        towerSessions := make(
×
1358
                map[wtdb.TowerID]map[wtdb.SessionID]*wtdb.ClientSession,
×
1359
        )
×
1360
        for id, s := range clientSessions {
×
1361
                sessions, ok := towerSessions[s.TowerID]
×
1362
                if !ok {
×
1363
                        sessions = make(map[wtdb.SessionID]*wtdb.ClientSession)
×
1364
                        towerSessions[s.TowerID] = sessions
×
1365
                }
×
1366
                sessions[id] = s
×
1367
        }
1368

1369
        registeredTowers := make([]*RegisteredTower, 0, len(towerSessions))
×
1370
        for _, tower := range towers {
×
1371
                isActive := c.candidateTowers.IsActive(tower.ID)
×
1372
                registeredTowers = append(registeredTowers, &RegisteredTower{
×
1373
                        Tower:                  tower,
×
1374
                        Sessions:               towerSessions[tower.ID],
×
1375
                        ActiveSessionCandidate: isActive,
×
1376
                })
×
1377
        }
×
1378

1379
        return registeredTowers, nil
×
1380
}
1381

1382
// lookupTower retrieves the info of sessions held with the given tower handled
1383
// by this client.
1384
func (c *client) lookupTower(tower *wtdb.Tower,
1385
        opts ...wtdb.ClientSessionListOption) (*RegisteredTower, error) {
3✔
1386

3✔
1387
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
3✔
1388

3✔
1389
        towerSessions, err := c.cfg.DB.ListClientSessions(&tower.ID, opts...)
3✔
1390
        if err != nil {
3✔
1391
                return nil, err
×
1392
        }
×
1393

1394
        return &RegisteredTower{
3✔
1395
                Tower:                  tower,
3✔
1396
                Sessions:               towerSessions,
3✔
1397
                ActiveSessionCandidate: c.candidateTowers.IsActive(tower.ID),
3✔
1398
        }, nil
3✔
1399
}
1400

1401
// getStats returns the in-memory statistics of the client since startup.
1402
func (c *client) getStats() ClientStats {
3✔
1403
        return c.stats.getStatsCopy()
3✔
1404
}
3✔
1405

1406
// policy returns the active client policy configuration.
1407
func (c *client) policy() wtpolicy.Policy {
3✔
1408
        return c.cfg.Policy
3✔
1409
}
3✔
1410

1411
// logMessage writes information about a message received from a remote peer,
1412
// using directional prepositions to signal whether the message was sent or
1413
// received.
1414
func (c *client) logMessage(
1415
        peer wtserver.Peer, msg wtwire.Message, read bool) {
3✔
1416

3✔
1417
        var action = "Received"
3✔
1418
        var preposition = "from"
3✔
1419
        if !read {
6✔
1420
                action = "Sending"
3✔
1421
                preposition = "to"
3✔
1422
        }
3✔
1423

1424
        summary := wtwire.MessageSummary(msg)
3✔
1425
        if len(summary) > 0 {
6✔
1426
                summary = "(" + summary + ")"
3✔
1427
        }
3✔
1428

1429
        c.log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
3✔
1430
                preposition, peer.RemotePub().SerializeCompressed(),
3✔
1431
                peer.RemoteAddr())
3✔
1432
}
1433

1434
// newRandomDelay returns a random delay drawn uniformly from the half-open
// range [0, max) using the crypto/rand reader. A max of zero describes an
// empty range and yields a zero delay. A non-nil error is returned only if
// the random source fails, in which case the delay is zero.
func newRandomDelay(max uint32) (uint32, error) {
	// rand.Int panics when its upper bound is not strictly positive, so
	// an empty range is short-circuited to "no delay" instead of
	// crashing the caller.
	if max == 0 {
		return 0, nil
	}

	var maxDelay big.Int
	maxDelay.SetUint64(uint64(max))

	randDelay, err := rand.Int(rand.Reader, &maxDelay)
	if err != nil {
		return 0, err
	}

	// The sampled value is strictly below max, so it always fits in a
	// uint32.
	return uint32(randDelay.Uint64()), nil
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc