• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13157733617

05 Feb 2025 12:49PM UTC coverage: 57.712% (-1.1%) from 58.82%
13157733617

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19472 existing lines in 252 files now uncovered.

103634 of 179570 relevant lines covered (57.71%)

24840.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.96
/watchtower/wtclient/client.go
1
package wtclient
2

3
import (
4
        "bytes"
5
        "crypto/rand"
6
        "errors"
7
        "fmt"
8
        "math/big"
9
        "net"
10
        "sync"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcec/v2"
14
        "github.com/btcsuite/btclog/v2"
15
        "github.com/lightningnetwork/lnd/channeldb"
16
        "github.com/lightningnetwork/lnd/keychain"
17
        "github.com/lightningnetwork/lnd/lnwallet"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
20
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
21
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
22
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
23
)
24

25
const (
26
        // DefaultReadTimeout specifies the default duration we will wait during
27
        // a read before breaking out of a blocking read.
28
        DefaultReadTimeout = 15 * time.Second
29

30
        // DefaultWriteTimeout specifies the default duration we will wait
31
        // during a write before breaking out of a blocking write.
32
        DefaultWriteTimeout = 15 * time.Second
33

34
        // DefaultStatInterval specifies the default interval between logging
35
        // metrics about the client's operation.
36
        DefaultStatInterval = time.Minute
37

38
        // DefaultSessionCloseRange is the range over which we will generate a
39
        // random number of blocks to delay closing a session after its last
40
        // channel has been closed.
41
        DefaultSessionCloseRange = 288
42

43
        // DefaultMaxTasksInMemQueue is the maximum number of items to be held
44
        // in the in-memory queue.
45
        DefaultMaxTasksInMemQueue = 2000
46
)
47

48
// genSessionFilter constructs a filter that can be used to select sessions only
49
// if they match the policy of the client (namely anchor vs legacy). If
50
// activeOnly is set, then only active sessions will be returned.
51
func (c *client) genSessionFilter(
52
        activeOnly bool) wtdb.ClientSessionFilterFn {
85✔
53

85✔
54
        return func(session *wtdb.ClientSession) bool {
113✔
55
                if c.cfg.Policy.TxPolicy != session.Policy.TxPolicy {
28✔
UNCOV
56
                        return false
×
UNCOV
57
                }
×
58

59
                if !activeOnly {
32✔
60
                        return true
4✔
61
                }
4✔
62

63
                return session.Status == wtdb.CSessionActive
24✔
64
        }
65
}
66

67
// ExhaustedSessionFilter constructs a wtdb.ClientSessionFilterFn filter
68
// function that will filter out any sessions that have been exhausted. A
69
// session is considered exhausted only if it has no un-acked updates and the
70
// sequence number of the session is equal to the max updates of the session
71
// policy.
72
func ExhaustedSessionFilter() wtdb.ClientSessWithNumCommittedUpdatesFilterFn {
83✔
73
        return func(session *wtdb.ClientSession, numUnAcked uint16) bool {
107✔
74
                return session.SeqNum < session.Policy.MaxUpdates ||
24✔
75
                        numUnAcked > 0
24✔
76
        }
24✔
77
}
78

79
// RegisteredTower encompasses information about a registered watchtower with
80
// the client.
81
type RegisteredTower struct {
82
        *wtdb.Tower
83

84
        // Sessions is the set of sessions corresponding to the watchtower.
85
        Sessions map[wtdb.SessionID]*wtdb.ClientSession
86

87
        // ActiveSessionCandidate determines whether the watchtower is currently
88
        // being considered for new sessions.
89
        ActiveSessionCandidate bool
90
}
91

92
// BreachRetributionBuilder is a function that can be used to construct a
93
// BreachRetribution from a channel ID and a commitment height.
94
type BreachRetributionBuilder func(id lnwire.ChannelID,
95
        commitHeight uint64) (*lnwallet.BreachRetribution,
96
        channeldb.ChannelType, error)
97

98
// newTowerMsg is an internal message we'll use within the client to signal
99
// that a new tower can be considered.
100
type newTowerMsg struct {
101
        // tower holds the info about the new Tower or new tower address
102
        // required to connect to it.
103
        tower *Tower
104

105
        // errChan is the channel through which we'll send a response back to
106
        // the caller when handling their request.
107
        //
108
        // NOTE: This channel must be buffered.
109
        errChan chan error
110
}
111

112
// staleTowerMsg is an internal message we'll use within the client to
113
// signal that a tower should no longer be considered.
114
type staleTowerMsg struct {
115
        // id is the unique database identifier for the tower.
116
        id wtdb.TowerID
117

118
        // pubKey is the identifying public key of the watchtower.
119
        pubKey *btcec.PublicKey
120

121
        // addr is an optional field that when set signals that the address
122
        // should be removed from the watchtower's set of addresses, indicating
123
        // that it is stale. If it's not set, then the watchtower should be
124
        // no longer be considered for new sessions.
125
        addr net.Addr
126

127
        // errChan is the channel through which we'll send a response back to
128
        // the caller when handling their request.
129
        //
130
        // NOTE: This channel must be buffered.
131
        errChan chan error
132
}
133

134
// deactivateTowerMsg is an internal message we'll use within the TowerClient
135
// to signal that a tower should be marked as inactive.
136
type deactivateTowerMsg struct {
137
        // id is the unique database identifier for the tower.
138
        id wtdb.TowerID
139

140
        // pubKey is the identifying public key of the watchtower.
141
        pubKey *btcec.PublicKey
142

143
        // errChan is the channel through which we'll send a response back to
144
        // the caller when handling their request.
145
        //
146
        // NOTE: This channel must be buffered.
147
        errChan chan error
148
}
149

150
// terminateSessMsg is an internal message we'll use within the TowerClient to
151
// signal that a session should be terminated.
152
type terminateSessMsg struct {
153
        // id is the session identifier.
154
        id wtdb.SessionID
155

156
        // errChan is the channel through which we'll send a response back to
157
        // the caller when handling their request.
158
        //
159
        // NOTE: This channel must be buffered.
160
        errChan chan error
161
}
162

163
// clientCfg holds the configuration values required by a client.
164
type clientCfg struct {
165
        *Config
166

167
        // Policy is the session policy the client will propose when creating
168
        // new sessions with the tower. If the policy differs from any active
169
        // sessions recorded in the database, those sessions will be ignored and
170
        // new sessions will be requested immediately.
171
        Policy wtpolicy.Policy
172

173
        getSweepScript func(lnwire.ChannelID) ([]byte, bool)
174
}
175

176
// client manages backing up revoked states for all states that fall under a
177
// specific policy type.
178
type client struct {
179
        cfg *clientCfg
180

181
        log btclog.Logger
182

183
        pipeline *DiskOverflowQueue[*wtdb.BackupID]
184

185
        negotiator        SessionNegotiator
186
        candidateTowers   TowerCandidateIterator
187
        candidateSessions map[wtdb.SessionID]*ClientSession
188
        activeSessions    *sessionQueueSet
189

190
        sessionQueue *sessionQueue
191
        prevTask     *wtdb.BackupID
192

193
        statTicker *time.Ticker
194
        stats      *clientStats
195

196
        newTowers         chan *newTowerMsg
197
        staleTowers       chan *staleTowerMsg
198
        deactivateTowers  chan *deactivateTowerMsg
199
        terminateSessions chan *terminateSessMsg
200

201
        wg   sync.WaitGroup
202
        quit chan struct{}
203
}
204

205
// newClient initializes a new client from the provided clientCfg. An error is
206
// returned if the client could not be initialized.
207
func newClient(cfg *clientCfg) (*client, error) {
36✔
208
        identifier, err := cfg.Policy.BlobType.Identifier()
36✔
209
        if err != nil {
36✔
210
                return nil, err
×
211
        }
×
212
        plog := log.WithPrefix(fmt.Sprintf("(%s)", identifier))
36✔
213

36✔
214
        queueDB := cfg.DB.GetDBQueue([]byte(identifier))
36✔
215
        queue, err := NewDiskOverflowQueue[*wtdb.BackupID](
36✔
216
                queueDB, cfg.MaxTasksInMemQueue, plog,
36✔
217
        )
36✔
218
        if err != nil {
36✔
219
                return nil, err
×
220
        }
×
221

222
        c := &client{
36✔
223
                cfg:               cfg,
36✔
224
                log:               plog,
36✔
225
                pipeline:          queue,
36✔
226
                activeSessions:    newSessionQueueSet(),
36✔
227
                statTicker:        time.NewTicker(DefaultStatInterval),
36✔
228
                stats:             new(clientStats),
36✔
229
                newTowers:         make(chan *newTowerMsg),
36✔
230
                staleTowers:       make(chan *staleTowerMsg),
36✔
231
                deactivateTowers:  make(chan *deactivateTowerMsg),
36✔
232
                terminateSessions: make(chan *terminateSessMsg),
36✔
233
                quit:              make(chan struct{}),
36✔
234
        }
36✔
235

36✔
236
        candidateTowers := newTowerListIterator()
36✔
237
        perActiveTower := func(tower *Tower) {
45✔
238
                // If the tower has already been marked as active, then there is
9✔
239
                // no need to add it to the iterator again.
9✔
240
                if candidateTowers.IsActive(tower.ID) {
9✔
241
                        return
×
242
                }
×
243

244
                c.log.Infof("Using private watchtower %x, offering policy %s",
9✔
245
                        tower.IdentityKey.SerializeCompressed(), cfg.Policy)
9✔
246

9✔
247
                // Add the tower to the set of candidate towers.
9✔
248
                candidateTowers.AddCandidate(tower)
9✔
249
        }
250

251
        // Load all candidate sessions and towers from the database into the
252
        // client. We will use any of these sessions if their policies match the
253
        // current policy of the client, otherwise they will be ignored and new
254
        // sessions will be requested.
255
        candidateSessions, err := getTowerAndSessionCandidates(
36✔
256
                cfg.DB, cfg.SecretKeyRing, perActiveTower,
36✔
257
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
36✔
258
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
36✔
259
        )
36✔
260
        if err != nil {
36✔
261
                return nil, err
×
262
        }
×
263

264
        c.candidateTowers = candidateTowers
36✔
265
        c.candidateSessions = candidateSessions
36✔
266

36✔
267
        c.negotiator = newSessionNegotiator(&NegotiatorConfig{
36✔
268
                DB:            cfg.DB,
36✔
269
                SecretKeyRing: cfg.SecretKeyRing,
36✔
270
                Policy:        cfg.Policy,
36✔
271
                ChainHash:     cfg.ChainHash,
36✔
272
                SendMessage:   c.sendMessage,
36✔
273
                ReadMessage:   c.readMessage,
36✔
274
                Dial:          c.dial,
36✔
275
                Candidates:    c.candidateTowers,
36✔
276
                MinBackoff:    cfg.MinBackoff,
36✔
277
                MaxBackoff:    cfg.MaxBackoff,
36✔
278
                Log:           plog,
36✔
279
        })
36✔
280

36✔
281
        return c, nil
36✔
282
}
283

284
// getTowerAndSessionCandidates loads all the towers from the DB and then
285
// fetches the sessions for each of tower. Sessions are only collected if they
286
// pass the sessionFilter check. If a tower has a session that does pass the
287
// sessionFilter check then the perActiveTower call-back will be called on that
288
// tower.
289
func getTowerAndSessionCandidates(db DB, keyRing ECDHKeyRing,
290
        perActiveTower func(tower *Tower),
291
        opts ...wtdb.ClientSessionListOption) (
292
        map[wtdb.SessionID]*ClientSession, error) {
36✔
293

36✔
294
        // Fetch all active towers from the DB.
36✔
295
        towers, err := db.ListTowers(func(tower *wtdb.Tower) bool {
45✔
296
                return tower.Status == wtdb.TowerStatusActive
9✔
297
        })
9✔
298
        if err != nil {
36✔
299
                return nil, err
×
300
        }
×
301

302
        candidateSessions := make(map[wtdb.SessionID]*ClientSession)
36✔
303
        for _, dbTower := range towers {
45✔
304
                tower, err := NewTowerFromDBTower(dbTower)
9✔
305
                if err != nil {
9✔
306
                        return nil, err
×
307
                }
×
308

309
                sessions, err := db.ListClientSessions(&tower.ID, opts...)
9✔
310
                if err != nil {
9✔
311
                        return nil, err
×
312
                }
×
313

314
                for _, s := range sessions {
17✔
315
                        cs, err := NewClientSessionFromDBSession(
8✔
316
                                s, tower, keyRing,
8✔
317
                        )
8✔
318
                        if err != nil {
8✔
319
                                return nil, err
×
320
                        }
×
321

322
                        // Add the session to the set of candidate sessions.
323
                        candidateSessions[s.ID] = cs
8✔
324
                }
325

326
                perActiveTower(tower)
9✔
327
        }
328

329
        return candidateSessions, nil
36✔
330
}
331

332
// getClientSessions retrieves the client sessions for a particular tower if
333
// specified, otherwise all client sessions for all towers are retrieved. An
334
// optional filter can be provided to filter out any undesired client sessions.
335
//
336
// NOTE: This method should only be used when deserialization of a
337
// ClientSession's SessionPrivKey field is desired, otherwise, the existing
338
// ListClientSessions method should be used.
339
func getClientSessions(db DB, keyRing ECDHKeyRing, forTower *wtdb.TowerID,
340
        opts ...wtdb.ClientSessionListOption) (
341
        map[wtdb.SessionID]*ClientSession, error) {
47✔
342

47✔
343
        dbSessions, err := db.ListClientSessions(forTower, opts...)
47✔
344
        if err != nil {
47✔
345
                return nil, err
×
346
        }
×
347

348
        // Reload the tower from disk using the tower ID contained in each
349
        // candidate session. We will also rederive any session keys needed to
350
        // be able to communicate with the towers and authenticate session
351
        // requests. This prevents us from having to store the private keys on
352
        // disk.
353
        sessions := make(map[wtdb.SessionID]*ClientSession)
47✔
354
        for _, s := range dbSessions {
58✔
355
                dbTower, err := db.LoadTowerByID(s.TowerID)
11✔
356
                if err != nil {
11✔
357
                        return nil, err
×
358
                }
×
359

360
                towerKeyDesc, err := keyRing.DeriveKey(keychain.KeyLocator{
11✔
361
                        Family: keychain.KeyFamilyTowerSession,
11✔
362
                        Index:  s.KeyIndex,
11✔
363
                })
11✔
364
                if err != nil {
11✔
365
                        return nil, err
×
366
                }
×
367

368
                sessionKeyECDH := keychain.NewPubKeyECDH(towerKeyDesc, keyRing)
11✔
369

11✔
370
                tower, err := NewTowerFromDBTower(dbTower)
11✔
371
                if err != nil {
11✔
372
                        return nil, err
×
373
                }
×
374

375
                sessions[s.ID] = &ClientSession{
11✔
376
                        ID:                s.ID,
11✔
377
                        ClientSessionBody: s.ClientSessionBody,
11✔
378
                        Tower:             tower,
11✔
379
                        SessionKeyECDH:    sessionKeyECDH,
11✔
380
                }
11✔
381
        }
382

383
        return sessions, nil
47✔
384
}
385

386
// start initializes the watchtower client by loading or negotiating an active
387
// session and then begins processing backup tasks from the request pipeline.
388
func (c *client) start() error {
36✔
389
        c.log.Infof("Watchtower client starting")
36✔
390

36✔
391
        // First, restart a session queue for any sessions that have
36✔
392
        // committed but unacked state updates. This ensures that these
36✔
393
        // sessions will be able to flush the committed updates after a
36✔
394
        // restart.
36✔
395
        fetchCommittedUpdates := c.cfg.DB.FetchSessionCommittedUpdates
36✔
396
        for _, session := range c.candidateSessions {
44✔
397
                committedUpdates, err := fetchCommittedUpdates(
8✔
398
                        &session.ID,
8✔
399
                )
8✔
400
                if err != nil {
8✔
401
                        return err
×
402
                }
×
403

404
                if len(committedUpdates) > 0 {
10✔
405
                        c.log.Infof("Starting session=%s to process "+
2✔
406
                                "%d committed backups", session.ID,
2✔
407
                                len(committedUpdates))
2✔
408

2✔
409
                        c.initActiveQueue(session, committedUpdates)
2✔
410
                }
2✔
411
        }
412

413
        // Now start the session negotiator, which will allow us to request new
414
        // session as soon as the backupDispatcher starts up.
415
        err := c.negotiator.Start()
36✔
416
        if err != nil {
36✔
417
                return err
×
418
        }
×
419

420
        // Start the task pipeline to which new backup tasks will be
421
        // submitted from active links.
422
        err = c.pipeline.Start()
36✔
423
        if err != nil {
36✔
424
                return err
×
425
        }
×
426

427
        c.wg.Add(1)
36✔
428
        go c.backupDispatcher()
36✔
429

36✔
430
        c.log.Infof("Watchtower client started successfully")
36✔
431

36✔
432
        return nil
36✔
433
}
434

435
// stop idempotently initiates a graceful shutdown of the watchtower client.
436
func (c *client) stop() error {
36✔
437
        var returnErr error
36✔
438
        c.log.Debugf("Stopping watchtower client")
36✔
439

36✔
440
        // 1. Stop the session negotiator.
36✔
441
        err := c.negotiator.Stop()
36✔
442
        if err != nil {
36✔
443
                returnErr = err
×
444
        }
×
445

446
        // 2. Stop the backup dispatcher and any other goroutines.
447
        close(c.quit)
36✔
448
        c.wg.Wait()
36✔
449

36✔
450
        // 3. If there was a left over 'prevTask' from the backup
36✔
451
        // dispatcher, replay that onto the pipeline.
36✔
452
        if c.prevTask != nil {
36✔
453
                err = c.pipeline.QueueBackupID(c.prevTask)
×
454
                if err != nil {
×
455
                        returnErr = err
×
456
                }
×
457
        }
458

459
        // 4. Shutdown all active session queues in parallel. These will
460
        // exit once all unhandled updates have been replayed to the
461
        // task pipeline.
462
        c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() {
103✔
463
                return func() {
134✔
464
                        err := s.Stop(false)
67✔
465
                        if err != nil {
67✔
466
                                c.log.Errorf("could not stop session "+
×
467
                                        "queue: %s: %v", s.ID(), err)
×
468

×
469
                                returnErr = err
×
470
                        }
×
471
                }
472
        })
473

474
        // 5. Shutdown the backup queue, which will prevent any further
475
        // updates from being accepted.
476
        if err = c.pipeline.Stop(); err != nil {
36✔
477
                returnErr = err
×
478
        }
×
479

480
        c.log.Debugf("Client successfully stopped, stats: %s", c.stats)
36✔
481

36✔
482
        return returnErr
36✔
483
}
484

485
// backupState initiates a request to back up a particular revoked state. If the
486
// method returns nil, the backup is guaranteed to be successful unless the:
487
//   - justice transaction would create dust outputs when trying to abide by the
488
//     negotiated policy, or
489
//   - breached outputs contain too little value to sweep at the target sweep
490
//     fee rate.
491
func (c *client) backupState(chanID *lnwire.ChannelID,
492
        stateNum uint64) error {
469✔
493

469✔
494
        id := &wtdb.BackupID{
469✔
495
                ChanID:       *chanID,
469✔
496
                CommitHeight: stateNum,
469✔
497
        }
469✔
498

469✔
499
        return c.pipeline.QueueBackupID(id)
469✔
500
}
469✔
501

502
// nextSessionQueue attempts to fetch an active session from our set of
503
// candidate sessions. Candidate sessions with a differing policy from the
504
// active client's advertised policy will be ignored, but may be resumed if the
505
// client is restarted with a matching policy. If no candidates were found, nil
506
// is returned to signal that we need to request a new policy.
507
func (c *client) nextSessionQueue() (*sessionQueue, error) {
84✔
508
        // Select any candidate session at random, and remove it from the set of
84✔
509
        // candidate sessions.
84✔
510
        var candidateSession *ClientSession
84✔
511
        for id, sessionInfo := range c.candidateSessions {
168✔
512
                delete(c.candidateSessions, id)
84✔
513

84✔
514
                // Skip any sessions with policies that don't match the current
84✔
515
                // TxPolicy, as they would result in different justice
84✔
516
                // transactions from what is requested. These can be used again
84✔
517
                // if the client changes their configuration and restarting.
84✔
518
                if sessionInfo.Policy.TxPolicy != c.cfg.Policy.TxPolicy {
84✔
519
                        continue
×
520
                }
521

522
                candidateSession = sessionInfo
84✔
523
                break
84✔
524
        }
525

526
        // If none of the sessions could be used or none were found, we'll
527
        // return nil to signal that we need another session to be negotiated.
528
        if candidateSession == nil {
84✔
529
                return nil, nil
×
530
        }
×
531

532
        updates, err := c.cfg.DB.FetchSessionCommittedUpdates(
84✔
533
                &candidateSession.ID,
84✔
534
        )
84✔
535
        if err != nil {
84✔
536
                return nil, err
×
537
        }
×
538

539
        // Initialize the session queue and spin it up, so it can begin handling
540
        // updates. If the queue was already made active on startup, this will
541
        // simply return the existing session queue from the set.
542
        return c.getOrInitActiveQueue(candidateSession, updates), nil
84✔
543
}
544

545
// stopAndRemoveSession stops the session with the given ID and removes it from
546
// the in-memory active sessions set.
547
func (c *client) stopAndRemoveSession(id wtdb.SessionID, final bool) error {
4✔
548
        return c.activeSessions.StopAndRemove(id, final)
4✔
549
}
4✔
550

551
// deleteSessionFromTower dials the tower that we created the session with and
552
// attempts to send the tower the DeleteSession message.
553
func (c *client) deleteSessionFromTower(sess *wtdb.ClientSession) error {
4✔
554
        // First, we check if we have already loaded this tower in our
4✔
555
        // candidate towers iterator.
4✔
556
        tower, err := c.candidateTowers.GetTower(sess.TowerID)
4✔
557
        if errors.Is(err, ErrTowerNotInIterator) {
4✔
558
                // If not, then we attempt to load it from the DB.
×
559
                dbTower, err := c.cfg.DB.LoadTowerByID(sess.TowerID)
×
560
                if err != nil {
×
561
                        return err
×
562
                }
×
563

564
                tower, err = NewTowerFromDBTower(dbTower)
×
565
                if err != nil {
×
566
                        return err
×
567
                }
×
568
        } else if err != nil {
4✔
569
                return err
×
570
        }
×
571

572
        session, err := NewClientSessionFromDBSession(
4✔
573
                sess, tower, c.cfg.SecretKeyRing,
4✔
574
        )
4✔
575
        if err != nil {
4✔
576
                return err
×
577
        }
×
578

579
        localInit := wtwire.NewInitMessage(
4✔
580
                lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
4✔
581
                c.cfg.ChainHash,
4✔
582
        )
4✔
583

4✔
584
        var (
4✔
585
                conn wtserver.Peer
4✔
586

4✔
587
                // addrIterator is a copy of the tower's address iterator.
4✔
588
                // We use this copy so that iterating through the addresses does
4✔
589
                // not affect any other threads using this iterator.
4✔
590
                addrIterator = tower.Addresses.Copy()
4✔
591
                towerAddr    = addrIterator.Peek()
4✔
592
        )
4✔
593
        // Attempt to dial the tower with its available addresses.
4✔
594
        for {
8✔
595
                conn, err = c.dial(
4✔
596
                        session.SessionKeyECDH, &lnwire.NetAddress{
4✔
597
                                IdentityKey: tower.IdentityKey,
4✔
598
                                Address:     towerAddr,
4✔
599
                        },
4✔
600
                )
4✔
601
                if err != nil {
4✔
602
                        // If there are more addrs available, immediately try
×
603
                        // those.
×
604
                        nextAddr, iteratorErr := addrIterator.Next()
×
605
                        if iteratorErr == nil {
×
606
                                towerAddr = nextAddr
×
607
                                continue
×
608
                        }
609

610
                        // Otherwise, if we have exhausted the address list,
611
                        // exit.
612
                        addrIterator.Reset()
×
613

×
614
                        return fmt.Errorf("failed to dial tower(%x) at any "+
×
615
                                "available addresses",
×
616
                                tower.IdentityKey.SerializeCompressed())
×
617
                }
618

619
                break
4✔
620
        }
621
        defer conn.Close()
4✔
622

4✔
623
        // Send Init to tower.
4✔
624
        err = c.sendMessage(conn, localInit)
4✔
625
        if err != nil {
4✔
626
                return err
×
627
        }
×
628

629
        // Receive Init from tower.
630
        remoteMsg, err := c.readMessage(conn)
4✔
631
        if err != nil {
4✔
632
                return err
×
633
        }
×
634

635
        remoteInit, ok := remoteMsg.(*wtwire.Init)
4✔
636
        if !ok {
4✔
637
                return fmt.Errorf("watchtower %s responded with %T to Init",
×
638
                        towerAddr, remoteMsg)
×
639
        }
×
640

641
        // Validate Init.
642
        err = localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
4✔
643
        if err != nil {
4✔
644
                return err
×
645
        }
×
646

647
        // Send DeleteSession to tower.
648
        err = c.sendMessage(conn, &wtwire.DeleteSession{})
4✔
649
        if err != nil {
4✔
650
                return err
×
651
        }
×
652

653
        // Receive DeleteSessionReply from tower.
654
        remoteMsg, err = c.readMessage(conn)
4✔
655
        if err != nil {
4✔
656
                return err
×
657
        }
×
658

659
        deleteSessionReply, ok := remoteMsg.(*wtwire.DeleteSessionReply)
4✔
660
        if !ok {
4✔
661
                return fmt.Errorf("watchtower %s responded with %T to "+
×
662
                        "DeleteSession", towerAddr, remoteMsg)
×
663
        }
×
664

665
        switch deleteSessionReply.Code {
4✔
666
        case wtwire.CodeOK, wtwire.DeleteSessionCodeNotFound:
4✔
667
                return nil
4✔
668
        default:
×
669
                return fmt.Errorf("received error code %v in "+
×
670
                        "DeleteSessionReply when attempting to delete "+
×
671
                        "session from tower", deleteSessionReply.Code)
×
672
        }
673
}
674

675
// backupDispatcher processes events coming from the taskPipeline and is
676
// responsible for detecting when the client needs to renegotiate a session to
677
// fulfill continuing demand. The event loop exits if the client is quit.
678
//
679
// NOTE: This method MUST be run as a goroutine.
680
func (c *client) backupDispatcher() {
36✔
681
        defer c.wg.Done()
36✔
682

36✔
683
        c.log.Tracef("Starting backup dispatcher")
36✔
684
        defer c.log.Tracef("Stopping backup dispatcher")
36✔
685

36✔
686
        for {
730✔
687
                switch {
694✔
688

689
                // No active session queue and no additional sessions.
690
                case c.sessionQueue == nil && len(c.candidateSessions) == 0:
79✔
691
                        c.log.Infof("Requesting new session.")
79✔
692

79✔
693
                        // Immediately request a new session.
79✔
694
                        c.negotiator.RequestSession()
79✔
695

79✔
696
                        // Wait until we receive the newly negotiated session.
79✔
697
                        // All backups sent in the meantime are queued in the
79✔
698
                        // revoke queue, as we cannot process them.
79✔
699
                awaitSession:
79✔
700
                        select {
79✔
701
                        case session := <-c.negotiator.NewSessions():
74✔
702
                                c.log.Infof("Acquired new session with id=%s",
74✔
703
                                        session.ID)
74✔
704
                                c.candidateSessions[session.ID] = session
74✔
705
                                c.stats.sessionAcquired()
74✔
706

74✔
707
                                // We'll continue to choose the newly negotiated
74✔
708
                                // session as our active session queue.
74✔
709
                                continue
74✔
710

711
                        case <-c.statTicker.C:
×
712
                                c.log.Infof("Client stats: %s", c.stats)
×
713

714
                        // A new tower has been requested to be added. We'll
715
                        // update our persisted and in-memory state and consider
716
                        // its corresponding sessions, if any, as new
717
                        // candidates.
718
                        case msg := <-c.newTowers:
37✔
719
                                msg.errChan <- c.handleNewTower(msg.tower)
37✔
720

721
                        // A tower has been requested to be removed. We'll
722
                        // only allow removal of it if the address in question
723
                        // is not currently being used for session negotiation.
724
                        case msg := <-c.staleTowers:
4✔
725
                                msg.errChan <- c.handleStaleTower(msg)
4✔
726

727
                        // A tower has been requested to be de-activated. We'll
728
                        // only allow this if the tower is not currently being
729
                        // used for session negotiation.
730
                        case msg := <-c.deactivateTowers:
×
731
                                msg.errChan <- c.handleDeactivateTower(msg)
×
732

733
                        // A request has come through to terminate a session.
734
                        case msg := <-c.terminateSessions:
×
735
                                msg.errChan <- c.handleTerminateSession(msg)
×
736

737
                        case <-c.quit:
5✔
738
                                return
5✔
739
                        }
740

741
                        // Instead of looping, we'll jump back into the select
742
                        // case and await the delivery of the session to prevent
743
                        // us from re-requesting additional sessions.
744
                        goto awaitSession
41✔
745

746
                // No active session queue but have additional sessions.
747
                case c.sessionQueue == nil && len(c.candidateSessions) > 0:
84✔
748
                        // We've exhausted the prior session, we'll pop another
84✔
749
                        // from the remaining sessions and continue processing
84✔
750
                        // backup tasks.
84✔
751
                        var err error
84✔
752
                        c.sessionQueue, err = c.nextSessionQueue()
84✔
753
                        if err != nil {
84✔
754
                                c.log.Errorf("error fetching next session "+
×
755
                                        "queue: %v", err)
×
756
                        }
×
757

758
                        if c.sessionQueue != nil {
168✔
759
                                c.log.Debugf("Loaded next candidate session "+
84✔
760
                                        "queue id=%s", c.sessionQueue.ID())
84✔
761
                        }
84✔
762

763
                // Have active session queue, process backups.
764
                case c.sessionQueue != nil:
531✔
765
                        if c.prevTask != nil {
532✔
766
                                c.processTask(c.prevTask)
1✔
767

1✔
768
                                // Continue to ensure the sessionQueue is
1✔
769
                                // properly initialized before attempting to
1✔
770
                                // process more tasks from the pipeline.
1✔
771
                                continue
1✔
772
                        }
773

774
                        // Normal operation where new tasks are read from the
775
                        // pipeline.
776
                        select {
530✔
777

778
                        // If any sessions are negotiated while we have an
779
                        // active session queue, queue them for future use.
780
                        // This shouldn't happen with the current design, so
781
                        // it doesn't hurt to select here just in case. In the
782
                        // future, we will likely allow more asynchrony so that
783
                        // we can request new sessions before the session is
784
                        // fully empty, which this case would handle.
785
                        case session := <-c.negotiator.NewSessions():
×
786
                                c.log.Warnf("Acquired new session with id=%s "+
×
787
                                        "while processing tasks", session.ID)
×
788
                                c.candidateSessions[session.ID] = session
×
789
                                c.stats.sessionAcquired()
×
790

791
                        case <-c.statTicker.C:
×
792
                                c.log.Infof("Client stats: %s", c.stats)
×
793

794
                        // Process each backup task serially from the queue of
795
                        // revoked states.
796
                        case task, ok := <-c.pipeline.NextBackupID():
479✔
797
                                // All backups in the pipeline have been
479✔
798
                                // processed, it is now safe to exit.
479✔
799
                                if !ok {
479✔
800
                                        return
×
801
                                }
×
802

803
                                c.log.Debugf("Processing %v", task)
479✔
804

479✔
805
                                c.stats.taskReceived()
479✔
806
                                c.processTask(task)
479✔
807

808
                        // A new tower has been requested to be added. We'll
809
                        // update our persisted and in-memory state and consider
810
                        // its corresponding sessions, if any, as new
811
                        // candidates.
812
                        case msg := <-c.newTowers:
10✔
813
                                msg.errChan <- c.handleNewTower(msg.tower)
10✔
814

815
                        // A tower has been removed, so we'll remove certain
816
                        // information that's persisted and also in our
817
                        // in-memory state depending on the request, and set any
818
                        // of its corresponding candidate sessions as inactive.
819
                        case msg := <-c.staleTowers:
7✔
820
                                msg.errChan <- c.handleStaleTower(msg)
7✔
821

822
                        // A tower has been requested to be de-activated.
823
                        case msg := <-c.deactivateTowers:
2✔
824
                                msg.errChan <- c.handleDeactivateTower(msg)
2✔
825

826
                        // A request has come through to terminate a session.
827
                        case msg := <-c.terminateSessions:
1✔
828
                                msg.errChan <- c.handleTerminateSession(msg)
1✔
829

830
                        case <-c.quit:
31✔
831
                                return
31✔
832
                        }
833
                }
834
        }
835
}
836

837
// processTask attempts to schedule the given backupTask on the active
838
// sessionQueue. The task will either be accepted or rejected, after which the
839
// appropriate modifications to the client's state machine will be made. After
840
// every invocation of processTask, the caller should ensure that the
841
// sessionQueue hasn't been exhausted before proceeding to the next task. Tasks
842
// that are rejected because the active sessionQueue is full will be cached as
843
// the prevTask, and should be reprocessed after obtaining a new sessionQueue.
844
func (c *client) processTask(task *wtdb.BackupID) {
480✔
845
        script, ok := c.cfg.getSweepScript(task.ChanID)
480✔
846
        if !ok {
480✔
847
                log.Infof("not processing task for unregistered channel: %s",
×
848
                        task.ChanID)
×
849

×
850
                return
×
851
        }
×
852

853
        backupTask := newBackupTask(*task, script)
480✔
854

480✔
855
        status, accepted := c.sessionQueue.AcceptTask(backupTask)
480✔
856
        if accepted {
954✔
857
                c.taskAccepted(task, status)
474✔
858
        } else {
480✔
859
                c.taskRejected(task, status)
6✔
860
        }
6✔
861
}
862

863
// taskAccepted processes the acceptance of a task by a sessionQueue depending
864
// on the state the sessionQueue is in *after* the task is added. The client's
865
// prevTask is always removed as a result of this call. The client's
866
// sessionQueue will be removed if accepting the task left the sessionQueue in
867
// an exhausted state.
868
func (c *client) taskAccepted(task *wtdb.BackupID,
869
        newStatus sessionQueueStatus) {
474✔
870

474✔
871
        c.log.Infof("Queued %v successfully for session %v", task,
474✔
872
                c.sessionQueue.ID())
474✔
873

474✔
874
        c.stats.taskAccepted()
474✔
875

474✔
876
        // If this task was accepted, we discard anything held in the prevTask.
474✔
877
        // Either it was nil before, or is the task which was just accepted.
474✔
878
        c.prevTask = nil
474✔
879

474✔
880
        switch newStatus {
474✔
881

882
        // The sessionQueue still has capacity after accepting this task.
883
        case sessionQueueAvailable:
431✔
884

885
        // The sessionQueue is full after accepting this task, so we will need
886
        // to request a new one before proceeding.
887
        case sessionQueueExhausted:
43✔
888
                c.stats.sessionExhausted()
43✔
889

43✔
890
                c.log.Debugf("Session %s exhausted", c.sessionQueue.ID())
43✔
891

43✔
892
                // This task left the session exhausted, set it to nil and
43✔
893
                // proceed to the next loop, so we can consume another
43✔
894
                // pre-negotiated session or request another.
43✔
895
                c.sessionQueue = nil
43✔
896
        }
897
}
898

899
// taskRejected process the rejection of a task by a sessionQueue depending on
900
// the state the was in *before* the task was rejected. The client's prevTask
901
// will cache the task if the sessionQueue was exhausted beforehand, and nil
902
// the sessionQueue to find a new session. If the sessionQueue was not
903
// exhausted and not shutting down, the client marks the task as ineligible, as
904
// this implies we couldn't construct a valid justice transaction given the
905
// session's policy.
906
func (c *client) taskRejected(task *wtdb.BackupID,
907
        curStatus sessionQueueStatus) {
6✔
908

6✔
909
        switch curStatus {
6✔
910

911
        // The sessionQueue has available capacity but the task was rejected,
912
        // this indicates that the task was ineligible for backup.
913
        case sessionQueueAvailable:
5✔
914
                c.stats.taskIneligible()
5✔
915

5✔
916
                c.log.Infof("Ignoring ineligible %v", task)
5✔
917

5✔
918
                err := c.cfg.DB.MarkBackupIneligible(
5✔
919
                        task.ChanID, task.CommitHeight,
5✔
920
                )
5✔
921
                if err != nil {
5✔
922
                        c.log.Errorf("Unable to mark %v ineligible: %v",
×
923
                                task, err)
×
924

×
925
                        // It is safe to not handle this error, even if we could
×
926
                        // not persist the result. At worst, this task may be
×
927
                        // reprocessed on a subsequent start up, and will either
×
928
                        // succeed do a change in session parameters or fail in
×
929
                        // the same manner.
×
930
                }
×
931

932
                // If this task was rejected *and* the session had available
933
                // capacity, we discard anything held in the prevTask. Either it
934
                // was nil before, or is the task which was just rejected.
935
                c.prevTask = nil
5✔
936

937
        // The sessionQueue rejected the task because it is full, we will stash
938
        // this task and try to add it to the next available sessionQueue.
939
        case sessionQueueExhausted:
1✔
940
                c.stats.sessionExhausted()
1✔
941

1✔
942
                c.log.Debugf("Session %v exhausted, %v queued for next session",
1✔
943
                        c.sessionQueue.ID(), task)
1✔
944

1✔
945
                // Cache the task that we pulled off, so that we can process it
1✔
946
                // once a new session queue is available.
1✔
947
                c.sessionQueue = nil
1✔
948
                c.prevTask = task
1✔
949

950
        // The sessionQueue rejected the task because it is shutting down. We
951
        // will stash this task and try to add it to the next available
952
        // sessionQueue.
953
        case sessionQueueShuttingDown:
×
954
                c.log.Debugf("Session %v is shutting down, %v queued for "+
×
955
                        "next session", c.sessionQueue.ID(), task)
×
956

×
957
                // Cache the task that we pulled off, so that we can process it
×
958
                // once a new session queue is available.
×
959
                c.sessionQueue = nil
×
960
                c.prevTask = task
×
961
        }
962
}
963

964
// dial connects the peer at addr using privKey as our secret key for the
965
// connection. The connection will use the configured Net's resolver to resolve
966
// the address for either Tor or clear net connections.
967
func (c *client) dial(localKey keychain.SingleKeyECDH,
968
        addr *lnwire.NetAddress) (wtserver.Peer, error) {
445✔
969

445✔
970
        return c.cfg.AuthDial(localKey, addr, c.cfg.Dial)
445✔
971
}
445✔
972

973
// readMessage receives and parses the next message from the given Peer. An
974
// error is returned if a message is not received before the server's read
975
// timeout, the read off the wire failed, or the message could not be
976
// deserialized.
977
func (c *client) readMessage(peer wtserver.Peer) (wtwire.Message, error) {
1,227✔
978
        // Set a read timeout to ensure we drop the connection if nothing is
1,227✔
979
        // received in a timely manner.
1,227✔
980
        err := peer.SetReadDeadline(time.Now().Add(c.cfg.ReadTimeout))
1,227✔
981
        if err != nil {
1,227✔
982
                err = fmt.Errorf("unable to set read deadline: %w", err)
×
983
                c.log.Errorf("Unable to read msg: %v", err)
×
984
                return nil, err
×
985
        }
×
986

987
        // Pull the next message off the wire,
988
        rawMsg, err := peer.ReadNextMessage()
1,227✔
989
        if err != nil {
1,477✔
990
                err = fmt.Errorf("unable to read message: %w", err)
250✔
991
                c.log.Errorf("Unable to read msg: %v", err)
250✔
992
                return nil, err
250✔
993
        }
250✔
994

995
        // Parse the received message according to the watchtower wire
996
        // specification.
997
        msgReader := bytes.NewReader(rawMsg)
977✔
998
        msg, err := wtwire.ReadMessage(msgReader, 0)
977✔
999
        if err != nil {
977✔
1000
                err = fmt.Errorf("unable to parse message: %w", err)
×
1001
                c.log.Errorf("Unable to read msg: %v", err)
×
1002
                return nil, err
×
1003
        }
×
1004

1005
        c.logMessage(peer, msg, true)
977✔
1006

977✔
1007
        return msg, nil
977✔
1008
}
1009

1010
// sendMessage sends a watchtower wire message to the target peer.
1011
func (c *client) sendMessage(peer wtserver.Peer,
1012
        msg wtwire.Message) error {
1,228✔
1013

1,228✔
1014
        // Encode the next wire message into the buffer.
1,228✔
1015
        // TODO(conner): use buffer pool
1,228✔
1016
        var b bytes.Buffer
1,228✔
1017
        _, err := wtwire.WriteMessage(&b, msg, 0)
1,228✔
1018
        if err != nil {
1,228✔
1019
                err = fmt.Errorf("unable to encode msg: %w", err)
×
1020
                c.log.Errorf("Unable to send msg: %v", err)
×
1021
                return err
×
1022
        }
×
1023

1024
        // Set the write deadline for the connection, ensuring we drop the
1025
        // connection if nothing is sent in a timely manner.
1026
        err = peer.SetWriteDeadline(time.Now().Add(c.cfg.WriteTimeout))
1,228✔
1027
        if err != nil {
1,228✔
1028
                err = fmt.Errorf("unable to set write deadline: %w", err)
×
1029
                c.log.Errorf("Unable to send msg: %v", err)
×
1030
                return err
×
1031
        }
×
1032

1033
        c.logMessage(peer, msg, false)
1,228✔
1034

1,228✔
1035
        // Write out the full message to the remote peer.
1,228✔
1036
        _, err = peer.Write(b.Bytes())
1,228✔
1037
        if err != nil {
1,229✔
1038
                c.log.Errorf("Unable to send msg: %v", err)
1✔
1039
        }
1✔
1040
        return err
1,228✔
1041
}
1042

1043
// newSessionQueue creates a sessionQueue from a ClientSession loaded from the
1044
// database and supplying it with the resources needed by the client.
1045
func (c *client) newSessionQueue(s *ClientSession,
1046
        updates []wtdb.CommittedUpdate) *sessionQueue {
82✔
1047

82✔
1048
        return newSessionQueue(&sessionQueueConfig{
82✔
1049
                ClientSession:          s,
82✔
1050
                ChainHash:              c.cfg.ChainHash,
82✔
1051
                Dial:                   c.dial,
82✔
1052
                ReadMessage:            c.readMessage,
82✔
1053
                SendMessage:            c.sendMessage,
82✔
1054
                Signer:                 c.cfg.Signer,
82✔
1055
                DB:                     c.cfg.DB,
82✔
1056
                MinBackoff:             c.cfg.MinBackoff,
82✔
1057
                MaxBackoff:             c.cfg.MaxBackoff,
82✔
1058
                Log:                    c.log,
82✔
1059
                BuildBreachRetribution: c.cfg.BuildBreachRetribution,
82✔
1060
                TaskPipeline:           c.pipeline,
82✔
1061
        }, updates)
82✔
1062
}
82✔
1063

1064
// getOrInitActiveQueue checks the activeSessions set for a sessionQueue for the
1065
// passed ClientSession. If it exists, the active sessionQueue is returned.
1066
// Otherwise, a new sessionQueue is initialized and added to the set.
1067
func (c *client) getOrInitActiveQueue(s *ClientSession,
1068
        updates []wtdb.CommittedUpdate) *sessionQueue {
84✔
1069

84✔
1070
        if sq, ok := c.activeSessions.Get(s.ID); ok {
88✔
1071
                return sq
4✔
1072
        }
4✔
1073

1074
        return c.initActiveQueue(s, updates)
80✔
1075
}
1076

1077
// initActiveQueue creates a new sessionQueue from the passed ClientSession,
1078
// adds the sessionQueue to the activeSessions set, and starts the sessionQueue
1079
// so that it can deliver any committed updates or begin accepting newly
1080
// assigned tasks.
1081
func (c *client) initActiveQueue(s *ClientSession,
1082
        updates []wtdb.CommittedUpdate) *sessionQueue {
82✔
1083

82✔
1084
        // Initialize the session queue, providing it with all the resources it
82✔
1085
        // requires from the client instance.
82✔
1086
        sq := c.newSessionQueue(s, updates)
82✔
1087

82✔
1088
        // Add the session queue as an active session so that we remember to
82✔
1089
        // stop it on shutdown. This method will also start the queue so that it
82✔
1090
        // can be active in processing newly assigned tasks or to upload
82✔
1091
        // previously committed updates.
82✔
1092
        c.activeSessions.AddAndStart(sq)
82✔
1093

82✔
1094
        return sq
82✔
1095
}
82✔
1096

1097
// terminateSession sets the given session's status to CSessionTerminal meaning
1098
// that it will not be used again.
1099
func (c *client) terminateSession(id wtdb.SessionID) error {
1✔
1100
        errChan := make(chan error, 1)
1✔
1101

1✔
1102
        select {
1✔
1103
        case c.terminateSessions <- &terminateSessMsg{
1104
                id:      id,
1105
                errChan: errChan,
1106
        }:
1✔
1107
        case <-c.pipeline.quit:
×
1108
                return ErrClientExiting
×
1109
        }
1110

1111
        select {
1✔
1112
        case err := <-errChan:
1✔
1113
                return err
1✔
1114
        case <-c.pipeline.quit:
×
1115
                return ErrClientExiting
×
1116
        }
1117
}
1118

1119
// handleTerminateSession handles a request to terminate a session. It will
1120
// first shut down the session if it is part of the active session set, then
1121
// it will ensure that the active session queue is set reset if it is using the
1122
// session in question. Finally, the session's status in the DB will be updated.
1123
func (c *client) handleTerminateSession(msg *terminateSessMsg) error {
1✔
1124
        id := msg.id
1✔
1125

1✔
1126
        delete(c.candidateSessions, id)
1✔
1127

1✔
1128
        err := c.activeSessions.StopAndRemove(id, true)
1✔
1129
        if err != nil {
1✔
1130
                return fmt.Errorf("could not stop session %s: %w", id, err)
×
1131
        }
×
1132

1133
        // If our active session queue corresponds to the session being
1134
        // terminated, then we'll proceed to negotiate a new one.
1135
        if c.sessionQueue != nil {
2✔
1136
                if bytes.Equal(c.sessionQueue.ID()[:], id[:]) {
2✔
1137
                        c.sessionQueue = nil
1✔
1138
                }
1✔
1139
        }
1140

1141
        return nil
1✔
1142
}
1143

1144
// deactivateTower sends a tower deactivation request to the backupDispatcher
1145
// where it will be handled synchronously. The request should result in all the
1146
// sessions that we have with the given tower being shutdown and removed from
1147
// our in-memory set of active sessions.
1148
func (c *client) deactivateTower(id wtdb.TowerID,
1149
        pubKey *btcec.PublicKey) error {
2✔
1150

2✔
1151
        errChan := make(chan error, 1)
2✔
1152

2✔
1153
        select {
2✔
1154
        case c.deactivateTowers <- &deactivateTowerMsg{
1155
                id:      id,
1156
                pubKey:  pubKey,
1157
                errChan: errChan,
1158
        }:
2✔
1159
        case <-c.pipeline.quit:
×
1160
                return ErrClientExiting
×
1161
        }
1162

1163
        select {
2✔
1164
        case err := <-errChan:
2✔
1165
                return err
2✔
1166
        case <-c.pipeline.quit:
×
1167
                return ErrClientExiting
×
1168
        }
1169
}
1170

1171
// handleDeactivateTower handles a request to deactivate a tower. We will remove
1172
// it from the in-memory candidate set, and we will also stop any active
1173
// sessions we have with this tower.
1174
func (c *client) handleDeactivateTower(msg *deactivateTowerMsg) error {
2✔
1175
        // Remove the tower from our in-memory candidate set so that it is not
2✔
1176
        // used for any new session negotiations.
2✔
1177
        err := c.candidateTowers.RemoveCandidate(msg.id, nil)
2✔
1178
        if err != nil {
2✔
1179
                return err
×
1180
        }
×
1181

1182
        pubKey := msg.pubKey.SerializeCompressed()
2✔
1183
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
2✔
1184
        if err != nil {
2✔
1185
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1186
                        "%v", pubKey, err)
×
1187
        }
×
1188

1189
        // Iterate over all the sessions we have for this tower and remove them
1190
        // from our candidate set and also from our set of started, active
1191
        // sessions.
1192
        for sessionID := range sessions {
5✔
1193
                delete(c.candidateSessions, sessionID)
3✔
1194

3✔
1195
                err = c.activeSessions.StopAndRemove(sessionID, false)
3✔
1196
                if err != nil {
3✔
1197
                        return fmt.Errorf("could not stop session %s: %w",
×
1198
                                sessionID, err)
×
1199
                }
×
1200
        }
1201

1202
        // If our active session queue corresponds to the stale tower, we'll
1203
        // proceed to negotiate a new one.
1204
        if c.sessionQueue != nil {
4✔
1205
                towerKey := c.sessionQueue.tower.IdentityKey
2✔
1206

2✔
1207
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
4✔
1208
                        c.sessionQueue = nil
2✔
1209
                }
2✔
1210
        }
1211

1212
        return nil
2✔
1213
}
1214

1215
// addTower adds a new watchtower reachable at the given address and considers
1216
// it for new sessions. If the watchtower already exists, then any new addresses
1217
// included will be considered when dialing it for session negotiations and
1218
// backups.
1219
func (c *client) addTower(tower *Tower) error {
47✔
1220
        errChan := make(chan error, 1)
47✔
1221

47✔
1222
        select {
47✔
1223
        case c.newTowers <- &newTowerMsg{
1224
                tower:   tower,
1225
                errChan: errChan,
1226
        }:
47✔
1227
        case <-c.pipeline.quit:
×
1228
                return ErrClientExiting
×
1229
        }
1230

1231
        select {
47✔
1232
        case err := <-errChan:
47✔
1233
                return err
47✔
1234
        case <-c.pipeline.quit:
×
1235
                return ErrClientExiting
×
1236
        }
1237
}
1238

1239
// handleNewTower handles a request for a new tower to be added. If the tower
1240
// already exists, then its corresponding sessions, if any, will be set
1241
// considered as candidates.
1242
func (c *client) handleNewTower(tower *Tower) error {
47✔
1243
        c.candidateTowers.AddCandidate(tower)
47✔
1244

47✔
1245
        // Include all of its corresponding sessions to our set of candidates.
47✔
1246
        sessions, err := getClientSessions(
47✔
1247
                c.cfg.DB, c.cfg.SecretKeyRing, &tower.ID,
47✔
1248
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
47✔
1249
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
47✔
1250
        )
47✔
1251
        if err != nil {
47✔
1252
                return fmt.Errorf("unable to determine sessions for tower %x: "+
×
1253
                        "%v", tower.IdentityKey.SerializeCompressed(), err)
×
1254
        }
×
1255
        for id, session := range sessions {
58✔
1256
                c.candidateSessions[id] = session
11✔
1257
        }
11✔
1258

1259
        return nil
47✔
1260
}
1261

1262
// removeTower removes a watchtower from being considered for future session
1263
// negotiations and from being used for any subsequent backups until it's added
1264
// again. If an address is provided, then this call only serves as a way of
1265
// removing the address from the watchtower instead.
1266
func (c *client) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey,
1267
        addr net.Addr) error {
11✔
1268

11✔
1269
        errChan := make(chan error, 1)
11✔
1270

11✔
1271
        select {
11✔
1272
        case c.staleTowers <- &staleTowerMsg{
1273
                id:      id,
1274
                pubKey:  pubKey,
1275
                addr:    addr,
1276
                errChan: errChan,
1277
        }:
11✔
1278
        case <-c.pipeline.quit:
×
1279
                return ErrClientExiting
×
1280
        }
1281

1282
        select {
11✔
1283
        case err := <-errChan:
11✔
1284
                return err
11✔
1285
        case <-c.pipeline.quit:
×
1286
                return ErrClientExiting
×
1287
        }
1288
}
1289

1290
// handleStaleTower handles a request for an existing tower to be removed. If
1291
// none of the tower's sessions have pending updates, then they will become
1292
// inactive and removed as candidates. If the active session queue corresponds
1293
// to any of these sessions, a new one will be negotiated.
1294
func (c *client) handleStaleTower(msg *staleTowerMsg) error {
11✔
1295
        // We'll first update our in-memory state.
11✔
1296
        err := c.candidateTowers.RemoveCandidate(msg.id, msg.addr)
11✔
1297
        if err != nil {
12✔
1298
                return err
1✔
1299
        }
1✔
1300

1301
        // If an address was provided, then we're only meant to remove the
1302
        // address from the tower.
1303
        if msg.addr != nil {
12✔
1304
                return nil
2✔
1305
        }
2✔
1306

1307
        // Otherwise, the tower should no longer be used for future session
1308
        // negotiations and backups.
1309

1310
        pubKey := msg.pubKey.SerializeCompressed()
8✔
1311
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
8✔
1312
        if err != nil {
8✔
1313
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1314
                        "%v", pubKey, err)
×
1315
        }
×
1316
        for sessionID := range sessions {
17✔
1317
                delete(c.candidateSessions, sessionID)
9✔
1318

9✔
1319
                // Shutdown the session so that any pending updates are
9✔
1320
                // replayed back onto the main task pipeline.
9✔
1321
                err = c.activeSessions.StopAndRemove(sessionID, true)
9✔
1322
                if err != nil {
9✔
1323
                        c.log.Errorf("could not stop session %s: %w", sessionID,
×
1324
                                err)
×
1325
                }
×
1326
        }
1327

1328
        // If our active session queue corresponds to the stale tower, we'll
1329
        // proceed to negotiate a new one.
1330
        if c.sessionQueue != nil {
14✔
1331
                towerKey := c.sessionQueue.tower.IdentityKey
6✔
1332

6✔
1333
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
12✔
1334
                        c.sessionQueue = nil
6✔
1335
                }
6✔
1336
        }
1337

1338
        return nil
8✔
1339
}
1340

1341
// registeredTowers retrieves the list of watchtowers registered with the
1342
// client.
1343
func (c *client) registeredTowers(towers []*wtdb.Tower,
1344
        opts ...wtdb.ClientSessionListOption) ([]*RegisteredTower, error) {
×
1345

×
1346
        // Generate a filter that will fetch all the client's sessions
×
1347
        // regardless of if they are active or not.
×
1348
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
×
1349

×
1350
        clientSessions, err := c.cfg.DB.ListClientSessions(nil, opts...)
×
1351
        if err != nil {
×
1352
                return nil, err
×
1353
        }
×
1354

1355
        // Construct a lookup map that coalesces all the sessions for a specific
1356
        // watchtower.
1357
        towerSessions := make(
×
1358
                map[wtdb.TowerID]map[wtdb.SessionID]*wtdb.ClientSession,
×
1359
        )
×
1360
        for id, s := range clientSessions {
×
1361
                sessions, ok := towerSessions[s.TowerID]
×
1362
                if !ok {
×
1363
                        sessions = make(map[wtdb.SessionID]*wtdb.ClientSession)
×
1364
                        towerSessions[s.TowerID] = sessions
×
1365
                }
×
1366
                sessions[id] = s
×
1367
        }
1368

1369
        registeredTowers := make([]*RegisteredTower, 0, len(towerSessions))
×
1370
        for _, tower := range towers {
×
1371
                isActive := c.candidateTowers.IsActive(tower.ID)
×
1372
                registeredTowers = append(registeredTowers, &RegisteredTower{
×
1373
                        Tower:                  tower,
×
1374
                        Sessions:               towerSessions[tower.ID],
×
1375
                        ActiveSessionCandidate: isActive,
×
1376
                })
×
1377
        }
×
1378

1379
        return registeredTowers, nil
×
1380
}
1381

1382
// lookupTower retrieves the info of sessions held with the given tower handled
1383
// by this client.
1384
func (c *client) lookupTower(tower *wtdb.Tower,
1385
        opts ...wtdb.ClientSessionListOption) (*RegisteredTower, error) {
2✔
1386

2✔
1387
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
2✔
1388

2✔
1389
        towerSessions, err := c.cfg.DB.ListClientSessions(&tower.ID, opts...)
2✔
1390
        if err != nil {
2✔
1391
                return nil, err
×
1392
        }
×
1393

1394
        return &RegisteredTower{
2✔
1395
                Tower:                  tower,
2✔
1396
                Sessions:               towerSessions,
2✔
1397
                ActiveSessionCandidate: c.candidateTowers.IsActive(tower.ID),
2✔
1398
        }, nil
2✔
1399
}
1400

1401
// getStats returns the in-memory statistics of the client since startup.
UNCOV
1402
func (c *client) getStats() ClientStats {
×
UNCOV
1403
        return c.stats.getStatsCopy()
×
UNCOV
1404
}
×
1405

1406
// policy returns the active client policy configuration.
1407
func (c *client) policy() wtpolicy.Policy {
6✔
1408
        return c.cfg.Policy
6✔
1409
}
6✔
1410

1411
// logMessage writes information about a message received from a remote peer,
1412
// using directional prepositions to signal whether the message was sent or
1413
// received.
1414
func (c *client) logMessage(
1415
        peer wtserver.Peer, msg wtwire.Message, read bool) {
2,205✔
1416

2,205✔
1417
        var action = "Received"
2,205✔
1418
        var preposition = "from"
2,205✔
1419
        if !read {
3,433✔
1420
                action = "Sending"
1,228✔
1421
                preposition = "to"
1,228✔
1422
        }
1,228✔
1423

1424
        summary := wtwire.MessageSummary(msg)
2,205✔
1425
        if len(summary) > 0 {
3,535✔
1426
                summary = "(" + summary + ")"
1,330✔
1427
        }
1,330✔
1428

1429
        c.log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
2,205✔
1430
                preposition, peer.RemotePub().SerializeCompressed(),
2,205✔
1431
                peer.RemoteAddr())
2,205✔
1432
}
1433

1434
func newRandomDelay(max uint32) (uint32, error) {
4✔
1435
        var maxDelay big.Int
4✔
1436
        maxDelay.SetUint64(uint64(max))
4✔
1437

4✔
1438
        randDelay, err := rand.Int(rand.Reader, &maxDelay)
4✔
1439
        if err != nil {
4✔
1440
                return 0, err
×
1441
        }
×
1442

1443
        return uint32(randDelay.Uint64()), nil
4✔
1444
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc