• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15951470896

29 Jun 2025 04:23AM UTC coverage: 67.594% (-0.01%) from 67.606%
15951470896

Pull #9751

github

web-flow
Merge 599d9b051 into 6290edf14
Pull Request #9751: multi: update Go to 1.23.10 and update some packages

135088 of 199851 relevant lines covered (67.59%)

21909.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.54
/watchtower/wtclient/client.go
1
package wtclient
2

3
import (
4
        "bytes"
5
        "crypto/rand"
6
        "errors"
7
        "fmt"
8
        "maps"
9
        "math/big"
10
        "net"
11
        "sync"
12
        "time"
13

14
        "github.com/btcsuite/btcd/btcec/v2"
15
        "github.com/btcsuite/btclog/v2"
16
        "github.com/lightningnetwork/lnd/channeldb"
17
        "github.com/lightningnetwork/lnd/keychain"
18
        "github.com/lightningnetwork/lnd/lnwallet"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
21
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
22
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
23
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
24
)
25

26
const (
	// DefaultReadTimeout is how long a blocking read is allowed to wait
	// by default before it is abandoned.
	DefaultReadTimeout = 15 * time.Second

	// DefaultWriteTimeout is how long a blocking write is allowed to wait
	// by default before it is abandoned.
	DefaultWriteTimeout = 15 * time.Second

	// DefaultStatInterval is the default period between log lines that
	// report metrics about the client's operation.
	DefaultStatInterval = time.Minute

	// DefaultSessionCloseRange is the size, in blocks, of the range from
	// which a random delay is drawn before closing a session whose last
	// channel has been closed.
	DefaultSessionCloseRange = 288

	// DefaultMaxTasksInMemQueue caps the number of items kept in the
	// in-memory queue.
	DefaultMaxTasksInMemQueue = 2000
)
48

49
// genSessionFilter constructs a filter that can be used to select sessions only
50
// if they match the policy of the client (namely anchor vs legacy). If
51
// activeOnly is set, then only active sessions will be returned.
52
func (c *client) genSessionFilter(
53
        activeOnly bool) wtdb.ClientSessionFilterFn {
88✔
54

88✔
55
        return func(session *wtdb.ClientSession) bool {
119✔
56
                if c.cfg.Policy.TxPolicy != session.Policy.TxPolicy {
34✔
57
                        return false
3✔
58
                }
3✔
59

60
                if !activeOnly {
38✔
61
                        return true
7✔
62
                }
7✔
63

64
                return session.Status == wtdb.CSessionActive
27✔
65
        }
66
}
67

68
// ExhaustedSessionFilter constructs a wtdb.ClientSessionFilterFn filter
69
// function that will filter out any sessions that have been exhausted. A
70
// session is considered exhausted only if it has no un-acked updates and the
71
// sequence number of the session is equal to the max updates of the session
72
// policy.
73
func ExhaustedSessionFilter() wtdb.ClientSessWithNumCommittedUpdatesFilterFn {
86✔
74
        return func(session *wtdb.ClientSession, numUnAcked uint16) bool {
113✔
75
                return session.SeqNum < session.Policy.MaxUpdates ||
27✔
76
                        numUnAcked > 0
27✔
77
        }
27✔
78
}
79

80
// RegisteredTower encompasses information about a registered watchtower with
81
// the client.
82
type RegisteredTower struct {
83
        *wtdb.Tower
84

85
        // Sessions is the set of sessions corresponding to the watchtower.
86
        Sessions map[wtdb.SessionID]*wtdb.ClientSession
87

88
        // ActiveSessionCandidate determines whether the watchtower is currently
89
        // being considered for new sessions.
90
        ActiveSessionCandidate bool
91
}
92

93
// BreachRetributionBuilder is a function that can be used to construct a
94
// BreachRetribution from a channel ID and a commitment height.
95
type BreachRetributionBuilder func(id lnwire.ChannelID,
96
        commitHeight uint64) (*lnwallet.BreachRetribution,
97
        channeldb.ChannelType, error)
98

99
// newTowerMsg is an internal message we'll use within the client to signal
100
// that a new tower can be considered.
101
type newTowerMsg struct {
102
        // tower holds the info about the new Tower or new tower address
103
        // required to connect to it.
104
        tower *Tower
105

106
        // errChan is the channel through which we'll send a response back to
107
        // the caller when handling their request.
108
        //
109
        // NOTE: This channel must be buffered.
110
        errChan chan error
111
}
112

113
// staleTowerMsg is an internal message we'll use within the client to
114
// signal that a tower should no longer be considered.
115
type staleTowerMsg struct {
116
        // id is the unique database identifier for the tower.
117
        id wtdb.TowerID
118

119
        // pubKey is the identifying public key of the watchtower.
120
        pubKey *btcec.PublicKey
121

122
        // addr is an optional field that when set signals that the address
123
        // should be removed from the watchtower's set of addresses, indicating
124
        // that it is stale. If it's not set, then the watchtower should be
125
        // no longer be considered for new sessions.
126
        addr net.Addr
127

128
        // errChan is the channel through which we'll send a response back to
129
        // the caller when handling their request.
130
        //
131
        // NOTE: This channel must be buffered.
132
        errChan chan error
133
}
134

135
// deactivateTowerMsg is an internal message we'll use within the TowerClient
136
// to signal that a tower should be marked as inactive.
137
type deactivateTowerMsg struct {
138
        // id is the unique database identifier for the tower.
139
        id wtdb.TowerID
140

141
        // pubKey is the identifying public key of the watchtower.
142
        pubKey *btcec.PublicKey
143

144
        // errChan is the channel through which we'll send a response back to
145
        // the caller when handling their request.
146
        //
147
        // NOTE: This channel must be buffered.
148
        errChan chan error
149
}
150

151
// terminateSessMsg is an internal message we'll use within the TowerClient to
152
// signal that a session should be terminated.
153
type terminateSessMsg struct {
154
        // id is the session identifier.
155
        id wtdb.SessionID
156

157
        // errChan is the channel through which we'll send a response back to
158
        // the caller when handling their request.
159
        //
160
        // NOTE: This channel must be buffered.
161
        errChan chan error
162
}
163

164
// clientCfg holds the configuration values required by a client.
165
type clientCfg struct {
166
        *Config
167

168
        // Policy is the session policy the client will propose when creating
169
        // new sessions with the tower. If the policy differs from any active
170
        // sessions recorded in the database, those sessions will be ignored and
171
        // new sessions will be requested immediately.
172
        Policy wtpolicy.Policy
173

174
        getSweepScript func(lnwire.ChannelID) ([]byte, bool)
175
}
176

177
// client manages backing up revoked states for all states that fall under a
178
// specific policy type.
179
type client struct {
180
        cfg *clientCfg
181

182
        log btclog.Logger
183

184
        pipeline *DiskOverflowQueue[*wtdb.BackupID]
185

186
        negotiator        SessionNegotiator
187
        candidateTowers   TowerCandidateIterator
188
        candidateSessions map[wtdb.SessionID]*ClientSession
189
        activeSessions    *sessionQueueSet
190

191
        sessionQueue *sessionQueue
192
        prevTask     *wtdb.BackupID
193

194
        statTicker *time.Ticker
195
        stats      *clientStats
196

197
        newTowers         chan *newTowerMsg
198
        staleTowers       chan *staleTowerMsg
199
        deactivateTowers  chan *deactivateTowerMsg
200
        terminateSessions chan *terminateSessMsg
201

202
        wg   sync.WaitGroup
203
        quit chan struct{}
204
}
205

206
// newClient initializes a new client from the provided clientCfg. An error is
207
// returned if the client could not be initialized.
208
func newClient(cfg *clientCfg) (*client, error) {
39✔
209
        identifier, err := cfg.Policy.BlobType.Identifier()
39✔
210
        if err != nil {
39✔
211
                return nil, err
×
212
        }
×
213
        plog := log.WithPrefix(fmt.Sprintf("(%s)", identifier))
39✔
214

39✔
215
        queueDB := cfg.DB.GetDBQueue([]byte(identifier))
39✔
216
        queue, err := NewDiskOverflowQueue[*wtdb.BackupID](
39✔
217
                queueDB, cfg.MaxTasksInMemQueue, plog,
39✔
218
        )
39✔
219
        if err != nil {
39✔
220
                return nil, err
×
221
        }
×
222

223
        c := &client{
39✔
224
                cfg:               cfg,
39✔
225
                log:               plog,
39✔
226
                pipeline:          queue,
39✔
227
                activeSessions:    newSessionQueueSet(),
39✔
228
                statTicker:        time.NewTicker(DefaultStatInterval),
39✔
229
                stats:             new(clientStats),
39✔
230
                newTowers:         make(chan *newTowerMsg),
39✔
231
                staleTowers:       make(chan *staleTowerMsg),
39✔
232
                deactivateTowers:  make(chan *deactivateTowerMsg),
39✔
233
                terminateSessions: make(chan *terminateSessMsg),
39✔
234
                quit:              make(chan struct{}),
39✔
235
        }
39✔
236

39✔
237
        candidateTowers := newTowerListIterator()
39✔
238
        perActiveTower := func(tower *Tower) {
51✔
239
                // If the tower has already been marked as active, then there is
12✔
240
                // no need to add it to the iterator again.
12✔
241
                if candidateTowers.IsActive(tower.ID) {
12✔
242
                        return
×
243
                }
×
244

245
                c.log.Infof("Using private watchtower %x, offering policy %s",
12✔
246
                        tower.IdentityKey.SerializeCompressed(), cfg.Policy)
12✔
247

12✔
248
                // Add the tower to the set of candidate towers.
12✔
249
                candidateTowers.AddCandidate(tower)
12✔
250
        }
251

252
        // Load all candidate sessions and towers from the database into the
253
        // client. We will use any of these sessions if their policies match the
254
        // current policy of the client, otherwise they will be ignored and new
255
        // sessions will be requested.
256
        candidateSessions, err := getTowerAndSessionCandidates(
39✔
257
                cfg.DB, cfg.SecretKeyRing, perActiveTower,
39✔
258
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
39✔
259
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
39✔
260
        )
39✔
261
        if err != nil {
39✔
262
                return nil, err
×
263
        }
×
264

265
        c.candidateTowers = candidateTowers
39✔
266
        c.candidateSessions = candidateSessions
39✔
267

39✔
268
        c.negotiator = newSessionNegotiator(&NegotiatorConfig{
39✔
269
                DB:            cfg.DB,
39✔
270
                SecretKeyRing: cfg.SecretKeyRing,
39✔
271
                Policy:        cfg.Policy,
39✔
272
                ChainHash:     cfg.ChainHash,
39✔
273
                SendMessage:   c.sendMessage,
39✔
274
                ReadMessage:   c.readMessage,
39✔
275
                Dial:          c.dial,
39✔
276
                Candidates:    c.candidateTowers,
39✔
277
                MinBackoff:    cfg.MinBackoff,
39✔
278
                MaxBackoff:    cfg.MaxBackoff,
39✔
279
                Log:           plog,
39✔
280
        })
39✔
281

39✔
282
        return c, nil
39✔
283
}
284

285
// getTowerAndSessionCandidates loads all the towers from the DB and then
286
// fetches the sessions for each of tower. Sessions are only collected if they
287
// pass the sessionFilter check. If a tower has a session that does pass the
288
// sessionFilter check then the perActiveTower call-back will be called on that
289
// tower.
290
func getTowerAndSessionCandidates(db DB, keyRing ECDHKeyRing,
291
        perActiveTower func(tower *Tower),
292
        opts ...wtdb.ClientSessionListOption) (
293
        map[wtdb.SessionID]*ClientSession, error) {
39✔
294

39✔
295
        // Fetch all active towers from the DB.
39✔
296
        towers, err := db.ListTowers(func(tower *wtdb.Tower) bool {
51✔
297
                return tower.Status == wtdb.TowerStatusActive
12✔
298
        })
12✔
299
        if err != nil {
39✔
300
                return nil, err
×
301
        }
×
302

303
        candidateSessions := make(map[wtdb.SessionID]*ClientSession)
39✔
304
        for _, dbTower := range towers {
51✔
305
                tower, err := NewTowerFromDBTower(dbTower)
12✔
306
                if err != nil {
12✔
307
                        return nil, err
×
308
                }
×
309

310
                sessions, err := db.ListClientSessions(&tower.ID, opts...)
12✔
311
                if err != nil {
12✔
312
                        return nil, err
×
313
                }
×
314

315
                for _, s := range sessions {
23✔
316
                        cs, err := NewClientSessionFromDBSession(
11✔
317
                                s, tower, keyRing,
11✔
318
                        )
11✔
319
                        if err != nil {
11✔
320
                                return nil, err
×
321
                        }
×
322

323
                        // Add the session to the set of candidate sessions.
324
                        candidateSessions[s.ID] = cs
11✔
325
                }
326

327
                perActiveTower(tower)
12✔
328
        }
329

330
        return candidateSessions, nil
39✔
331
}
332

333
// getClientSessions retrieves the client sessions for a particular tower if
334
// specified, otherwise all client sessions for all towers are retrieved. An
335
// optional filter can be provided to filter out any undesired client sessions.
336
//
337
// NOTE: This method should only be used when deserialization of a
338
// ClientSession's SessionPrivKey field is desired, otherwise, the existing
339
// ListClientSessions method should be used.
340
func getClientSessions(db DB, keyRing ECDHKeyRing, forTower *wtdb.TowerID,
341
        opts ...wtdb.ClientSessionListOption) (
342
        map[wtdb.SessionID]*ClientSession, error) {
50✔
343

50✔
344
        dbSessions, err := db.ListClientSessions(forTower, opts...)
50✔
345
        if err != nil {
50✔
346
                return nil, err
×
347
        }
×
348

349
        // Reload the tower from disk using the tower ID contained in each
350
        // candidate session. We will also rederive any session keys needed to
351
        // be able to communicate with the towers and authenticate session
352
        // requests. This prevents us from having to store the private keys on
353
        // disk.
354
        sessions := make(map[wtdb.SessionID]*ClientSession)
50✔
355
        for _, s := range dbSessions {
64✔
356
                dbTower, err := db.LoadTowerByID(s.TowerID)
14✔
357
                if err != nil {
14✔
358
                        return nil, err
×
359
                }
×
360

361
                towerKeyDesc, err := keyRing.DeriveKey(keychain.KeyLocator{
14✔
362
                        Family: keychain.KeyFamilyTowerSession,
14✔
363
                        Index:  s.KeyIndex,
14✔
364
                })
14✔
365
                if err != nil {
14✔
366
                        return nil, err
×
367
                }
×
368

369
                sessionKeyECDH := keychain.NewPubKeyECDH(towerKeyDesc, keyRing)
14✔
370

14✔
371
                tower, err := NewTowerFromDBTower(dbTower)
14✔
372
                if err != nil {
14✔
373
                        return nil, err
×
374
                }
×
375

376
                sessions[s.ID] = &ClientSession{
14✔
377
                        ID:                s.ID,
14✔
378
                        ClientSessionBody: s.ClientSessionBody,
14✔
379
                        Tower:             tower,
14✔
380
                        SessionKeyECDH:    sessionKeyECDH,
14✔
381
                }
14✔
382
        }
383

384
        return sessions, nil
50✔
385
}
386

387
// start initializes the watchtower client by loading or negotiating an active
388
// session and then begins processing backup tasks from the request pipeline.
389
func (c *client) start() error {
39✔
390
        c.log.Infof("Watchtower client starting")
39✔
391

39✔
392
        // First, restart a session queue for any sessions that have
39✔
393
        // committed but unacked state updates. This ensures that these
39✔
394
        // sessions will be able to flush the committed updates after a
39✔
395
        // restart.
39✔
396
        fetchCommittedUpdates := c.cfg.DB.FetchSessionCommittedUpdates
39✔
397
        for _, session := range c.candidateSessions {
50✔
398
                committedUpdates, err := fetchCommittedUpdates(
11✔
399
                        &session.ID,
11✔
400
                )
11✔
401
                if err != nil {
11✔
402
                        return err
×
403
                }
×
404

405
                if len(committedUpdates) > 0 {
13✔
406
                        c.log.Infof("Starting session=%s to process "+
2✔
407
                                "%d committed backups", session.ID,
2✔
408
                                len(committedUpdates))
2✔
409

2✔
410
                        c.initActiveQueue(session, committedUpdates)
2✔
411
                }
2✔
412
        }
413

414
        // Now start the session negotiator, which will allow us to request new
415
        // session as soon as the backupDispatcher starts up.
416
        err := c.negotiator.Start()
39✔
417
        if err != nil {
39✔
418
                return err
×
419
        }
×
420

421
        // Start the task pipeline to which new backup tasks will be
422
        // submitted from active links.
423
        err = c.pipeline.Start()
39✔
424
        if err != nil {
39✔
425
                return err
×
426
        }
×
427

428
        c.wg.Add(1)
39✔
429
        go c.backupDispatcher()
39✔
430

39✔
431
        c.log.Infof("Watchtower client started successfully")
39✔
432

39✔
433
        return nil
39✔
434
}
435

436
// stop idempotently initiates a graceful shutdown of the watchtower client.
437
func (c *client) stop() error {
39✔
438
        var returnErr error
39✔
439
        c.log.Debugf("Stopping watchtower client")
39✔
440

39✔
441
        // 1. Stop the session negotiator.
39✔
442
        err := c.negotiator.Stop()
39✔
443
        if err != nil {
39✔
444
                returnErr = err
×
445
        }
×
446

447
        // 2. Stop the backup dispatcher and any other goroutines.
448
        close(c.quit)
39✔
449
        c.wg.Wait()
39✔
450

39✔
451
        // 3. If there was a left over 'prevTask' from the backup
39✔
452
        // dispatcher, replay that onto the pipeline.
39✔
453
        if c.prevTask != nil {
39✔
454
                err = c.pipeline.QueueBackupID(c.prevTask)
×
455
                if err != nil {
×
456
                        returnErr = err
×
457
                }
×
458
        }
459

460
        // 4. Shutdown all active session queues in parallel. These will
461
        // exit once all unhandled updates have been replayed to the
462
        // task pipeline.
463
        c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() {
108✔
464
                return func() {
138✔
465
                        err := s.Stop(false)
69✔
466
                        if err != nil {
69✔
467
                                c.log.Errorf("could not stop session "+
×
468
                                        "queue: %s: %v", s.ID(), err)
×
469

×
470
                                returnErr = err
×
471
                        }
×
472
                }
473
        })
474

475
        // 5. Shutdown the backup queue, which will prevent any further
476
        // updates from being accepted.
477
        if err = c.pipeline.Stop(); err != nil {
39✔
478
                returnErr = err
×
479
        }
×
480

481
        c.log.Debugf("Client successfully stopped, stats: %s", c.stats)
39✔
482

39✔
483
        return returnErr
39✔
484
}
485

486
// backupState initiates a request to back up a particular revoked state. If the
487
// method returns nil, the backup is guaranteed to be successful unless the:
488
//   - justice transaction would create dust outputs when trying to abide by the
489
//     negotiated policy, or
490
//   - breached outputs contain too little value to sweep at the target sweep
491
//     fee rate.
492
func (c *client) backupState(chanID *lnwire.ChannelID,
493
        stateNum uint64) error {
472✔
494

472✔
495
        id := &wtdb.BackupID{
472✔
496
                ChanID:       *chanID,
472✔
497
                CommitHeight: stateNum,
472✔
498
        }
472✔
499

472✔
500
        return c.pipeline.QueueBackupID(id)
472✔
501
}
472✔
502

503
// nextSessionQueue attempts to fetch an active session from our set of
504
// candidate sessions. Candidate sessions with a differing policy from the
505
// active client's advertised policy will be ignored, but may be resumed if the
506
// client is restarted with a matching policy. If no candidates were found, nil
507
// is returned to signal that we need to request a new policy.
508
func (c *client) nextSessionQueue() (*sessionQueue, error) {
86✔
509
        // Select any candidate session at random, and remove it from the set of
86✔
510
        // candidate sessions.
86✔
511
        var candidateSession *ClientSession
86✔
512
        for id, sessionInfo := range c.candidateSessions {
172✔
513
                delete(c.candidateSessions, id)
86✔
514

86✔
515
                // Skip any sessions with policies that don't match the current
86✔
516
                // TxPolicy, as they would result in different justice
86✔
517
                // transactions from what is requested. These can be used again
86✔
518
                // if the client changes their configuration and restarting.
86✔
519
                if sessionInfo.Policy.TxPolicy != c.cfg.Policy.TxPolicy {
86✔
520
                        continue
×
521
                }
522

523
                candidateSession = sessionInfo
86✔
524
                break
86✔
525
        }
526

527
        // If none of the sessions could be used or none were found, we'll
528
        // return nil to signal that we need another session to be negotiated.
529
        if candidateSession == nil {
86✔
530
                return nil, nil
×
531
        }
×
532

533
        updates, err := c.cfg.DB.FetchSessionCommittedUpdates(
86✔
534
                &candidateSession.ID,
86✔
535
        )
86✔
536
        if err != nil {
86✔
537
                return nil, err
×
538
        }
×
539

540
        // Initialize the session queue and spin it up, so it can begin handling
541
        // updates. If the queue was already made active on startup, this will
542
        // simply return the existing session queue from the set.
543
        return c.getOrInitActiveQueue(candidateSession, updates), nil
86✔
544
}
545

546
// stopAndRemoveSession stops the session with the given ID and removes it from
547
// the in-memory active sessions set.
548
func (c *client) stopAndRemoveSession(id wtdb.SessionID, final bool) error {
7✔
549
        return c.activeSessions.StopAndRemove(id, final)
7✔
550
}
7✔
551

552
// deleteSessionFromTower dials the tower that we created the session with and
553
// attempts to send the tower the DeleteSession message.
554
func (c *client) deleteSessionFromTower(sess *wtdb.ClientSession) error {
7✔
555
        // First, we check if we have already loaded this tower in our
7✔
556
        // candidate towers iterator.
7✔
557
        tower, err := c.candidateTowers.GetTower(sess.TowerID)
7✔
558
        if errors.Is(err, ErrTowerNotInIterator) {
7✔
559
                // If not, then we attempt to load it from the DB.
×
560
                dbTower, err := c.cfg.DB.LoadTowerByID(sess.TowerID)
×
561
                if err != nil {
×
562
                        return err
×
563
                }
×
564

565
                tower, err = NewTowerFromDBTower(dbTower)
×
566
                if err != nil {
×
567
                        return err
×
568
                }
×
569
        } else if err != nil {
7✔
570
                return err
×
571
        }
×
572

573
        session, err := NewClientSessionFromDBSession(
7✔
574
                sess, tower, c.cfg.SecretKeyRing,
7✔
575
        )
7✔
576
        if err != nil {
7✔
577
                return err
×
578
        }
×
579

580
        localInit := wtwire.NewInitMessage(
7✔
581
                lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
7✔
582
                c.cfg.ChainHash,
7✔
583
        )
7✔
584

7✔
585
        var (
7✔
586
                conn wtserver.Peer
7✔
587

7✔
588
                // addrIterator is a copy of the tower's address iterator.
7✔
589
                // We use this copy so that iterating through the addresses does
7✔
590
                // not affect any other threads using this iterator.
7✔
591
                addrIterator = tower.Addresses.Copy()
7✔
592
                towerAddr    = addrIterator.Peek()
7✔
593
        )
7✔
594
        // Attempt to dial the tower with its available addresses.
7✔
595
        for {
14✔
596
                conn, err = c.dial(
7✔
597
                        session.SessionKeyECDH, &lnwire.NetAddress{
7✔
598
                                IdentityKey: tower.IdentityKey,
7✔
599
                                Address:     towerAddr,
7✔
600
                        },
7✔
601
                )
7✔
602
                if err != nil {
7✔
603
                        // If there are more addrs available, immediately try
×
604
                        // those.
×
605
                        nextAddr, iteratorErr := addrIterator.Next()
×
606
                        if iteratorErr == nil {
×
607
                                towerAddr = nextAddr
×
608
                                continue
×
609
                        }
610

611
                        // Otherwise, if we have exhausted the address list,
612
                        // exit.
613
                        addrIterator.Reset()
×
614

×
615
                        return fmt.Errorf("failed to dial tower(%x) at any "+
×
616
                                "available addresses",
×
617
                                tower.IdentityKey.SerializeCompressed())
×
618
                }
619

620
                break
7✔
621
        }
622
        defer conn.Close()
7✔
623

7✔
624
        // Send Init to tower.
7✔
625
        err = c.sendMessage(conn, localInit)
7✔
626
        if err != nil {
7✔
627
                return err
×
628
        }
×
629

630
        // Receive Init from tower.
631
        remoteMsg, err := c.readMessage(conn)
7✔
632
        if err != nil {
7✔
633
                return err
×
634
        }
×
635

636
        remoteInit, ok := remoteMsg.(*wtwire.Init)
7✔
637
        if !ok {
7✔
638
                return fmt.Errorf("watchtower %s responded with %T to Init",
×
639
                        towerAddr, remoteMsg)
×
640
        }
×
641

642
        // Validate Init.
643
        err = localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
7✔
644
        if err != nil {
7✔
645
                return err
×
646
        }
×
647

648
        // Send DeleteSession to tower.
649
        err = c.sendMessage(conn, &wtwire.DeleteSession{})
7✔
650
        if err != nil {
7✔
651
                return err
×
652
        }
×
653

654
        // Receive DeleteSessionReply from tower.
655
        remoteMsg, err = c.readMessage(conn)
7✔
656
        if err != nil {
7✔
657
                return err
×
658
        }
×
659

660
        deleteSessionReply, ok := remoteMsg.(*wtwire.DeleteSessionReply)
7✔
661
        if !ok {
7✔
662
                return fmt.Errorf("watchtower %s responded with %T to "+
×
663
                        "DeleteSession", towerAddr, remoteMsg)
×
664
        }
×
665

666
        switch deleteSessionReply.Code {
7✔
667
        case wtwire.CodeOK, wtwire.DeleteSessionCodeNotFound:
7✔
668
                return nil
7✔
669
        default:
×
670
                return fmt.Errorf("received error code %v in "+
×
671
                        "DeleteSessionReply when attempting to delete "+
×
672
                        "session from tower", deleteSessionReply.Code)
×
673
        }
674
}
675

676
// backupDispatcher processes events coming from the taskPipeline and is
677
// responsible for detecting when the client needs to renegotiate a session to
678
// fulfill continuing demand. The event loop exits if the client is quit.
679
//
680
// NOTE: This method MUST be run as a goroutine.
681
func (c *client) backupDispatcher() {
39✔
682
        defer c.wg.Done()
39✔
683

39✔
684
        c.log.Tracef("Starting backup dispatcher")
39✔
685
        defer c.log.Tracef("Stopping backup dispatcher")
39✔
686

39✔
687
        for {
735✔
688
                switch {
696✔
689

690
                // No active session queue and no additional sessions.
691
                case c.sessionQueue == nil && len(c.candidateSessions) == 0:
82✔
692
                        c.log.Infof("Requesting new session.")
82✔
693

82✔
694
                        // Immediately request a new session.
82✔
695
                        c.negotiator.RequestSession()
82✔
696

82✔
697
                        // Wait until we receive the newly negotiated session.
82✔
698
                        // All backups sent in the meantime are queued in the
82✔
699
                        // revoke queue, as we cannot process them.
82✔
700
                awaitSession:
82✔
701
                        select {
82✔
702
                        case session := <-c.negotiator.NewSessions():
76✔
703
                                c.log.Infof("Acquired new session with id=%s",
76✔
704
                                        session.ID)
76✔
705
                                c.candidateSessions[session.ID] = session
76✔
706
                                c.stats.sessionAcquired()
76✔
707

76✔
708
                                // We'll continue to choose the newly negotiated
76✔
709
                                // session as our active session queue.
76✔
710
                                continue
76✔
711

712
                        case <-c.statTicker.C:
×
713
                                c.log.Infof("Client stats: %s", c.stats)
×
714

715
                        // A new tower has been requested to be added. We'll
716
                        // update our persisted and in-memory state and consider
717
                        // its corresponding sessions, if any, as new
718
                        // candidates.
719
                        case msg := <-c.newTowers:
40✔
720
                                msg.errChan <- c.handleNewTower(msg.tower)
40✔
721

722
                        // A tower has been requested to be removed. We'll
723
                        // only allow removal of it if the address in question
724
                        // is not currently being used for session negotiation.
725
                        case msg := <-c.staleTowers:
7✔
726
                                msg.errChan <- c.handleStaleTower(msg)
7✔
727

728
                        // A tower has been requested to be de-activated. We'll
729
                        // only allow this if the tower is not currently being
730
                        // used for session negotiation.
731
                        case msg := <-c.deactivateTowers:
×
732
                                msg.errChan <- c.handleDeactivateTower(msg)
×
733

734
                        // A request has come through to terminate a session.
735
                        case msg := <-c.terminateSessions:
×
736
                                msg.errChan <- c.handleTerminateSession(msg)
×
737

738
                        case <-c.quit:
9✔
739
                                return
9✔
740
                        }
741

742
                        // Instead of looping, we'll jump back into the select
743
                        // case and await the delivery of the session to prevent
744
                        // us from re-requesting additional sessions.
745
                        goto awaitSession
44✔
746

747
                // No active session queue but have additional sessions.
748
                case c.sessionQueue == nil && len(c.candidateSessions) > 0:
86✔
749
                        // We've exhausted the prior session, we'll pop another
86✔
750
                        // from the remaining sessions and continue processing
86✔
751
                        // backup tasks.
86✔
752
                        var err error
86✔
753
                        c.sessionQueue, err = c.nextSessionQueue()
86✔
754
                        if err != nil {
86✔
755
                                c.log.Errorf("error fetching next session "+
×
756
                                        "queue: %v", err)
×
757
                        }
×
758

759
                        if c.sessionQueue != nil {
172✔
760
                                c.log.Debugf("Loaded next candidate session "+
86✔
761
                                        "queue id=%s", c.sessionQueue.ID())
86✔
762
                        }
86✔
763

764
                // Have active session queue, process backups.
765
                case c.sessionQueue != nil:
534✔
766
                        if c.prevTask != nil {
535✔
767
                                c.processTask(c.prevTask)
1✔
768

1✔
769
                                // Continue to ensure the sessionQueue is
1✔
770
                                // properly initialized before attempting to
1✔
771
                                // process more tasks from the pipeline.
1✔
772
                                continue
1✔
773
                        }
774

775
                        // Normal operation where new tasks are read from the
776
                        // pipeline.
777
                        select {
533✔
778

779
                        // If any sessions are negotiated while we have an
780
                        // active session queue, queue them for future use.
781
                        // This shouldn't happen with the current design, so
782
                        // it doesn't hurt to select here just in case. In the
783
                        // future, we will likely allow more asynchrony so that
784
                        // we can request new sessions before the session is
785
                        // fully empty, which this case would handle.
786
                        case session := <-c.negotiator.NewSessions():
×
787
                                c.log.Warnf("Acquired new session with id=%s "+
×
788
                                        "while processing tasks", session.ID)
×
789
                                c.candidateSessions[session.ID] = session
×
790
                                c.stats.sessionAcquired()
×
791

792
                        case <-c.statTicker.C:
×
793
                                c.log.Infof("Client stats: %s", c.stats)
×
794

795
                        // Process each backup task serially from the queue of
796
                        // revoked states.
797
                        case task, ok := <-c.pipeline.NextBackupID():
483✔
798
                                // All backups in the pipeline have been
483✔
799
                                // processed, it is now safe to exit.
483✔
800
                                if !ok {
483✔
801
                                        return
×
802
                                }
×
803

804
                                c.log.Debugf("Processing %v", task)
483✔
805

483✔
806
                                c.stats.taskReceived()
483✔
807
                                c.processTask(task)
483✔
808

809
                        // A new tower has been requested to be added. We'll
810
                        // update our persisted and in-memory state and consider
811
                        // its corresponding sessions, if any, as new
812
                        // candidates.
813
                        case msg := <-c.newTowers:
13✔
814
                                msg.errChan <- c.handleNewTower(msg.tower)
13✔
815

816
                        // A tower has been removed, so we'll remove certain
817
                        // information that's persisted and also in our
818
                        // in-memory state depending on the request, and set any
819
                        // of its corresponding candidate sessions as inactive.
820
                        case msg := <-c.staleTowers:
10✔
821
                                msg.errChan <- c.handleStaleTower(msg)
10✔
822

823
                        // A tower has been requested to be de-activated.
824
                        case msg := <-c.deactivateTowers:
5✔
825
                                msg.errChan <- c.handleDeactivateTower(msg)
5✔
826

827
                        // A request has come through to terminate a session.
828
                        case msg := <-c.terminateSessions:
4✔
829
                                msg.errChan <- c.handleTerminateSession(msg)
4✔
830

831
                        case <-c.quit:
33✔
832
                                return
33✔
833
                        }
834
                }
835
        }
836
}
837

838
// processTask attempts to schedule the given backupTask on the active
839
// sessionQueue. The task will either be accepted or rejected, after which the
840
// appropriate modifications to the client's state machine will be made. After
841
// every invocation of processTask, the caller should ensure that the
842
// sessionQueue hasn't been exhausted before proceeding to the next task. Tasks
843
// that are rejected because the active sessionQueue is full will be cached as
844
// the prevTask, and should be reprocessed after obtaining a new sessionQueue.
845
func (c *client) processTask(task *wtdb.BackupID) {
484✔
846
        script, ok := c.cfg.getSweepScript(task.ChanID)
484✔
847
        if !ok {
484✔
848
                log.Infof("not processing task for unregistered channel: %s",
×
849
                        task.ChanID)
×
850

×
851
                return
×
852
        }
×
853

854
        backupTask := newBackupTask(*task, script)
484✔
855

484✔
856
        status, accepted := c.sessionQueue.AcceptTask(backupTask)
484✔
857
        if accepted {
962✔
858
                c.taskAccepted(task, status)
478✔
859
        } else {
484✔
860
                c.taskRejected(task, status)
6✔
861
        }
6✔
862
}
863

864
// taskAccepted processes the acceptance of a task by a sessionQueue depending
865
// on the state the sessionQueue is in *after* the task is added. The client's
866
// prevTask is always removed as a result of this call. The client's
867
// sessionQueue will be removed if accepting the task left the sessionQueue in
868
// an exhausted state.
869
func (c *client) taskAccepted(task *wtdb.BackupID,
870
        newStatus sessionQueueStatus) {
478✔
871

478✔
872
        c.log.Infof("Queued %v successfully for session %v", task,
478✔
873
                c.sessionQueue.ID())
478✔
874

478✔
875
        c.stats.taskAccepted()
478✔
876

478✔
877
        // If this task was accepted, we discard anything held in the prevTask.
478✔
878
        // Either it was nil before, or is the task which was just accepted.
478✔
879
        c.prevTask = nil
478✔
880

478✔
881
        switch newStatus {
478✔
882

883
        // The sessionQueue still has capacity after accepting this task.
884
        case sessionQueueAvailable:
435✔
885

886
        // The sessionQueue is full after accepting this task, so we will need
887
        // to request a new one before proceeding.
888
        case sessionQueueExhausted:
46✔
889
                c.stats.sessionExhausted()
46✔
890

46✔
891
                c.log.Debugf("Session %s exhausted", c.sessionQueue.ID())
46✔
892

46✔
893
                // This task left the session exhausted, set it to nil and
46✔
894
                // proceed to the next loop, so we can consume another
46✔
895
                // pre-negotiated session or request another.
46✔
896
                c.sessionQueue = nil
46✔
897
        }
898
}
899

900
// taskRejected process the rejection of a task by a sessionQueue depending on
901
// the state the was in *before* the task was rejected. The client's prevTask
902
// will cache the task if the sessionQueue was exhausted beforehand, and nil
903
// the sessionQueue to find a new session. If the sessionQueue was not
904
// exhausted and not shutting down, the client marks the task as ineligible, as
905
// this implies we couldn't construct a valid justice transaction given the
906
// session's policy.
907
func (c *client) taskRejected(task *wtdb.BackupID,
908
        curStatus sessionQueueStatus) {
6✔
909

6✔
910
        switch curStatus {
6✔
911

912
        // The sessionQueue has available capacity but the task was rejected,
913
        // this indicates that the task was ineligible for backup.
914
        case sessionQueueAvailable:
5✔
915
                c.stats.taskIneligible()
5✔
916

5✔
917
                c.log.Infof("Ignoring ineligible %v", task)
5✔
918

5✔
919
                err := c.cfg.DB.MarkBackupIneligible(
5✔
920
                        task.ChanID, task.CommitHeight,
5✔
921
                )
5✔
922
                if err != nil {
5✔
923
                        c.log.Errorf("Unable to mark %v ineligible: %v",
×
924
                                task, err)
×
925

×
926
                        // It is safe to not handle this error, even if we could
×
927
                        // not persist the result. At worst, this task may be
×
928
                        // reprocessed on a subsequent start up, and will either
×
929
                        // succeed do a change in session parameters or fail in
×
930
                        // the same manner.
×
931
                }
×
932

933
                // If this task was rejected *and* the session had available
934
                // capacity, we discard anything held in the prevTask. Either it
935
                // was nil before, or is the task which was just rejected.
936
                c.prevTask = nil
5✔
937

938
        // The sessionQueue rejected the task because it is full, we will stash
939
        // this task and try to add it to the next available sessionQueue.
940
        case sessionQueueExhausted:
1✔
941
                c.stats.sessionExhausted()
1✔
942

1✔
943
                c.log.Debugf("Session %v exhausted, %v queued for next session",
1✔
944
                        c.sessionQueue.ID(), task)
1✔
945

1✔
946
                // Cache the task that we pulled off, so that we can process it
1✔
947
                // once a new session queue is available.
1✔
948
                c.sessionQueue = nil
1✔
949
                c.prevTask = task
1✔
950

951
        // The sessionQueue rejected the task because it is shutting down. We
952
        // will stash this task and try to add it to the next available
953
        // sessionQueue.
954
        case sessionQueueShuttingDown:
×
955
                c.log.Debugf("Session %v is shutting down, %v queued for "+
×
956
                        "next session", c.sessionQueue.ID(), task)
×
957

×
958
                // Cache the task that we pulled off, so that we can process it
×
959
                // once a new session queue is available.
×
960
                c.sessionQueue = nil
×
961
                c.prevTask = task
×
962
        }
963
}
964

965
// dial connects the peer at addr using privKey as our secret key for the
966
// connection. The connection will use the configured Net's resolver to resolve
967
// the address for either Tor or clear net connections.
968
func (c *client) dial(localKey keychain.SingleKeyECDH,
969
        addr *lnwire.NetAddress) (wtserver.Peer, error) {
455✔
970

455✔
971
        return c.cfg.AuthDial(localKey, addr, c.cfg.Dial)
455✔
972
}
455✔
973

974
// readMessage receives and parses the next message from the given Peer. An
975
// error is returned if a message is not received before the server's read
976
// timeout, the read off the wire failed, or the message could not be
977
// deserialized.
978
func (c *client) readMessage(peer wtserver.Peer) (wtwire.Message, error) {
1,236✔
979
        // Set a read timeout to ensure we drop the connection if nothing is
1,236✔
980
        // received in a timely manner.
1,236✔
981
        err := peer.SetReadDeadline(time.Now().Add(c.cfg.ReadTimeout))
1,236✔
982
        if err != nil {
1,236✔
983
                err = fmt.Errorf("unable to set read deadline: %w", err)
×
984
                c.log.Errorf("Unable to read msg: %v", err)
×
985
                return nil, err
×
986
        }
×
987

988
        // Pull the next message off the wire,
989
        rawMsg, err := peer.ReadNextMessage()
1,236✔
990
        if err != nil {
1,486✔
991
                err = fmt.Errorf("unable to read message: %w", err)
250✔
992
                c.log.Errorf("Unable to read msg: %v", err)
250✔
993
                return nil, err
250✔
994
        }
250✔
995

996
        // Parse the received message according to the watchtower wire
997
        // specification.
998
        msgReader := bytes.NewReader(rawMsg)
986✔
999
        msg, err := wtwire.ReadMessage(msgReader, 0)
986✔
1000
        if err != nil {
986✔
1001
                err = fmt.Errorf("unable to parse message: %w", err)
×
1002
                c.log.Errorf("Unable to read msg: %v", err)
×
1003
                return nil, err
×
1004
        }
×
1005

1006
        c.logMessage(peer, msg, true)
986✔
1007

986✔
1008
        return msg, nil
986✔
1009
}
1010

1011
// sendMessage sends a watchtower wire message to the target peer.
1012
func (c *client) sendMessage(peer wtserver.Peer,
1013
        msg wtwire.Message) error {
1,237✔
1014

1,237✔
1015
        // Encode the next wire message into the buffer.
1,237✔
1016
        // TODO(conner): use buffer pool
1,237✔
1017
        var b bytes.Buffer
1,237✔
1018
        _, err := wtwire.WriteMessage(&b, msg, 0)
1,237✔
1019
        if err != nil {
1,237✔
1020
                err = fmt.Errorf("unable to encode msg: %w", err)
×
1021
                c.log.Errorf("Unable to send msg: %v", err)
×
1022
                return err
×
1023
        }
×
1024

1025
        // Set the write deadline for the connection, ensuring we drop the
1026
        // connection if nothing is sent in a timely manner.
1027
        err = peer.SetWriteDeadline(time.Now().Add(c.cfg.WriteTimeout))
1,237✔
1028
        if err != nil {
1,237✔
1029
                err = fmt.Errorf("unable to set write deadline: %w", err)
×
1030
                c.log.Errorf("Unable to send msg: %v", err)
×
1031
                return err
×
1032
        }
×
1033

1034
        c.logMessage(peer, msg, false)
1,237✔
1035

1,237✔
1036
        // Write out the full message to the remote peer.
1,237✔
1037
        _, err = peer.Write(b.Bytes())
1,237✔
1038
        if err != nil {
1,238✔
1039
                c.log.Errorf("Unable to send msg: %v", err)
1✔
1040
        }
1✔
1041
        return err
1,237✔
1042
}
1043

1044
// newSessionQueue creates a sessionQueue from a ClientSession loaded from the
1045
// database and supplying it with the resources needed by the client.
1046
func (c *client) newSessionQueue(s *ClientSession,
1047
        updates []wtdb.CommittedUpdate) *sessionQueue {
85✔
1048

85✔
1049
        return newSessionQueue(&sessionQueueConfig{
85✔
1050
                ClientSession:          s,
85✔
1051
                ChainHash:              c.cfg.ChainHash,
85✔
1052
                Dial:                   c.dial,
85✔
1053
                ReadMessage:            c.readMessage,
85✔
1054
                SendMessage:            c.sendMessage,
85✔
1055
                Signer:                 c.cfg.Signer,
85✔
1056
                DB:                     c.cfg.DB,
85✔
1057
                MinBackoff:             c.cfg.MinBackoff,
85✔
1058
                MaxBackoff:             c.cfg.MaxBackoff,
85✔
1059
                Log:                    c.log,
85✔
1060
                BuildBreachRetribution: c.cfg.BuildBreachRetribution,
85✔
1061
                TaskPipeline:           c.pipeline,
85✔
1062
        }, updates)
85✔
1063
}
85✔
1064

1065
// getOrInitActiveQueue checks the activeSessions set for a sessionQueue for the
1066
// passed ClientSession. If it exists, the active sessionQueue is returned.
1067
// Otherwise, a new sessionQueue is initialized and added to the set.
1068
func (c *client) getOrInitActiveQueue(s *ClientSession,
1069
        updates []wtdb.CommittedUpdate) *sessionQueue {
86✔
1070

86✔
1071
        if sq, ok := c.activeSessions.Get(s.ID); ok {
89✔
1072
                return sq
3✔
1073
        }
3✔
1074

1075
        return c.initActiveQueue(s, updates)
83✔
1076
}
1077

1078
// initActiveQueue creates a new sessionQueue from the passed ClientSession,
1079
// adds the sessionQueue to the activeSessions set, and starts the sessionQueue
1080
// so that it can deliver any committed updates or begin accepting newly
1081
// assigned tasks.
1082
func (c *client) initActiveQueue(s *ClientSession,
1083
        updates []wtdb.CommittedUpdate) *sessionQueue {
85✔
1084

85✔
1085
        // Initialize the session queue, providing it with all the resources it
85✔
1086
        // requires from the client instance.
85✔
1087
        sq := c.newSessionQueue(s, updates)
85✔
1088

85✔
1089
        // Add the session queue as an active session so that we remember to
85✔
1090
        // stop it on shutdown. This method will also start the queue so that it
85✔
1091
        // can be active in processing newly assigned tasks or to upload
85✔
1092
        // previously committed updates.
85✔
1093
        c.activeSessions.AddAndStart(sq)
85✔
1094

85✔
1095
        return sq
85✔
1096
}
85✔
1097

1098
// terminateSession sets the given session's status to CSessionTerminal meaning
1099
// that it will not be used again.
1100
func (c *client) terminateSession(id wtdb.SessionID) error {
4✔
1101
        errChan := make(chan error, 1)
4✔
1102

4✔
1103
        select {
4✔
1104
        case c.terminateSessions <- &terminateSessMsg{
1105
                id:      id,
1106
                errChan: errChan,
1107
        }:
4✔
1108
        case <-c.pipeline.quit:
×
1109
                return ErrClientExiting
×
1110
        }
1111

1112
        select {
4✔
1113
        case err := <-errChan:
4✔
1114
                return err
4✔
1115
        case <-c.pipeline.quit:
×
1116
                return ErrClientExiting
×
1117
        }
1118
}
1119

1120
// handleTerminateSession handles a request to terminate a session. It will
1121
// first shut down the session if it is part of the active session set, then
1122
// it will ensure that the active session queue is set reset if it is using the
1123
// session in question. Finally, the session's status in the DB will be updated.
1124
func (c *client) handleTerminateSession(msg *terminateSessMsg) error {
4✔
1125
        id := msg.id
4✔
1126

4✔
1127
        delete(c.candidateSessions, id)
4✔
1128

4✔
1129
        err := c.activeSessions.StopAndRemove(id, true)
4✔
1130
        if err != nil {
4✔
1131
                return fmt.Errorf("could not stop session %s: %w", id, err)
×
1132
        }
×
1133

1134
        // If our active session queue corresponds to the session being
1135
        // terminated, then we'll proceed to negotiate a new one.
1136
        if c.sessionQueue != nil {
8✔
1137
                if bytes.Equal(c.sessionQueue.ID()[:], id[:]) {
8✔
1138
                        c.sessionQueue = nil
4✔
1139
                }
4✔
1140
        }
1141

1142
        return nil
4✔
1143
}
1144

1145
// deactivateTower sends a tower deactivation request to the backupDispatcher
1146
// where it will be handled synchronously. The request should result in all the
1147
// sessions that we have with the given tower being shutdown and removed from
1148
// our in-memory set of active sessions.
1149
func (c *client) deactivateTower(id wtdb.TowerID,
1150
        pubKey *btcec.PublicKey) error {
5✔
1151

5✔
1152
        errChan := make(chan error, 1)
5✔
1153

5✔
1154
        select {
5✔
1155
        case c.deactivateTowers <- &deactivateTowerMsg{
1156
                id:      id,
1157
                pubKey:  pubKey,
1158
                errChan: errChan,
1159
        }:
5✔
1160
        case <-c.pipeline.quit:
×
1161
                return ErrClientExiting
×
1162
        }
1163

1164
        select {
5✔
1165
        case err := <-errChan:
5✔
1166
                return err
5✔
1167
        case <-c.pipeline.quit:
×
1168
                return ErrClientExiting
×
1169
        }
1170
}
1171

1172
// handleDeactivateTower handles a request to deactivate a tower. We will remove
1173
// it from the in-memory candidate set, and we will also stop any active
1174
// sessions we have with this tower.
1175
func (c *client) handleDeactivateTower(msg *deactivateTowerMsg) error {
5✔
1176
        // Remove the tower from our in-memory candidate set so that it is not
5✔
1177
        // used for any new session negotiations.
5✔
1178
        err := c.candidateTowers.RemoveCandidate(msg.id, nil)
5✔
1179
        if err != nil {
5✔
1180
                return err
×
1181
        }
×
1182

1183
        pubKey := msg.pubKey.SerializeCompressed()
5✔
1184
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
5✔
1185
        if err != nil {
5✔
1186
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1187
                        "%v", pubKey, err)
×
1188
        }
×
1189

1190
        // Iterate over all the sessions we have for this tower and remove them
1191
        // from our candidate set and also from our set of started, active
1192
        // sessions.
1193
        for sessionID := range sessions {
11✔
1194
                delete(c.candidateSessions, sessionID)
6✔
1195

6✔
1196
                err = c.activeSessions.StopAndRemove(sessionID, false)
6✔
1197
                if err != nil {
6✔
1198
                        return fmt.Errorf("could not stop session %s: %w",
×
1199
                                sessionID, err)
×
1200
                }
×
1201
        }
1202

1203
        // If our active session queue corresponds to the stale tower, we'll
1204
        // proceed to negotiate a new one.
1205
        if c.sessionQueue != nil {
10✔
1206
                towerKey := c.sessionQueue.tower.IdentityKey
5✔
1207

5✔
1208
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
10✔
1209
                        c.sessionQueue = nil
5✔
1210
                }
5✔
1211
        }
1212

1213
        return nil
5✔
1214
}
1215

1216
// addTower adds a new watchtower reachable at the given address and considers
1217
// it for new sessions. If the watchtower already exists, then any new addresses
1218
// included will be considered when dialing it for session negotiations and
1219
// backups.
1220
func (c *client) addTower(tower *Tower) error {
50✔
1221
        errChan := make(chan error, 1)
50✔
1222

50✔
1223
        select {
50✔
1224
        case c.newTowers <- &newTowerMsg{
1225
                tower:   tower,
1226
                errChan: errChan,
1227
        }:
50✔
1228
        case <-c.pipeline.quit:
×
1229
                return ErrClientExiting
×
1230
        }
1231

1232
        select {
50✔
1233
        case err := <-errChan:
50✔
1234
                return err
50✔
1235
        case <-c.pipeline.quit:
×
1236
                return ErrClientExiting
×
1237
        }
1238
}
1239

1240
// handleNewTower handles a request for a new tower to be added. If the tower
1241
// already exists, then its corresponding sessions, if any, will be set
1242
// considered as candidates.
1243
func (c *client) handleNewTower(tower *Tower) error {
50✔
1244
        c.candidateTowers.AddCandidate(tower)
50✔
1245

50✔
1246
        // Include all of its corresponding sessions to our set of candidates.
50✔
1247
        sessions, err := getClientSessions(
50✔
1248
                c.cfg.DB, c.cfg.SecretKeyRing, &tower.ID,
50✔
1249
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
50✔
1250
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
50✔
1251
        )
50✔
1252
        if err != nil {
50✔
1253
                return fmt.Errorf("unable to determine sessions for tower %x: "+
×
1254
                        "%v", tower.IdentityKey.SerializeCompressed(), err)
×
1255
        }
×
1256
        maps.Copy(c.candidateSessions, sessions)
50✔
1257

50✔
1258
        return nil
50✔
1259
}
1260

1261
// removeTower removes a watchtower from being considered for future session
1262
// negotiations and from being used for any subsequent backups until it's added
1263
// again. If an address is provided, then this call only serves as a way of
1264
// removing the address from the watchtower instead.
1265
func (c *client) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey,
1266
        addr net.Addr) error {
14✔
1267

14✔
1268
        errChan := make(chan error, 1)
14✔
1269

14✔
1270
        select {
14✔
1271
        case c.staleTowers <- &staleTowerMsg{
1272
                id:      id,
1273
                pubKey:  pubKey,
1274
                addr:    addr,
1275
                errChan: errChan,
1276
        }:
14✔
1277
        case <-c.pipeline.quit:
×
1278
                return ErrClientExiting
×
1279
        }
1280

1281
        select {
14✔
1282
        case err := <-errChan:
14✔
1283
                return err
14✔
1284
        case <-c.pipeline.quit:
×
1285
                return ErrClientExiting
×
1286
        }
1287
}
1288

1289
// handleStaleTower handles a request for an existing tower to be removed. If
1290
// none of the tower's sessions have pending updates, then they will become
1291
// inactive and removed as candidates. If the active session queue corresponds
1292
// to any of these sessions, a new one will be negotiated.
1293
func (c *client) handleStaleTower(msg *staleTowerMsg) error {
14✔
1294
        // We'll first update our in-memory state.
14✔
1295
        err := c.candidateTowers.RemoveCandidate(msg.id, msg.addr)
14✔
1296
        if err != nil {
15✔
1297
                return err
1✔
1298
        }
1✔
1299

1300
        // If an address was provided, then we're only meant to remove the
1301
        // address from the tower.
1302
        if msg.addr != nil {
15✔
1303
                return nil
2✔
1304
        }
2✔
1305

1306
        // Otherwise, the tower should no longer be used for future session
1307
        // negotiations and backups.
1308

1309
        pubKey := msg.pubKey.SerializeCompressed()
11✔
1310
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
11✔
1311
        if err != nil {
11✔
1312
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1313
                        "%v", pubKey, err)
×
1314
        }
×
1315
        for sessionID := range sessions {
23✔
1316
                delete(c.candidateSessions, sessionID)
12✔
1317

12✔
1318
                // Shutdown the session so that any pending updates are
12✔
1319
                // replayed back onto the main task pipeline.
12✔
1320
                err = c.activeSessions.StopAndRemove(sessionID, true)
12✔
1321
                if err != nil {
12✔
1322
                        c.log.Errorf("could not stop session %s: %w", sessionID,
×
1323
                                err)
×
1324
                }
×
1325
        }
1326

1327
        // If our active session queue corresponds to the stale tower, we'll
1328
        // proceed to negotiate a new one.
1329
        if c.sessionQueue != nil {
20✔
1330
                towerKey := c.sessionQueue.tower.IdentityKey
9✔
1331

9✔
1332
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
18✔
1333
                        c.sessionQueue = nil
9✔
1334
                }
9✔
1335
        }
1336

1337
        return nil
11✔
1338
}
1339

1340
// registeredTowers retrieves the list of watchtowers registered with the
1341
// client.
1342
func (c *client) registeredTowers(towers []*wtdb.Tower,
1343
        opts ...wtdb.ClientSessionListOption) ([]*RegisteredTower, error) {
×
1344

×
1345
        // Generate a filter that will fetch all the client's sessions
×
1346
        // regardless of if they are active or not.
×
1347
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
×
1348

×
1349
        clientSessions, err := c.cfg.DB.ListClientSessions(nil, opts...)
×
1350
        if err != nil {
×
1351
                return nil, err
×
1352
        }
×
1353

1354
        // Construct a lookup map that coalesces all the sessions for a specific
1355
        // watchtower.
1356
        towerSessions := make(
×
1357
                map[wtdb.TowerID]map[wtdb.SessionID]*wtdb.ClientSession,
×
1358
        )
×
1359
        for id, s := range clientSessions {
×
1360
                sessions, ok := towerSessions[s.TowerID]
×
1361
                if !ok {
×
1362
                        sessions = make(map[wtdb.SessionID]*wtdb.ClientSession)
×
1363
                        towerSessions[s.TowerID] = sessions
×
1364
                }
×
1365
                sessions[id] = s
×
1366
        }
1367

1368
        registeredTowers := make([]*RegisteredTower, 0, len(towerSessions))
×
1369
        for _, tower := range towers {
×
1370
                isActive := c.candidateTowers.IsActive(tower.ID)
×
1371
                registeredTowers = append(registeredTowers, &RegisteredTower{
×
1372
                        Tower:                  tower,
×
1373
                        Sessions:               towerSessions[tower.ID],
×
1374
                        ActiveSessionCandidate: isActive,
×
1375
                })
×
1376
        }
×
1377

1378
        return registeredTowers, nil
×
1379
}
1380

1381
// lookupTower retrieves the info of sessions held with the given tower handled
1382
// by this client.
1383
func (c *client) lookupTower(tower *wtdb.Tower,
1384
        opts ...wtdb.ClientSessionListOption) (*RegisteredTower, error) {
5✔
1385

5✔
1386
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
5✔
1387

5✔
1388
        towerSessions, err := c.cfg.DB.ListClientSessions(&tower.ID, opts...)
5✔
1389
        if err != nil {
5✔
1390
                return nil, err
×
1391
        }
×
1392

1393
        return &RegisteredTower{
5✔
1394
                Tower:                  tower,
5✔
1395
                Sessions:               towerSessions,
5✔
1396
                ActiveSessionCandidate: c.candidateTowers.IsActive(tower.ID),
5✔
1397
        }, nil
5✔
1398
}
1399

1400
// getStats returns the in-memory statistics of the client since startup.
1401
func (c *client) getStats() ClientStats {
3✔
1402
        return c.stats.getStatsCopy()
3✔
1403
}
3✔
1404

1405
// policy returns the active client policy configuration.
1406
func (c *client) policy() wtpolicy.Policy {
9✔
1407
        return c.cfg.Policy
9✔
1408
}
9✔
1409

1410
// logMessage writes information about a message received from a remote peer,
1411
// using directional prepositions to signal whether the message was sent or
1412
// received.
1413
func (c *client) logMessage(
1414
        peer wtserver.Peer, msg wtwire.Message, read bool) {
2,220✔
1415

2,220✔
1416
        var action = "Received"
2,220✔
1417
        var preposition = "from"
2,220✔
1418
        if !read {
3,457✔
1419
                action = "Sending"
1,237✔
1420
                preposition = "to"
1,237✔
1421
        }
1,237✔
1422

1423
        summary := wtwire.MessageSummary(msg)
2,220✔
1424
        if len(summary) > 0 {
3,553✔
1425
                summary = "(" + summary + ")"
1,333✔
1426
        }
1,333✔
1427

1428
        c.log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
2,220✔
1429
                preposition, peer.RemotePub().SerializeCompressed(),
2,220✔
1430
                peer.RemoteAddr())
2,220✔
1431
}
1432

1433
// newRandomDelay returns a uniformly random value in the half-open range
// [0, max), drawn from the crypto/rand entropy source. An error is returned
// only if reading from that source fails.
//
// A max of zero yields a delay of zero: the range [0, 0) is empty, and
// rand.Int panics when handed a non-positive bound.
func newRandomDelay(max uint32) (uint32, error) {
	if max == 0 {
		return 0, nil
	}

	var maxDelay big.Int
	maxDelay.SetUint64(uint64(max))

	randDelay, err := rand.Int(rand.Reader, &maxDelay)
	if err != nil {
		return 0, err
	}

	return uint32(randDelay.Uint64()), nil
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc