• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14358372723

09 Apr 2025 01:26PM UTC coverage: 56.696% (-12.3%) from 69.037%
14358372723

Pull #9696

github

web-flow
Merge e2837e400 into 867d27d68
Pull Request #9696: Add `development_guidelines.md` for both human and machine

107055 of 188823 relevant lines covered (56.7%)

22721.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.93
/watchtower/wtclient/client.go
1
package wtclient
2

3
import (
4
        "bytes"
5
        "crypto/rand"
6
        "errors"
7
        "fmt"
8
        "maps"
9
        "math/big"
10
        "net"
11
        "sync"
12
        "time"
13

14
        "github.com/btcsuite/btcd/btcec/v2"
15
        "github.com/btcsuite/btclog/v2"
16
        "github.com/lightningnetwork/lnd/channeldb"
17
        "github.com/lightningnetwork/lnd/keychain"
18
        "github.com/lightningnetwork/lnd/lnwallet"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
21
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
22
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
23
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
24
)
25

26
const (
	// DefaultReadTimeout is the default amount of time a blocking read is
	// allowed to wait before we give up on it.
	DefaultReadTimeout = 15 * time.Second

	// DefaultWriteTimeout is the default amount of time a blocking write
	// is allowed to wait before we give up on it.
	DefaultWriteTimeout = 15 * time.Second

	// DefaultStatInterval is the default period between log lines that
	// report metrics about the client's operation.
	DefaultStatInterval = time.Minute

	// DefaultSessionCloseRange is the size, in blocks, of the range from
	// which a random delay is drawn before closing a session once its
	// last channel has been closed.
	DefaultSessionCloseRange = 288

	// DefaultMaxTasksInMemQueue is the maximum number of tasks that the
	// in-memory queue will hold.
	DefaultMaxTasksInMemQueue = 2000
)
48

49
// genSessionFilter constructs a filter that can be used to select sessions only
50
// if they match the policy of the client (namely anchor vs legacy). If
51
// activeOnly is set, then only active sessions will be returned.
52
func (c *client) genSessionFilter(
53
        activeOnly bool) wtdb.ClientSessionFilterFn {
85✔
54

85✔
55
        return func(session *wtdb.ClientSession) bool {
113✔
56
                if c.cfg.Policy.TxPolicy != session.Policy.TxPolicy {
28✔
57
                        return false
×
58
                }
×
59

60
                if !activeOnly {
32✔
61
                        return true
4✔
62
                }
4✔
63

64
                return session.Status == wtdb.CSessionActive
24✔
65
        }
66
}
67

68
// ExhaustedSessionFilter constructs a wtdb.ClientSessionFilterFn filter
69
// function that will filter out any sessions that have been exhausted. A
70
// session is considered exhausted only if it has no un-acked updates and the
71
// sequence number of the session is equal to the max updates of the session
72
// policy.
73
func ExhaustedSessionFilter() wtdb.ClientSessWithNumCommittedUpdatesFilterFn {
83✔
74
        return func(session *wtdb.ClientSession, numUnAcked uint16) bool {
107✔
75
                return session.SeqNum < session.Policy.MaxUpdates ||
24✔
76
                        numUnAcked > 0
24✔
77
        }
24✔
78
}
79

80
// RegisteredTower encompasses information about a registered watchtower with
81
// the client.
82
type RegisteredTower struct {
83
        *wtdb.Tower
84

85
        // Sessions is the set of sessions corresponding to the watchtower.
86
        Sessions map[wtdb.SessionID]*wtdb.ClientSession
87

88
        // ActiveSessionCandidate determines whether the watchtower is currently
89
        // being considered for new sessions.
90
        ActiveSessionCandidate bool
91
}
92

93
// BreachRetributionBuilder is a function that can be used to construct a
94
// BreachRetribution from a channel ID and a commitment height.
95
type BreachRetributionBuilder func(id lnwire.ChannelID,
96
        commitHeight uint64) (*lnwallet.BreachRetribution,
97
        channeldb.ChannelType, error)
98

99
// newTowerMsg is an internal message we'll use within the client to signal
100
// that a new tower can be considered.
101
type newTowerMsg struct {
102
        // tower holds the info about the new Tower or new tower address
103
        // required to connect to it.
104
        tower *Tower
105

106
        // errChan is the channel through which we'll send a response back to
107
        // the caller when handling their request.
108
        //
109
        // NOTE: This channel must be buffered.
110
        errChan chan error
111
}
112

113
// staleTowerMsg is an internal message we'll use within the client to
114
// signal that a tower should no longer be considered.
115
type staleTowerMsg struct {
116
        // id is the unique database identifier for the tower.
117
        id wtdb.TowerID
118

119
        // pubKey is the identifying public key of the watchtower.
120
        pubKey *btcec.PublicKey
121

122
        // addr is an optional field that when set signals that the address
123
        // should be removed from the watchtower's set of addresses, indicating
124
        // that it is stale. If it's not set, then the watchtower should be
125
        // no longer be considered for new sessions.
126
        addr net.Addr
127

128
        // errChan is the channel through which we'll send a response back to
129
        // the caller when handling their request.
130
        //
131
        // NOTE: This channel must be buffered.
132
        errChan chan error
133
}
134

135
// deactivateTowerMsg is an internal message we'll use within the TowerClient
136
// to signal that a tower should be marked as inactive.
137
type deactivateTowerMsg struct {
138
        // id is the unique database identifier for the tower.
139
        id wtdb.TowerID
140

141
        // pubKey is the identifying public key of the watchtower.
142
        pubKey *btcec.PublicKey
143

144
        // errChan is the channel through which we'll send a response back to
145
        // the caller when handling their request.
146
        //
147
        // NOTE: This channel must be buffered.
148
        errChan chan error
149
}
150

151
// terminateSessMsg is an internal message we'll use within the TowerClient to
152
// signal that a session should be terminated.
153
type terminateSessMsg struct {
154
        // id is the session identifier.
155
        id wtdb.SessionID
156

157
        // errChan is the channel through which we'll send a response back to
158
        // the caller when handling their request.
159
        //
160
        // NOTE: This channel must be buffered.
161
        errChan chan error
162
}
163

164
// clientCfg holds the configuration values required by a client.
165
type clientCfg struct {
166
        *Config
167

168
        // Policy is the session policy the client will propose when creating
169
        // new sessions with the tower. If the policy differs from any active
170
        // sessions recorded in the database, those sessions will be ignored and
171
        // new sessions will be requested immediately.
172
        Policy wtpolicy.Policy
173

174
        getSweepScript func(lnwire.ChannelID) ([]byte, bool)
175
}
176

177
// client manages backing up revoked states for all states that fall under a
178
// specific policy type.
179
type client struct {
180
        cfg *clientCfg
181

182
        log btclog.Logger
183

184
        pipeline *DiskOverflowQueue[*wtdb.BackupID]
185

186
        negotiator        SessionNegotiator
187
        candidateTowers   TowerCandidateIterator
188
        candidateSessions map[wtdb.SessionID]*ClientSession
189
        activeSessions    *sessionQueueSet
190

191
        sessionQueue *sessionQueue
192
        prevTask     *wtdb.BackupID
193

194
        statTicker *time.Ticker
195
        stats      *clientStats
196

197
        newTowers         chan *newTowerMsg
198
        staleTowers       chan *staleTowerMsg
199
        deactivateTowers  chan *deactivateTowerMsg
200
        terminateSessions chan *terminateSessMsg
201

202
        wg   sync.WaitGroup
203
        quit chan struct{}
204
}
205

206
// newClient initializes a new client from the provided clientCfg. An error is
207
// returned if the client could not be initialized.
208
func newClient(cfg *clientCfg) (*client, error) {
36✔
209
        identifier, err := cfg.Policy.BlobType.Identifier()
36✔
210
        if err != nil {
36✔
211
                return nil, err
×
212
        }
×
213
        plog := log.WithPrefix(fmt.Sprintf("(%s)", identifier))
36✔
214

36✔
215
        queueDB := cfg.DB.GetDBQueue([]byte(identifier))
36✔
216
        queue, err := NewDiskOverflowQueue[*wtdb.BackupID](
36✔
217
                queueDB, cfg.MaxTasksInMemQueue, plog,
36✔
218
        )
36✔
219
        if err != nil {
36✔
220
                return nil, err
×
221
        }
×
222

223
        c := &client{
36✔
224
                cfg:               cfg,
36✔
225
                log:               plog,
36✔
226
                pipeline:          queue,
36✔
227
                activeSessions:    newSessionQueueSet(),
36✔
228
                statTicker:        time.NewTicker(DefaultStatInterval),
36✔
229
                stats:             new(clientStats),
36✔
230
                newTowers:         make(chan *newTowerMsg),
36✔
231
                staleTowers:       make(chan *staleTowerMsg),
36✔
232
                deactivateTowers:  make(chan *deactivateTowerMsg),
36✔
233
                terminateSessions: make(chan *terminateSessMsg),
36✔
234
                quit:              make(chan struct{}),
36✔
235
        }
36✔
236

36✔
237
        candidateTowers := newTowerListIterator()
36✔
238
        perActiveTower := func(tower *Tower) {
45✔
239
                // If the tower has already been marked as active, then there is
9✔
240
                // no need to add it to the iterator again.
9✔
241
                if candidateTowers.IsActive(tower.ID) {
9✔
242
                        return
×
243
                }
×
244

245
                c.log.Infof("Using private watchtower %x, offering policy %s",
9✔
246
                        tower.IdentityKey.SerializeCompressed(), cfg.Policy)
9✔
247

9✔
248
                // Add the tower to the set of candidate towers.
9✔
249
                candidateTowers.AddCandidate(tower)
9✔
250
        }
251

252
        // Load all candidate sessions and towers from the database into the
253
        // client. We will use any of these sessions if their policies match the
254
        // current policy of the client, otherwise they will be ignored and new
255
        // sessions will be requested.
256
        candidateSessions, err := getTowerAndSessionCandidates(
36✔
257
                cfg.DB, cfg.SecretKeyRing, perActiveTower,
36✔
258
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
36✔
259
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
36✔
260
        )
36✔
261
        if err != nil {
36✔
262
                return nil, err
×
263
        }
×
264

265
        c.candidateTowers = candidateTowers
36✔
266
        c.candidateSessions = candidateSessions
36✔
267

36✔
268
        c.negotiator = newSessionNegotiator(&NegotiatorConfig{
36✔
269
                DB:            cfg.DB,
36✔
270
                SecretKeyRing: cfg.SecretKeyRing,
36✔
271
                Policy:        cfg.Policy,
36✔
272
                ChainHash:     cfg.ChainHash,
36✔
273
                SendMessage:   c.sendMessage,
36✔
274
                ReadMessage:   c.readMessage,
36✔
275
                Dial:          c.dial,
36✔
276
                Candidates:    c.candidateTowers,
36✔
277
                MinBackoff:    cfg.MinBackoff,
36✔
278
                MaxBackoff:    cfg.MaxBackoff,
36✔
279
                Log:           plog,
36✔
280
        })
36✔
281

36✔
282
        return c, nil
36✔
283
}
284

285
// getTowerAndSessionCandidates loads all the towers from the DB and then
286
// fetches the sessions for each of tower. Sessions are only collected if they
287
// pass the sessionFilter check. If a tower has a session that does pass the
288
// sessionFilter check then the perActiveTower call-back will be called on that
289
// tower.
290
func getTowerAndSessionCandidates(db DB, keyRing ECDHKeyRing,
291
        perActiveTower func(tower *Tower),
292
        opts ...wtdb.ClientSessionListOption) (
293
        map[wtdb.SessionID]*ClientSession, error) {
36✔
294

36✔
295
        // Fetch all active towers from the DB.
36✔
296
        towers, err := db.ListTowers(func(tower *wtdb.Tower) bool {
45✔
297
                return tower.Status == wtdb.TowerStatusActive
9✔
298
        })
9✔
299
        if err != nil {
36✔
300
                return nil, err
×
301
        }
×
302

303
        candidateSessions := make(map[wtdb.SessionID]*ClientSession)
36✔
304
        for _, dbTower := range towers {
45✔
305
                tower, err := NewTowerFromDBTower(dbTower)
9✔
306
                if err != nil {
9✔
307
                        return nil, err
×
308
                }
×
309

310
                sessions, err := db.ListClientSessions(&tower.ID, opts...)
9✔
311
                if err != nil {
9✔
312
                        return nil, err
×
313
                }
×
314

315
                for _, s := range sessions {
17✔
316
                        cs, err := NewClientSessionFromDBSession(
8✔
317
                                s, tower, keyRing,
8✔
318
                        )
8✔
319
                        if err != nil {
8✔
320
                                return nil, err
×
321
                        }
×
322

323
                        // Add the session to the set of candidate sessions.
324
                        candidateSessions[s.ID] = cs
8✔
325
                }
326

327
                perActiveTower(tower)
9✔
328
        }
329

330
        return candidateSessions, nil
36✔
331
}
332

333
// getClientSessions retrieves the client sessions for a particular tower if
334
// specified, otherwise all client sessions for all towers are retrieved. An
335
// optional filter can be provided to filter out any undesired client sessions.
336
//
337
// NOTE: This method should only be used when deserialization of a
338
// ClientSession's SessionPrivKey field is desired, otherwise, the existing
339
// ListClientSessions method should be used.
340
func getClientSessions(db DB, keyRing ECDHKeyRing, forTower *wtdb.TowerID,
341
        opts ...wtdb.ClientSessionListOption) (
342
        map[wtdb.SessionID]*ClientSession, error) {
47✔
343

47✔
344
        dbSessions, err := db.ListClientSessions(forTower, opts...)
47✔
345
        if err != nil {
47✔
346
                return nil, err
×
347
        }
×
348

349
        // Reload the tower from disk using the tower ID contained in each
350
        // candidate session. We will also rederive any session keys needed to
351
        // be able to communicate with the towers and authenticate session
352
        // requests. This prevents us from having to store the private keys on
353
        // disk.
354
        sessions := make(map[wtdb.SessionID]*ClientSession)
47✔
355
        for _, s := range dbSessions {
58✔
356
                dbTower, err := db.LoadTowerByID(s.TowerID)
11✔
357
                if err != nil {
11✔
358
                        return nil, err
×
359
                }
×
360

361
                towerKeyDesc, err := keyRing.DeriveKey(keychain.KeyLocator{
11✔
362
                        Family: keychain.KeyFamilyTowerSession,
11✔
363
                        Index:  s.KeyIndex,
11✔
364
                })
11✔
365
                if err != nil {
11✔
366
                        return nil, err
×
367
                }
×
368

369
                sessionKeyECDH := keychain.NewPubKeyECDH(towerKeyDesc, keyRing)
11✔
370

11✔
371
                tower, err := NewTowerFromDBTower(dbTower)
11✔
372
                if err != nil {
11✔
373
                        return nil, err
×
374
                }
×
375

376
                sessions[s.ID] = &ClientSession{
11✔
377
                        ID:                s.ID,
11✔
378
                        ClientSessionBody: s.ClientSessionBody,
11✔
379
                        Tower:             tower,
11✔
380
                        SessionKeyECDH:    sessionKeyECDH,
11✔
381
                }
11✔
382
        }
383

384
        return sessions, nil
47✔
385
}
386

387
// start initializes the watchtower client by loading or negotiating an active
388
// session and then begins processing backup tasks from the request pipeline.
389
func (c *client) start() error {
36✔
390
        c.log.Infof("Watchtower client starting")
36✔
391

36✔
392
        // First, restart a session queue for any sessions that have
36✔
393
        // committed but unacked state updates. This ensures that these
36✔
394
        // sessions will be able to flush the committed updates after a
36✔
395
        // restart.
36✔
396
        fetchCommittedUpdates := c.cfg.DB.FetchSessionCommittedUpdates
36✔
397
        for _, session := range c.candidateSessions {
44✔
398
                committedUpdates, err := fetchCommittedUpdates(
8✔
399
                        &session.ID,
8✔
400
                )
8✔
401
                if err != nil {
8✔
402
                        return err
×
403
                }
×
404

405
                if len(committedUpdates) > 0 {
10✔
406
                        c.log.Infof("Starting session=%s to process "+
2✔
407
                                "%d committed backups", session.ID,
2✔
408
                                len(committedUpdates))
2✔
409

2✔
410
                        c.initActiveQueue(session, committedUpdates)
2✔
411
                }
2✔
412
        }
413

414
        // Now start the session negotiator, which will allow us to request new
415
        // session as soon as the backupDispatcher starts up.
416
        err := c.negotiator.Start()
36✔
417
        if err != nil {
36✔
418
                return err
×
419
        }
×
420

421
        // Start the task pipeline to which new backup tasks will be
422
        // submitted from active links.
423
        err = c.pipeline.Start()
36✔
424
        if err != nil {
36✔
425
                return err
×
426
        }
×
427

428
        c.wg.Add(1)
36✔
429
        go c.backupDispatcher()
36✔
430

36✔
431
        c.log.Infof("Watchtower client started successfully")
36✔
432

36✔
433
        return nil
36✔
434
}
435

436
// stop idempotently initiates a graceful shutdown of the watchtower client.
437
func (c *client) stop() error {
36✔
438
        var returnErr error
36✔
439
        c.log.Debugf("Stopping watchtower client")
36✔
440

36✔
441
        // 1. Stop the session negotiator.
36✔
442
        err := c.negotiator.Stop()
36✔
443
        if err != nil {
36✔
444
                returnErr = err
×
445
        }
×
446

447
        // 2. Stop the backup dispatcher and any other goroutines.
448
        close(c.quit)
36✔
449
        c.wg.Wait()
36✔
450

36✔
451
        // 3. If there was a left over 'prevTask' from the backup
36✔
452
        // dispatcher, replay that onto the pipeline.
36✔
453
        if c.prevTask != nil {
36✔
454
                err = c.pipeline.QueueBackupID(c.prevTask)
×
455
                if err != nil {
×
456
                        returnErr = err
×
457
                }
×
458
        }
459

460
        // 4. Shutdown all active session queues in parallel. These will
461
        // exit once all unhandled updates have been replayed to the
462
        // task pipeline.
463
        c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() {
102✔
464
                return func() {
132✔
465
                        err := s.Stop(false)
66✔
466
                        if err != nil {
66✔
467
                                c.log.Errorf("could not stop session "+
×
468
                                        "queue: %s: %v", s.ID(), err)
×
469

×
470
                                returnErr = err
×
471
                        }
×
472
                }
473
        })
474

475
        // 5. Shutdown the backup queue, which will prevent any further
476
        // updates from being accepted.
477
        if err = c.pipeline.Stop(); err != nil {
36✔
478
                returnErr = err
×
479
        }
×
480

481
        c.log.Debugf("Client successfully stopped, stats: %s", c.stats)
36✔
482

36✔
483
        return returnErr
36✔
484
}
485

486
// backupState initiates a request to back up a particular revoked state. If the
487
// method returns nil, the backup is guaranteed to be successful unless the:
488
//   - justice transaction would create dust outputs when trying to abide by the
489
//     negotiated policy, or
490
//   - breached outputs contain too little value to sweep at the target sweep
491
//     fee rate.
492
func (c *client) backupState(chanID *lnwire.ChannelID,
493
        stateNum uint64) error {
469✔
494

469✔
495
        id := &wtdb.BackupID{
469✔
496
                ChanID:       *chanID,
469✔
497
                CommitHeight: stateNum,
469✔
498
        }
469✔
499

469✔
500
        return c.pipeline.QueueBackupID(id)
469✔
501
}
469✔
502

503
// nextSessionQueue attempts to fetch an active session from our set of
504
// candidate sessions. Candidate sessions with a differing policy from the
505
// active client's advertised policy will be ignored, but may be resumed if the
506
// client is restarted with a matching policy. If no candidates were found, nil
507
// is returned to signal that we need to request a new policy.
508
func (c *client) nextSessionQueue() (*sessionQueue, error) {
83✔
509
        // Select any candidate session at random, and remove it from the set of
83✔
510
        // candidate sessions.
83✔
511
        var candidateSession *ClientSession
83✔
512
        for id, sessionInfo := range c.candidateSessions {
166✔
513
                delete(c.candidateSessions, id)
83✔
514

83✔
515
                // Skip any sessions with policies that don't match the current
83✔
516
                // TxPolicy, as they would result in different justice
83✔
517
                // transactions from what is requested. These can be used again
83✔
518
                // if the client changes their configuration and restarting.
83✔
519
                if sessionInfo.Policy.TxPolicy != c.cfg.Policy.TxPolicy {
83✔
520
                        continue
×
521
                }
522

523
                candidateSession = sessionInfo
83✔
524
                break
83✔
525
        }
526

527
        // If none of the sessions could be used or none were found, we'll
528
        // return nil to signal that we need another session to be negotiated.
529
        if candidateSession == nil {
83✔
530
                return nil, nil
×
531
        }
×
532

533
        updates, err := c.cfg.DB.FetchSessionCommittedUpdates(
83✔
534
                &candidateSession.ID,
83✔
535
        )
83✔
536
        if err != nil {
83✔
537
                return nil, err
×
538
        }
×
539

540
        // Initialize the session queue and spin it up, so it can begin handling
541
        // updates. If the queue was already made active on startup, this will
542
        // simply return the existing session queue from the set.
543
        return c.getOrInitActiveQueue(candidateSession, updates), nil
83✔
544
}
545

546
// stopAndRemoveSession stops the session with the given ID and removes it from
547
// the in-memory active sessions set.
548
func (c *client) stopAndRemoveSession(id wtdb.SessionID, final bool) error {
4✔
549
        return c.activeSessions.StopAndRemove(id, final)
4✔
550
}
4✔
551

552
// deleteSessionFromTower dials the tower that we created the session with and
553
// attempts to send the tower the DeleteSession message.
554
func (c *client) deleteSessionFromTower(sess *wtdb.ClientSession) error {
4✔
555
        // First, we check if we have already loaded this tower in our
4✔
556
        // candidate towers iterator.
4✔
557
        tower, err := c.candidateTowers.GetTower(sess.TowerID)
4✔
558
        if errors.Is(err, ErrTowerNotInIterator) {
4✔
559
                // If not, then we attempt to load it from the DB.
×
560
                dbTower, err := c.cfg.DB.LoadTowerByID(sess.TowerID)
×
561
                if err != nil {
×
562
                        return err
×
563
                }
×
564

565
                tower, err = NewTowerFromDBTower(dbTower)
×
566
                if err != nil {
×
567
                        return err
×
568
                }
×
569
        } else if err != nil {
4✔
570
                return err
×
571
        }
×
572

573
        session, err := NewClientSessionFromDBSession(
4✔
574
                sess, tower, c.cfg.SecretKeyRing,
4✔
575
        )
4✔
576
        if err != nil {
4✔
577
                return err
×
578
        }
×
579

580
        localInit := wtwire.NewInitMessage(
4✔
581
                lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
4✔
582
                c.cfg.ChainHash,
4✔
583
        )
4✔
584

4✔
585
        var (
4✔
586
                conn wtserver.Peer
4✔
587

4✔
588
                // addrIterator is a copy of the tower's address iterator.
4✔
589
                // We use this copy so that iterating through the addresses does
4✔
590
                // not affect any other threads using this iterator.
4✔
591
                addrIterator = tower.Addresses.Copy()
4✔
592
                towerAddr    = addrIterator.Peek()
4✔
593
        )
4✔
594
        // Attempt to dial the tower with its available addresses.
4✔
595
        for {
8✔
596
                conn, err = c.dial(
4✔
597
                        session.SessionKeyECDH, &lnwire.NetAddress{
4✔
598
                                IdentityKey: tower.IdentityKey,
4✔
599
                                Address:     towerAddr,
4✔
600
                        },
4✔
601
                )
4✔
602
                if err != nil {
4✔
603
                        // If there are more addrs available, immediately try
×
604
                        // those.
×
605
                        nextAddr, iteratorErr := addrIterator.Next()
×
606
                        if iteratorErr == nil {
×
607
                                towerAddr = nextAddr
×
608
                                continue
×
609
                        }
610

611
                        // Otherwise, if we have exhausted the address list,
612
                        // exit.
613
                        addrIterator.Reset()
×
614

×
615
                        return fmt.Errorf("failed to dial tower(%x) at any "+
×
616
                                "available addresses",
×
617
                                tower.IdentityKey.SerializeCompressed())
×
618
                }
619

620
                break
4✔
621
        }
622
        defer conn.Close()
4✔
623

4✔
624
        // Send Init to tower.
4✔
625
        err = c.sendMessage(conn, localInit)
4✔
626
        if err != nil {
4✔
627
                return err
×
628
        }
×
629

630
        // Receive Init from tower.
631
        remoteMsg, err := c.readMessage(conn)
4✔
632
        if err != nil {
4✔
633
                return err
×
634
        }
×
635

636
        remoteInit, ok := remoteMsg.(*wtwire.Init)
4✔
637
        if !ok {
4✔
638
                return fmt.Errorf("watchtower %s responded with %T to Init",
×
639
                        towerAddr, remoteMsg)
×
640
        }
×
641

642
        // Validate Init.
643
        err = localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
4✔
644
        if err != nil {
4✔
645
                return err
×
646
        }
×
647

648
        // Send DeleteSession to tower.
649
        err = c.sendMessage(conn, &wtwire.DeleteSession{})
4✔
650
        if err != nil {
4✔
651
                return err
×
652
        }
×
653

654
        // Receive DeleteSessionReply from tower.
655
        remoteMsg, err = c.readMessage(conn)
4✔
656
        if err != nil {
4✔
657
                return err
×
658
        }
×
659

660
        deleteSessionReply, ok := remoteMsg.(*wtwire.DeleteSessionReply)
4✔
661
        if !ok {
4✔
662
                return fmt.Errorf("watchtower %s responded with %T to "+
×
663
                        "DeleteSession", towerAddr, remoteMsg)
×
664
        }
×
665

666
        switch deleteSessionReply.Code {
4✔
667
        case wtwire.CodeOK, wtwire.DeleteSessionCodeNotFound:
4✔
668
                return nil
4✔
669
        default:
×
670
                return fmt.Errorf("received error code %v in "+
×
671
                        "DeleteSessionReply when attempting to delete "+
×
672
                        "session from tower", deleteSessionReply.Code)
×
673
        }
674
}
675

676
// backupDispatcher processes events coming from the taskPipeline and is
677
// responsible for detecting when the client needs to renegotiate a session to
678
// fulfill continuing demand. The event loop exits if the client is quit.
679
//
680
// NOTE: This method MUST be run as a goroutine.
681
func (c *client) backupDispatcher() {
36✔
682
        defer c.wg.Done()
36✔
683

36✔
684
        c.log.Tracef("Starting backup dispatcher")
36✔
685
        defer c.log.Tracef("Stopping backup dispatcher")
36✔
686

36✔
687
        for {
728✔
688
                switch {
692✔
689

690
                // No active session queue and no additional sessions.
691
                case c.sessionQueue == nil && len(c.candidateSessions) == 0:
79✔
692
                        c.log.Infof("Requesting new session.")
79✔
693

79✔
694
                        // Immediately request a new session.
79✔
695
                        c.negotiator.RequestSession()
79✔
696

79✔
697
                        // Wait until we receive the newly negotiated session.
79✔
698
                        // All backups sent in the meantime are queued in the
79✔
699
                        // revoke queue, as we cannot process them.
79✔
700
                awaitSession:
79✔
701
                        select {
79✔
702
                        case session := <-c.negotiator.NewSessions():
73✔
703
                                c.log.Infof("Acquired new session with id=%s",
73✔
704
                                        session.ID)
73✔
705
                                c.candidateSessions[session.ID] = session
73✔
706
                                c.stats.sessionAcquired()
73✔
707

73✔
708
                                // We'll continue to choose the newly negotiated
73✔
709
                                // session as our active session queue.
73✔
710
                                continue
73✔
711

712
                        case <-c.statTicker.C:
×
713
                                c.log.Infof("Client stats: %s", c.stats)
×
714

715
                        // A new tower has been requested to be added. We'll
716
                        // update our persisted and in-memory state and consider
717
                        // its corresponding sessions, if any, as new
718
                        // candidates.
719
                        case msg := <-c.newTowers:
37✔
720
                                msg.errChan <- c.handleNewTower(msg.tower)
37✔
721

722
                        // A tower has been requested to be removed. We'll
723
                        // only allow removal of it if the address in question
724
                        // is not currently being used for session negotiation.
725
                        case msg := <-c.staleTowers:
4✔
726
                                msg.errChan <- c.handleStaleTower(msg)
4✔
727

728
                        // A tower has been requested to be de-activated. We'll
729
                        // only allow this if the tower is not currently being
730
                        // used for session negotiation.
731
                        case msg := <-c.deactivateTowers:
×
732
                                msg.errChan <- c.handleDeactivateTower(msg)
×
733

734
                        // A request has come through to terminate a session.
735
                        case msg := <-c.terminateSessions:
×
736
                                msg.errChan <- c.handleTerminateSession(msg)
×
737

738
                        case <-c.quit:
6✔
739
                                return
6✔
740
                        }
741

742
                        // Instead of looping, we'll jump back into the select
743
                        // case and await the delivery of the session to prevent
744
                        // us from re-requesting additional sessions.
745
                        goto awaitSession
41✔
746

747
                // No active session queue but have additional sessions.
748
                case c.sessionQueue == nil && len(c.candidateSessions) > 0:
83✔
749
                        // We've exhausted the prior session, we'll pop another
83✔
750
                        // from the remaining sessions and continue processing
83✔
751
                        // backup tasks.
83✔
752
                        var err error
83✔
753
                        c.sessionQueue, err = c.nextSessionQueue()
83✔
754
                        if err != nil {
83✔
755
                                c.log.Errorf("error fetching next session "+
×
756
                                        "queue: %v", err)
×
757
                        }
×
758

759
                        if c.sessionQueue != nil {
166✔
760
                                c.log.Debugf("Loaded next candidate session "+
83✔
761
                                        "queue id=%s", c.sessionQueue.ID())
83✔
762
                        }
83✔
763

764
                // Have active session queue, process backups.
765
                case c.sessionQueue != nil:
530✔
766
                        if c.prevTask != nil {
531✔
767
                                c.processTask(c.prevTask)
1✔
768

1✔
769
                                // Continue to ensure the sessionQueue is
1✔
770
                                // properly initialized before attempting to
1✔
771
                                // process more tasks from the pipeline.
1✔
772
                                continue
1✔
773
                        }
774

775
                        // Normal operation where new tasks are read from the
776
                        // pipeline.
777
                        select {
529✔
778

779
                        // If any sessions are negotiated while we have an
780
                        // active session queue, queue them for future use.
781
                        // This shouldn't happen with the current design, so
782
                        // it doesn't hurt to select here just in case. In the
783
                        // future, we will likely allow more asynchrony so that
784
                        // we can request new sessions before the session is
785
                        // fully empty, which this case would handle.
786
                        case session := <-c.negotiator.NewSessions():
×
787
                                c.log.Warnf("Acquired new session with id=%s "+
×
788
                                        "while processing tasks", session.ID)
×
789
                                c.candidateSessions[session.ID] = session
×
790
                                c.stats.sessionAcquired()
×
791

792
                        case <-c.statTicker.C:
×
793
                                c.log.Infof("Client stats: %s", c.stats)
×
794

795
                        // Process each backup task serially from the queue of
796
                        // revoked states.
797
                        case task, ok := <-c.pipeline.NextBackupID():
479✔
798
                                // All backups in the pipeline have been
479✔
799
                                // processed, it is now safe to exit.
479✔
800
                                if !ok {
479✔
801
                                        return
×
802
                                }
×
803

804
                                c.log.Debugf("Processing %v", task)
479✔
805

479✔
806
                                c.stats.taskReceived()
479✔
807
                                c.processTask(task)
479✔
808

809
                        // A new tower has been requested to be added. We'll
810
                        // update our persisted and in-memory state and consider
811
                        // its corresponding sessions, if any, as new
812
                        // candidates.
813
                        case msg := <-c.newTowers:
10✔
814
                                msg.errChan <- c.handleNewTower(msg.tower)
10✔
815

816
                        // A tower has been removed, so we'll remove certain
817
                        // information that's persisted and also in our
818
                        // in-memory state depending on the request, and set any
819
                        // of its corresponding candidate sessions as inactive.
820
                        case msg := <-c.staleTowers:
7✔
821
                                msg.errChan <- c.handleStaleTower(msg)
7✔
822

823
                        // A tower has been requested to be de-activated.
824
                        case msg := <-c.deactivateTowers:
2✔
825
                                msg.errChan <- c.handleDeactivateTower(msg)
2✔
826

827
                        // A request has come through to terminate a session.
828
                        case msg := <-c.terminateSessions:
1✔
829
                                msg.errChan <- c.handleTerminateSession(msg)
1✔
830

831
                        case <-c.quit:
30✔
832
                                return
30✔
833
                        }
834
                }
835
        }
836
}
837

838
// processTask attempts to schedule the given backupTask on the active
839
// sessionQueue. The task will either be accepted or rejected, after which the
840
// appropriate modifications to the client's state machine will be made. After
841
// every invocation of processTask, the caller should ensure that the
842
// sessionQueue hasn't been exhausted before proceeding to the next task. Tasks
843
// that are rejected because the active sessionQueue is full will be cached as
844
// the prevTask, and should be reprocessed after obtaining a new sessionQueue.
845
func (c *client) processTask(task *wtdb.BackupID) {
480✔
846
        script, ok := c.cfg.getSweepScript(task.ChanID)
480✔
847
        if !ok {
480✔
848
                log.Infof("not processing task for unregistered channel: %s",
×
849
                        task.ChanID)
×
850

×
851
                return
×
852
        }
×
853

854
        backupTask := newBackupTask(*task, script)
480✔
855

480✔
856
        status, accepted := c.sessionQueue.AcceptTask(backupTask)
480✔
857
        if accepted {
954✔
858
                c.taskAccepted(task, status)
474✔
859
        } else {
480✔
860
                c.taskRejected(task, status)
6✔
861
        }
6✔
862
}
863

864
// taskAccepted processes the acceptance of a task by a sessionQueue depending
865
// on the state the sessionQueue is in *after* the task is added. The client's
866
// prevTask is always removed as a result of this call. The client's
867
// sessionQueue will be removed if accepting the task left the sessionQueue in
868
// an exhausted state.
869
func (c *client) taskAccepted(task *wtdb.BackupID,
870
        newStatus sessionQueueStatus) {
474✔
871

474✔
872
        c.log.Infof("Queued %v successfully for session %v", task,
474✔
873
                c.sessionQueue.ID())
474✔
874

474✔
875
        c.stats.taskAccepted()
474✔
876

474✔
877
        // If this task was accepted, we discard anything held in the prevTask.
474✔
878
        // Either it was nil before, or is the task which was just accepted.
474✔
879
        c.prevTask = nil
474✔
880

474✔
881
        switch newStatus {
474✔
882

883
        // The sessionQueue still has capacity after accepting this task.
884
        case sessionQueueAvailable:
431✔
885

886
        // The sessionQueue is full after accepting this task, so we will need
887
        // to request a new one before proceeding.
888
        case sessionQueueExhausted:
43✔
889
                c.stats.sessionExhausted()
43✔
890

43✔
891
                c.log.Debugf("Session %s exhausted", c.sessionQueue.ID())
43✔
892

43✔
893
                // This task left the session exhausted, set it to nil and
43✔
894
                // proceed to the next loop, so we can consume another
43✔
895
                // pre-negotiated session or request another.
43✔
896
                c.sessionQueue = nil
43✔
897
        }
898
}
899

900
// taskRejected process the rejection of a task by a sessionQueue depending on
901
// the state the was in *before* the task was rejected. The client's prevTask
902
// will cache the task if the sessionQueue was exhausted beforehand, and nil
903
// the sessionQueue to find a new session. If the sessionQueue was not
904
// exhausted and not shutting down, the client marks the task as ineligible, as
905
// this implies we couldn't construct a valid justice transaction given the
906
// session's policy.
907
func (c *client) taskRejected(task *wtdb.BackupID,
908
        curStatus sessionQueueStatus) {
6✔
909

6✔
910
        switch curStatus {
6✔
911

912
        // The sessionQueue has available capacity but the task was rejected,
913
        // this indicates that the task was ineligible for backup.
914
        case sessionQueueAvailable:
5✔
915
                c.stats.taskIneligible()
5✔
916

5✔
917
                c.log.Infof("Ignoring ineligible %v", task)
5✔
918

5✔
919
                err := c.cfg.DB.MarkBackupIneligible(
5✔
920
                        task.ChanID, task.CommitHeight,
5✔
921
                )
5✔
922
                if err != nil {
5✔
923
                        c.log.Errorf("Unable to mark %v ineligible: %v",
×
924
                                task, err)
×
925

×
926
                        // It is safe to not handle this error, even if we could
×
927
                        // not persist the result. At worst, this task may be
×
928
                        // reprocessed on a subsequent start up, and will either
×
929
                        // succeed do a change in session parameters or fail in
×
930
                        // the same manner.
×
931
                }
×
932

933
                // If this task was rejected *and* the session had available
934
                // capacity, we discard anything held in the prevTask. Either it
935
                // was nil before, or is the task which was just rejected.
936
                c.prevTask = nil
5✔
937

938
        // The sessionQueue rejected the task because it is full, we will stash
939
        // this task and try to add it to the next available sessionQueue.
940
        case sessionQueueExhausted:
1✔
941
                c.stats.sessionExhausted()
1✔
942

1✔
943
                c.log.Debugf("Session %v exhausted, %v queued for next session",
1✔
944
                        c.sessionQueue.ID(), task)
1✔
945

1✔
946
                // Cache the task that we pulled off, so that we can process it
1✔
947
                // once a new session queue is available.
1✔
948
                c.sessionQueue = nil
1✔
949
                c.prevTask = task
1✔
950

951
        // The sessionQueue rejected the task because it is shutting down. We
952
        // will stash this task and try to add it to the next available
953
        // sessionQueue.
954
        case sessionQueueShuttingDown:
×
955
                c.log.Debugf("Session %v is shutting down, %v queued for "+
×
956
                        "next session", c.sessionQueue.ID(), task)
×
957

×
958
                // Cache the task that we pulled off, so that we can process it
×
959
                // once a new session queue is available.
×
960
                c.sessionQueue = nil
×
961
                c.prevTask = task
×
962
        }
963
}
964

965
// dial connects the peer at addr using privKey as our secret key for the
966
// connection. The connection will use the configured Net's resolver to resolve
967
// the address for either Tor or clear net connections.
968
func (c *client) dial(localKey keychain.SingleKeyECDH,
969
        addr *lnwire.NetAddress) (wtserver.Peer, error) {
452✔
970

452✔
971
        return c.cfg.AuthDial(localKey, addr, c.cfg.Dial)
452✔
972
}
452✔
973

974
// readMessage receives and parses the next message from the given Peer. An
975
// error is returned if a message is not received before the server's read
976
// timeout, the read off the wire failed, or the message could not be
977
// deserialized.
978
func (c *client) readMessage(peer wtserver.Peer) (wtwire.Message, error) {
1,234✔
979
        // Set a read timeout to ensure we drop the connection if nothing is
1,234✔
980
        // received in a timely manner.
1,234✔
981
        err := peer.SetReadDeadline(time.Now().Add(c.cfg.ReadTimeout))
1,234✔
982
        if err != nil {
1,234✔
983
                err = fmt.Errorf("unable to set read deadline: %w", err)
×
984
                c.log.Errorf("Unable to read msg: %v", err)
×
985
                return nil, err
×
986
        }
×
987

988
        // Pull the next message off the wire,
989
        rawMsg, err := peer.ReadNextMessage()
1,234✔
990
        if err != nil {
1,485✔
991
                err = fmt.Errorf("unable to read message: %w", err)
251✔
992
                c.log.Errorf("Unable to read msg: %v", err)
251✔
993
                return nil, err
251✔
994
        }
251✔
995

996
        // Parse the received message according to the watchtower wire
997
        // specification.
998
        msgReader := bytes.NewReader(rawMsg)
983✔
999
        msg, err := wtwire.ReadMessage(msgReader, 0)
983✔
1000
        if err != nil {
983✔
1001
                err = fmt.Errorf("unable to parse message: %w", err)
×
1002
                c.log.Errorf("Unable to read msg: %v", err)
×
1003
                return nil, err
×
1004
        }
×
1005

1006
        c.logMessage(peer, msg, true)
983✔
1007

983✔
1008
        return msg, nil
983✔
1009
}
1010

1011
// sendMessage sends a watchtower wire message to the target peer.
1012
func (c *client) sendMessage(peer wtserver.Peer,
1013
        msg wtwire.Message) error {
1,235✔
1014

1,235✔
1015
        // Encode the next wire message into the buffer.
1,235✔
1016
        // TODO(conner): use buffer pool
1,235✔
1017
        var b bytes.Buffer
1,235✔
1018
        _, err := wtwire.WriteMessage(&b, msg, 0)
1,235✔
1019
        if err != nil {
1,235✔
1020
                err = fmt.Errorf("unable to encode msg: %w", err)
×
1021
                c.log.Errorf("Unable to send msg: %v", err)
×
1022
                return err
×
1023
        }
×
1024

1025
        // Set the write deadline for the connection, ensuring we drop the
1026
        // connection if nothing is sent in a timely manner.
1027
        err = peer.SetWriteDeadline(time.Now().Add(c.cfg.WriteTimeout))
1,235✔
1028
        if err != nil {
1,235✔
1029
                err = fmt.Errorf("unable to set write deadline: %w", err)
×
1030
                c.log.Errorf("Unable to send msg: %v", err)
×
1031
                return err
×
1032
        }
×
1033

1034
        c.logMessage(peer, msg, false)
1,235✔
1035

1,235✔
1036
        // Write out the full message to the remote peer.
1,235✔
1037
        _, err = peer.Write(b.Bytes())
1,235✔
1038
        if err != nil {
1,236✔
1039
                c.log.Errorf("Unable to send msg: %v", err)
1✔
1040
        }
1✔
1041
        return err
1,235✔
1042
}
1043

1044
// newSessionQueue creates a sessionQueue from a ClientSession loaded from the
1045
// database and supplying it with the resources needed by the client.
1046
func (c *client) newSessionQueue(s *ClientSession,
1047
        updates []wtdb.CommittedUpdate) *sessionQueue {
82✔
1048

82✔
1049
        return newSessionQueue(&sessionQueueConfig{
82✔
1050
                ClientSession:          s,
82✔
1051
                ChainHash:              c.cfg.ChainHash,
82✔
1052
                Dial:                   c.dial,
82✔
1053
                ReadMessage:            c.readMessage,
82✔
1054
                SendMessage:            c.sendMessage,
82✔
1055
                Signer:                 c.cfg.Signer,
82✔
1056
                DB:                     c.cfg.DB,
82✔
1057
                MinBackoff:             c.cfg.MinBackoff,
82✔
1058
                MaxBackoff:             c.cfg.MaxBackoff,
82✔
1059
                Log:                    c.log,
82✔
1060
                BuildBreachRetribution: c.cfg.BuildBreachRetribution,
82✔
1061
                TaskPipeline:           c.pipeline,
82✔
1062
        }, updates)
82✔
1063
}
82✔
1064

1065
// getOrInitActiveQueue checks the activeSessions set for a sessionQueue for the
1066
// passed ClientSession. If it exists, the active sessionQueue is returned.
1067
// Otherwise, a new sessionQueue is initialized and added to the set.
1068
func (c *client) getOrInitActiveQueue(s *ClientSession,
1069
        updates []wtdb.CommittedUpdate) *sessionQueue {
83✔
1070

83✔
1071
        if sq, ok := c.activeSessions.Get(s.ID); ok {
86✔
1072
                return sq
3✔
1073
        }
3✔
1074

1075
        return c.initActiveQueue(s, updates)
80✔
1076
}
1077

1078
// initActiveQueue creates a new sessionQueue from the passed ClientSession,
1079
// adds the sessionQueue to the activeSessions set, and starts the sessionQueue
1080
// so that it can deliver any committed updates or begin accepting newly
1081
// assigned tasks.
1082
func (c *client) initActiveQueue(s *ClientSession,
1083
        updates []wtdb.CommittedUpdate) *sessionQueue {
82✔
1084

82✔
1085
        // Initialize the session queue, providing it with all the resources it
82✔
1086
        // requires from the client instance.
82✔
1087
        sq := c.newSessionQueue(s, updates)
82✔
1088

82✔
1089
        // Add the session queue as an active session so that we remember to
82✔
1090
        // stop it on shutdown. This method will also start the queue so that it
82✔
1091
        // can be active in processing newly assigned tasks or to upload
82✔
1092
        // previously committed updates.
82✔
1093
        c.activeSessions.AddAndStart(sq)
82✔
1094

82✔
1095
        return sq
82✔
1096
}
82✔
1097

1098
// terminateSession sets the given session's status to CSessionTerminal meaning
1099
// that it will not be used again.
1100
func (c *client) terminateSession(id wtdb.SessionID) error {
1✔
1101
        errChan := make(chan error, 1)
1✔
1102

1✔
1103
        select {
1✔
1104
        case c.terminateSessions <- &terminateSessMsg{
1105
                id:      id,
1106
                errChan: errChan,
1107
        }:
1✔
1108
        case <-c.pipeline.quit:
×
1109
                return ErrClientExiting
×
1110
        }
1111

1112
        select {
1✔
1113
        case err := <-errChan:
1✔
1114
                return err
1✔
1115
        case <-c.pipeline.quit:
×
1116
                return ErrClientExiting
×
1117
        }
1118
}
1119

1120
// handleTerminateSession handles a request to terminate a session. It will
1121
// first shut down the session if it is part of the active session set, then
1122
// it will ensure that the active session queue is set reset if it is using the
1123
// session in question. Finally, the session's status in the DB will be updated.
1124
func (c *client) handleTerminateSession(msg *terminateSessMsg) error {
1✔
1125
        id := msg.id
1✔
1126

1✔
1127
        delete(c.candidateSessions, id)
1✔
1128

1✔
1129
        err := c.activeSessions.StopAndRemove(id, true)
1✔
1130
        if err != nil {
1✔
1131
                return fmt.Errorf("could not stop session %s: %w", id, err)
×
1132
        }
×
1133

1134
        // If our active session queue corresponds to the session being
1135
        // terminated, then we'll proceed to negotiate a new one.
1136
        if c.sessionQueue != nil {
2✔
1137
                if bytes.Equal(c.sessionQueue.ID()[:], id[:]) {
2✔
1138
                        c.sessionQueue = nil
1✔
1139
                }
1✔
1140
        }
1141

1142
        return nil
1✔
1143
}
1144

1145
// deactivateTower sends a tower deactivation request to the backupDispatcher
1146
// where it will be handled synchronously. The request should result in all the
1147
// sessions that we have with the given tower being shutdown and removed from
1148
// our in-memory set of active sessions.
1149
func (c *client) deactivateTower(id wtdb.TowerID,
1150
        pubKey *btcec.PublicKey) error {
2✔
1151

2✔
1152
        errChan := make(chan error, 1)
2✔
1153

2✔
1154
        select {
2✔
1155
        case c.deactivateTowers <- &deactivateTowerMsg{
1156
                id:      id,
1157
                pubKey:  pubKey,
1158
                errChan: errChan,
1159
        }:
2✔
1160
        case <-c.pipeline.quit:
×
1161
                return ErrClientExiting
×
1162
        }
1163

1164
        select {
2✔
1165
        case err := <-errChan:
2✔
1166
                return err
2✔
1167
        case <-c.pipeline.quit:
×
1168
                return ErrClientExiting
×
1169
        }
1170
}
1171

1172
// handleDeactivateTower handles a request to deactivate a tower. We will remove
1173
// it from the in-memory candidate set, and we will also stop any active
1174
// sessions we have with this tower.
1175
func (c *client) handleDeactivateTower(msg *deactivateTowerMsg) error {
2✔
1176
        // Remove the tower from our in-memory candidate set so that it is not
2✔
1177
        // used for any new session negotiations.
2✔
1178
        err := c.candidateTowers.RemoveCandidate(msg.id, nil)
2✔
1179
        if err != nil {
2✔
1180
                return err
×
1181
        }
×
1182

1183
        pubKey := msg.pubKey.SerializeCompressed()
2✔
1184
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
2✔
1185
        if err != nil {
2✔
1186
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1187
                        "%v", pubKey, err)
×
1188
        }
×
1189

1190
        // Iterate over all the sessions we have for this tower and remove them
1191
        // from our candidate set and also from our set of started, active
1192
        // sessions.
1193
        for sessionID := range sessions {
5✔
1194
                delete(c.candidateSessions, sessionID)
3✔
1195

3✔
1196
                err = c.activeSessions.StopAndRemove(sessionID, false)
3✔
1197
                if err != nil {
3✔
1198
                        return fmt.Errorf("could not stop session %s: %w",
×
1199
                                sessionID, err)
×
1200
                }
×
1201
        }
1202

1203
        // If our active session queue corresponds to the stale tower, we'll
1204
        // proceed to negotiate a new one.
1205
        if c.sessionQueue != nil {
4✔
1206
                towerKey := c.sessionQueue.tower.IdentityKey
2✔
1207

2✔
1208
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
4✔
1209
                        c.sessionQueue = nil
2✔
1210
                }
2✔
1211
        }
1212

1213
        return nil
2✔
1214
}
1215

1216
// addTower adds a new watchtower reachable at the given address and considers
1217
// it for new sessions. If the watchtower already exists, then any new addresses
1218
// included will be considered when dialing it for session negotiations and
1219
// backups.
1220
func (c *client) addTower(tower *Tower) error {
47✔
1221
        errChan := make(chan error, 1)
47✔
1222

47✔
1223
        select {
47✔
1224
        case c.newTowers <- &newTowerMsg{
1225
                tower:   tower,
1226
                errChan: errChan,
1227
        }:
47✔
1228
        case <-c.pipeline.quit:
×
1229
                return ErrClientExiting
×
1230
        }
1231

1232
        select {
47✔
1233
        case err := <-errChan:
47✔
1234
                return err
47✔
1235
        case <-c.pipeline.quit:
×
1236
                return ErrClientExiting
×
1237
        }
1238
}
1239

1240
// handleNewTower handles a request for a new tower to be added. If the tower
1241
// already exists, then its corresponding sessions, if any, will be set
1242
// considered as candidates.
1243
func (c *client) handleNewTower(tower *Tower) error {
47✔
1244
        c.candidateTowers.AddCandidate(tower)
47✔
1245

47✔
1246
        // Include all of its corresponding sessions to our set of candidates.
47✔
1247
        sessions, err := getClientSessions(
47✔
1248
                c.cfg.DB, c.cfg.SecretKeyRing, &tower.ID,
47✔
1249
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
47✔
1250
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
47✔
1251
        )
47✔
1252
        if err != nil {
47✔
1253
                return fmt.Errorf("unable to determine sessions for tower %x: "+
×
1254
                        "%v", tower.IdentityKey.SerializeCompressed(), err)
×
1255
        }
×
1256
        maps.Copy(c.candidateSessions, sessions)
47✔
1257

47✔
1258
        return nil
47✔
1259
}
1260

1261
// removeTower removes a watchtower from being considered for future session
1262
// negotiations and from being used for any subsequent backups until it's added
1263
// again. If an address is provided, then this call only serves as a way of
1264
// removing the address from the watchtower instead.
1265
func (c *client) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey,
1266
        addr net.Addr) error {
11✔
1267

11✔
1268
        errChan := make(chan error, 1)
11✔
1269

11✔
1270
        select {
11✔
1271
        case c.staleTowers <- &staleTowerMsg{
1272
                id:      id,
1273
                pubKey:  pubKey,
1274
                addr:    addr,
1275
                errChan: errChan,
1276
        }:
11✔
1277
        case <-c.pipeline.quit:
×
1278
                return ErrClientExiting
×
1279
        }
1280

1281
        select {
11✔
1282
        case err := <-errChan:
11✔
1283
                return err
11✔
1284
        case <-c.pipeline.quit:
×
1285
                return ErrClientExiting
×
1286
        }
1287
}
1288

1289
// handleStaleTower handles a request for an existing tower to be removed. If
1290
// none of the tower's sessions have pending updates, then they will become
1291
// inactive and removed as candidates. If the active session queue corresponds
1292
// to any of these sessions, a new one will be negotiated.
1293
func (c *client) handleStaleTower(msg *staleTowerMsg) error {
11✔
1294
        // We'll first update our in-memory state.
11✔
1295
        err := c.candidateTowers.RemoveCandidate(msg.id, msg.addr)
11✔
1296
        if err != nil {
12✔
1297
                return err
1✔
1298
        }
1✔
1299

1300
        // If an address was provided, then we're only meant to remove the
1301
        // address from the tower.
1302
        if msg.addr != nil {
12✔
1303
                return nil
2✔
1304
        }
2✔
1305

1306
        // Otherwise, the tower should no longer be used for future session
1307
        // negotiations and backups.
1308

1309
        pubKey := msg.pubKey.SerializeCompressed()
8✔
1310
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
8✔
1311
        if err != nil {
8✔
1312
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1313
                        "%v", pubKey, err)
×
1314
        }
×
1315
        for sessionID := range sessions {
17✔
1316
                delete(c.candidateSessions, sessionID)
9✔
1317

9✔
1318
                // Shutdown the session so that any pending updates are
9✔
1319
                // replayed back onto the main task pipeline.
9✔
1320
                err = c.activeSessions.StopAndRemove(sessionID, true)
9✔
1321
                if err != nil {
9✔
1322
                        c.log.Errorf("could not stop session %s: %w", sessionID,
×
1323
                                err)
×
1324
                }
×
1325
        }
1326

1327
        // If our active session queue corresponds to the stale tower, we'll
1328
        // proceed to negotiate a new one.
1329
        if c.sessionQueue != nil {
14✔
1330
                towerKey := c.sessionQueue.tower.IdentityKey
6✔
1331

6✔
1332
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
12✔
1333
                        c.sessionQueue = nil
6✔
1334
                }
6✔
1335
        }
1336

1337
        return nil
8✔
1338
}
1339

1340
// registeredTowers retrieves the list of watchtowers registered with the
1341
// client.
1342
func (c *client) registeredTowers(towers []*wtdb.Tower,
1343
        opts ...wtdb.ClientSessionListOption) ([]*RegisteredTower, error) {
×
1344

×
1345
        // Generate a filter that will fetch all the client's sessions
×
1346
        // regardless of if they are active or not.
×
1347
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
×
1348

×
1349
        clientSessions, err := c.cfg.DB.ListClientSessions(nil, opts...)
×
1350
        if err != nil {
×
1351
                return nil, err
×
1352
        }
×
1353

1354
        // Construct a lookup map that coalesces all the sessions for a specific
1355
        // watchtower.
1356
        towerSessions := make(
×
1357
                map[wtdb.TowerID]map[wtdb.SessionID]*wtdb.ClientSession,
×
1358
        )
×
1359
        for id, s := range clientSessions {
×
1360
                sessions, ok := towerSessions[s.TowerID]
×
1361
                if !ok {
×
1362
                        sessions = make(map[wtdb.SessionID]*wtdb.ClientSession)
×
1363
                        towerSessions[s.TowerID] = sessions
×
1364
                }
×
1365
                sessions[id] = s
×
1366
        }
1367

1368
        registeredTowers := make([]*RegisteredTower, 0, len(towerSessions))
×
1369
        for _, tower := range towers {
×
1370
                isActive := c.candidateTowers.IsActive(tower.ID)
×
1371
                registeredTowers = append(registeredTowers, &RegisteredTower{
×
1372
                        Tower:                  tower,
×
1373
                        Sessions:               towerSessions[tower.ID],
×
1374
                        ActiveSessionCandidate: isActive,
×
1375
                })
×
1376
        }
×
1377

1378
        return registeredTowers, nil
×
1379
}
1380

1381
// lookupTower retrieves the info of sessions held with the given tower handled
1382
// by this client.
1383
func (c *client) lookupTower(tower *wtdb.Tower,
1384
        opts ...wtdb.ClientSessionListOption) (*RegisteredTower, error) {
2✔
1385

2✔
1386
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
2✔
1387

2✔
1388
        towerSessions, err := c.cfg.DB.ListClientSessions(&tower.ID, opts...)
2✔
1389
        if err != nil {
2✔
1390
                return nil, err
×
1391
        }
×
1392

1393
        return &RegisteredTower{
2✔
1394
                Tower:                  tower,
2✔
1395
                Sessions:               towerSessions,
2✔
1396
                ActiveSessionCandidate: c.candidateTowers.IsActive(tower.ID),
2✔
1397
        }, nil
2✔
1398
}
1399

1400
// getStats returns the in-memory statistics of the client since startup.
1401
func (c *client) getStats() ClientStats {
×
1402
        return c.stats.getStatsCopy()
×
1403
}
×
1404

1405
// policy returns the active client policy configuration.
1406
func (c *client) policy() wtpolicy.Policy {
6✔
1407
        return c.cfg.Policy
6✔
1408
}
6✔
1409

1410
// logMessage writes information about a message received from a remote peer,
1411
// using directional prepositions to signal whether the message was sent or
1412
// received.
1413
func (c *client) logMessage(
1414
        peer wtserver.Peer, msg wtwire.Message, read bool) {
2,218✔
1415

2,218✔
1416
        var action = "Received"
2,218✔
1417
        var preposition = "from"
2,218✔
1418
        if !read {
3,453✔
1419
                action = "Sending"
1,235✔
1420
                preposition = "to"
1,235✔
1421
        }
1,235✔
1422

1423
        summary := wtwire.MessageSummary(msg)
2,218✔
1424
        if len(summary) > 0 {
3,549✔
1425
                summary = "(" + summary + ")"
1,331✔
1426
        }
1,331✔
1427

1428
        c.log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
2,218✔
1429
                preposition, peer.RemotePub().SerializeCompressed(),
2,218✔
1430
                peer.RemoteAddr())
2,218✔
1431
}
1432

1433
// newRandomDelay returns a value drawn from the half-open range [0, max)
// using crypto/rand. A max of zero describes an empty range, so zero is
// returned without consulting the random source. A non-nil error is returned
// only when rand.Int reports one.
func newRandomDelay(max uint32) (uint32, error) {
	// rand.Int panics when its upper bound is not strictly positive, so
	// the degenerate range is handled up front instead of crashing.
	if max == 0 {
		return 0, nil
	}

	var maxDelay big.Int
	maxDelay.SetUint64(uint64(max))

	randDelay, err := rand.Int(rand.Reader, &maxDelay)
	if err != nil {
		return 0, err
	}

	// The sampled value is strictly below max, so it always fits in a
	// uint32.
	return uint32(randDelay.Uint64()), nil
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc