• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15561477203

10 Jun 2025 01:54PM UTC coverage: 58.351% (-10.1%) from 68.487%
15561477203

Pull #9356

github

web-flow
Merge 6440b25db into c6d6d4c0b
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

33 of 36 new or added lines in 2 files covered. (91.67%)

28366 existing lines in 455 files now uncovered.

97715 of 167461 relevant lines covered (58.35%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.56
/watchtower/wtclient/client.go
1
package wtclient
2

3
import (
4
        "bytes"
5
        "crypto/rand"
6
        "errors"
7
        "fmt"
8
        "maps"
9
        "math/big"
10
        "net"
11
        "sync"
12
        "time"
13

14
        "github.com/btcsuite/btcd/btcec/v2"
15
        "github.com/btcsuite/btclog/v2"
16
        "github.com/lightningnetwork/lnd/channeldb"
17
        "github.com/lightningnetwork/lnd/keychain"
18
        "github.com/lightningnetwork/lnd/lnwallet"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
21
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
22
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
23
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
24
)
25

26
const (
	// DefaultReadTimeout is the default amount of time a blocking read is
	// allowed to wait before we break out of it.
	DefaultReadTimeout = 15 * time.Second

	// DefaultWriteTimeout is the default amount of time a blocking write
	// is allowed to wait before we break out of it.
	DefaultWriteTimeout = 15 * time.Second

	// DefaultStatInterval is the default period between log entries that
	// report metrics about the client's operation.
	DefaultStatInterval = time.Minute

	// DefaultSessionCloseRange is the size of the range from which a
	// random number of blocks is drawn. Closing a session is delayed by
	// that many blocks once its last channel has been closed.
	DefaultSessionCloseRange = 288

	// DefaultMaxTasksInMemQueue is the default upper bound on the number
	// of items held in the in-memory queue.
	DefaultMaxTasksInMemQueue = 2000
)
48

49
// genSessionFilter constructs a filter that can be used to select sessions only
50
// if they match the policy of the client (namely anchor vs legacy). If
51
// activeOnly is set, then only active sessions will be returned.
52
func (c *client) genSessionFilter(
53
        activeOnly bool) wtdb.ClientSessionFilterFn {
3✔
54

3✔
55
        return func(session *wtdb.ClientSession) bool {
6✔
56
                if c.cfg.Policy.TxPolicy != session.Policy.TxPolicy {
6✔
57
                        return false
3✔
58
                }
3✔
59

60
                if !activeOnly {
6✔
61
                        return true
3✔
62
                }
3✔
63

64
                return session.Status == wtdb.CSessionActive
3✔
65
        }
66
}
67

68
// ExhaustedSessionFilter constructs a wtdb.ClientSessionFilterFn filter
69
// function that will filter out any sessions that have been exhausted. A
70
// session is considered exhausted only if it has no un-acked updates and the
71
// sequence number of the session is equal to the max updates of the session
72
// policy.
73
func ExhaustedSessionFilter() wtdb.ClientSessWithNumCommittedUpdatesFilterFn {
3✔
74
        return func(session *wtdb.ClientSession, numUnAcked uint16) bool {
6✔
75
                return session.SeqNum < session.Policy.MaxUpdates ||
3✔
76
                        numUnAcked > 0
3✔
77
        }
3✔
78
}
79

80
// RegisteredTower encompasses information about a registered watchtower with
81
// the client.
82
type RegisteredTower struct {
83
        *wtdb.Tower
84

85
        // Sessions is the set of sessions corresponding to the watchtower.
86
        Sessions map[wtdb.SessionID]*wtdb.ClientSession
87

88
        // ActiveSessionCandidate determines whether the watchtower is currently
89
        // being considered for new sessions.
90
        ActiveSessionCandidate bool
91
}
92

93
// BreachRetributionBuilder is a function that can be used to construct a
94
// BreachRetribution from a channel ID and a commitment height.
95
type BreachRetributionBuilder func(id lnwire.ChannelID,
96
        commitHeight uint64) (*lnwallet.BreachRetribution,
97
        channeldb.ChannelType, error)
98

99
// newTowerMsg is an internal message we'll use within the client to signal
100
// that a new tower can be considered.
101
type newTowerMsg struct {
102
        // tower holds the info about the new Tower or new tower address
103
        // required to connect to it.
104
        tower *Tower
105

106
        // errChan is the channel through which we'll send a response back to
107
        // the caller when handling their request.
108
        //
109
        // NOTE: This channel must be buffered.
110
        errChan chan error
111
}
112

113
// staleTowerMsg is an internal message we'll use within the client to
114
// signal that a tower should no longer be considered.
115
type staleTowerMsg struct {
116
        // id is the unique database identifier for the tower.
117
        id wtdb.TowerID
118

119
        // pubKey is the identifying public key of the watchtower.
120
        pubKey *btcec.PublicKey
121

122
        // addr is an optional field that when set signals that the address
123
        // should be removed from the watchtower's set of addresses, indicating
124
        // that it is stale. If it's not set, then the watchtower should be
125
        // no longer be considered for new sessions.
126
        addr net.Addr
127

128
        // errChan is the channel through which we'll send a response back to
129
        // the caller when handling their request.
130
        //
131
        // NOTE: This channel must be buffered.
132
        errChan chan error
133
}
134

135
// deactivateTowerMsg is an internal message we'll use within the TowerClient
136
// to signal that a tower should be marked as inactive.
137
type deactivateTowerMsg struct {
138
        // id is the unique database identifier for the tower.
139
        id wtdb.TowerID
140

141
        // pubKey is the identifying public key of the watchtower.
142
        pubKey *btcec.PublicKey
143

144
        // errChan is the channel through which we'll send a response back to
145
        // the caller when handling their request.
146
        //
147
        // NOTE: This channel must be buffered.
148
        errChan chan error
149
}
150

151
// terminateSessMsg is an internal message we'll use within the TowerClient to
152
// signal that a session should be terminated.
153
type terminateSessMsg struct {
154
        // id is the session identifier.
155
        id wtdb.SessionID
156

157
        // errChan is the channel through which we'll send a response back to
158
        // the caller when handling their request.
159
        //
160
        // NOTE: This channel must be buffered.
161
        errChan chan error
162
}
163

164
// clientCfg holds the configuration values required by a client.
165
type clientCfg struct {
166
        *Config
167

168
        // Policy is the session policy the client will propose when creating
169
        // new sessions with the tower. If the policy differs from any active
170
        // sessions recorded in the database, those sessions will be ignored and
171
        // new sessions will be requested immediately.
172
        Policy wtpolicy.Policy
173

174
        getSweepScript func(lnwire.ChannelID) ([]byte, bool)
175
}
176

177
// client manages backing up revoked states for all states that fall under a
178
// specific policy type.
179
type client struct {
180
        cfg *clientCfg
181

182
        log btclog.Logger
183

184
        pipeline *DiskOverflowQueue[*wtdb.BackupID]
185

186
        negotiator        SessionNegotiator
187
        candidateTowers   TowerCandidateIterator
188
        candidateSessions map[wtdb.SessionID]*ClientSession
189
        activeSessions    *sessionQueueSet
190

191
        sessionQueue *sessionQueue
192
        prevTask     *wtdb.BackupID
193

194
        statTicker *time.Ticker
195
        stats      *clientStats
196

197
        newTowers         chan *newTowerMsg
198
        staleTowers       chan *staleTowerMsg
199
        deactivateTowers  chan *deactivateTowerMsg
200
        terminateSessions chan *terminateSessMsg
201

202
        wg   sync.WaitGroup
203
        quit chan struct{}
204
}
205

206
// newClient initializes a new client from the provided clientCfg. An error is
207
// returned if the client could not be initialized.
208
func newClient(cfg *clientCfg) (*client, error) {
3✔
209
        identifier, err := cfg.Policy.BlobType.Identifier()
3✔
210
        if err != nil {
3✔
211
                return nil, err
×
212
        }
×
213
        plog := log.WithPrefix(fmt.Sprintf("(%s)", identifier))
3✔
214

3✔
215
        queueDB := cfg.DB.GetDBQueue([]byte(identifier))
3✔
216
        queue, err := NewDiskOverflowQueue[*wtdb.BackupID](
3✔
217
                queueDB, cfg.MaxTasksInMemQueue, plog,
3✔
218
        )
3✔
219
        if err != nil {
3✔
220
                return nil, err
×
221
        }
×
222

223
        c := &client{
3✔
224
                cfg:               cfg,
3✔
225
                log:               plog,
3✔
226
                pipeline:          queue,
3✔
227
                activeSessions:    newSessionQueueSet(),
3✔
228
                statTicker:        time.NewTicker(DefaultStatInterval),
3✔
229
                stats:             new(clientStats),
3✔
230
                newTowers:         make(chan *newTowerMsg),
3✔
231
                staleTowers:       make(chan *staleTowerMsg),
3✔
232
                deactivateTowers:  make(chan *deactivateTowerMsg),
3✔
233
                terminateSessions: make(chan *terminateSessMsg),
3✔
234
                quit:              make(chan struct{}),
3✔
235
        }
3✔
236

3✔
237
        candidateTowers := newTowerListIterator()
3✔
238
        perActiveTower := func(tower *Tower) {
6✔
239
                // If the tower has already been marked as active, then there is
3✔
240
                // no need to add it to the iterator again.
3✔
241
                if candidateTowers.IsActive(tower.ID) {
3✔
242
                        return
×
243
                }
×
244

245
                c.log.Infof("Using private watchtower %x, offering policy %s",
3✔
246
                        tower.IdentityKey.SerializeCompressed(), cfg.Policy)
3✔
247

3✔
248
                // Add the tower to the set of candidate towers.
3✔
249
                candidateTowers.AddCandidate(tower)
3✔
250
        }
251

252
        // Load all candidate sessions and towers from the database into the
253
        // client. We will use any of these sessions if their policies match the
254
        // current policy of the client, otherwise they will be ignored and new
255
        // sessions will be requested.
256
        candidateSessions, err := getTowerAndSessionCandidates(
3✔
257
                cfg.DB, cfg.SecretKeyRing, perActiveTower,
3✔
258
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
3✔
259
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
3✔
260
        )
3✔
261
        if err != nil {
3✔
262
                return nil, err
×
263
        }
×
264

265
        c.candidateTowers = candidateTowers
3✔
266
        c.candidateSessions = candidateSessions
3✔
267

3✔
268
        c.negotiator = newSessionNegotiator(&NegotiatorConfig{
3✔
269
                DB:            cfg.DB,
3✔
270
                SecretKeyRing: cfg.SecretKeyRing,
3✔
271
                Policy:        cfg.Policy,
3✔
272
                ChainHash:     cfg.ChainHash,
3✔
273
                SendMessage:   c.sendMessage,
3✔
274
                ReadMessage:   c.readMessage,
3✔
275
                Dial:          c.dial,
3✔
276
                Candidates:    c.candidateTowers,
3✔
277
                MinBackoff:    cfg.MinBackoff,
3✔
278
                MaxBackoff:    cfg.MaxBackoff,
3✔
279
                Log:           plog,
3✔
280
        })
3✔
281

3✔
282
        return c, nil
3✔
283
}
284

285
// getTowerAndSessionCandidates loads all the towers from the DB and then
286
// fetches the sessions for each of tower. Sessions are only collected if they
287
// pass the sessionFilter check. If a tower has a session that does pass the
288
// sessionFilter check then the perActiveTower call-back will be called on that
289
// tower.
290
func getTowerAndSessionCandidates(db DB, keyRing ECDHKeyRing,
291
        perActiveTower func(tower *Tower),
292
        opts ...wtdb.ClientSessionListOption) (
293
        map[wtdb.SessionID]*ClientSession, error) {
3✔
294

3✔
295
        // Fetch all active towers from the DB.
3✔
296
        towers, err := db.ListTowers(func(tower *wtdb.Tower) bool {
6✔
297
                return tower.Status == wtdb.TowerStatusActive
3✔
298
        })
3✔
299
        if err != nil {
3✔
300
                return nil, err
×
301
        }
×
302

303
        candidateSessions := make(map[wtdb.SessionID]*ClientSession)
3✔
304
        for _, dbTower := range towers {
6✔
305
                tower, err := NewTowerFromDBTower(dbTower)
3✔
306
                if err != nil {
3✔
307
                        return nil, err
×
308
                }
×
309

310
                sessions, err := db.ListClientSessions(&tower.ID, opts...)
3✔
311
                if err != nil {
3✔
312
                        return nil, err
×
313
                }
×
314

315
                for _, s := range sessions {
6✔
316
                        cs, err := NewClientSessionFromDBSession(
3✔
317
                                s, tower, keyRing,
3✔
318
                        )
3✔
319
                        if err != nil {
3✔
320
                                return nil, err
×
321
                        }
×
322

323
                        // Add the session to the set of candidate sessions.
324
                        candidateSessions[s.ID] = cs
3✔
325
                }
326

327
                perActiveTower(tower)
3✔
328
        }
329

330
        return candidateSessions, nil
3✔
331
}
332

333
// getClientSessions retrieves the client sessions for a particular tower if
334
// specified, otherwise all client sessions for all towers are retrieved. An
335
// optional filter can be provided to filter out any undesired client sessions.
336
//
337
// NOTE: This method should only be used when deserialization of a
338
// ClientSession's SessionPrivKey field is desired, otherwise, the existing
339
// ListClientSessions method should be used.
340
func getClientSessions(db DB, keyRing ECDHKeyRing, forTower *wtdb.TowerID,
341
        opts ...wtdb.ClientSessionListOption) (
342
        map[wtdb.SessionID]*ClientSession, error) {
3✔
343

3✔
344
        dbSessions, err := db.ListClientSessions(forTower, opts...)
3✔
345
        if err != nil {
3✔
346
                return nil, err
×
347
        }
×
348

349
        // Reload the tower from disk using the tower ID contained in each
350
        // candidate session. We will also rederive any session keys needed to
351
        // be able to communicate with the towers and authenticate session
352
        // requests. This prevents us from having to store the private keys on
353
        // disk.
354
        sessions := make(map[wtdb.SessionID]*ClientSession)
3✔
355
        for _, s := range dbSessions {
6✔
356
                dbTower, err := db.LoadTowerByID(s.TowerID)
3✔
357
                if err != nil {
3✔
358
                        return nil, err
×
359
                }
×
360

361
                towerKeyDesc, err := keyRing.DeriveKey(keychain.KeyLocator{
3✔
362
                        Family: keychain.KeyFamilyTowerSession,
3✔
363
                        Index:  s.KeyIndex,
3✔
364
                })
3✔
365
                if err != nil {
3✔
366
                        return nil, err
×
367
                }
×
368

369
                sessionKeyECDH := keychain.NewPubKeyECDH(towerKeyDesc, keyRing)
3✔
370

3✔
371
                tower, err := NewTowerFromDBTower(dbTower)
3✔
372
                if err != nil {
3✔
373
                        return nil, err
×
374
                }
×
375

376
                sessions[s.ID] = &ClientSession{
3✔
377
                        ID:                s.ID,
3✔
378
                        ClientSessionBody: s.ClientSessionBody,
3✔
379
                        Tower:             tower,
3✔
380
                        SessionKeyECDH:    sessionKeyECDH,
3✔
381
                }
3✔
382
        }
383

384
        return sessions, nil
3✔
385
}
386

387
// start initializes the watchtower client by loading or negotiating an active
388
// session and then begins processing backup tasks from the request pipeline.
389
func (c *client) start() error {
3✔
390
        c.log.Infof("Watchtower client starting")
3✔
391

3✔
392
        // First, restart a session queue for any sessions that have
3✔
393
        // committed but unacked state updates. This ensures that these
3✔
394
        // sessions will be able to flush the committed updates after a
3✔
395
        // restart.
3✔
396
        fetchCommittedUpdates := c.cfg.DB.FetchSessionCommittedUpdates
3✔
397
        for _, session := range c.candidateSessions {
6✔
398
                committedUpdates, err := fetchCommittedUpdates(
3✔
399
                        &session.ID,
3✔
400
                )
3✔
401
                if err != nil {
3✔
402
                        return err
×
403
                }
×
404

405
                if len(committedUpdates) > 0 {
3✔
UNCOV
406
                        c.log.Infof("Starting session=%s to process "+
×
UNCOV
407
                                "%d committed backups", session.ID,
×
UNCOV
408
                                len(committedUpdates))
×
UNCOV
409

×
UNCOV
410
                        c.initActiveQueue(session, committedUpdates)
×
UNCOV
411
                }
×
412
        }
413

414
        // Now start the session negotiator, which will allow us to request new
415
        // session as soon as the backupDispatcher starts up.
416
        err := c.negotiator.Start()
3✔
417
        if err != nil {
3✔
418
                return err
×
419
        }
×
420

421
        // Start the task pipeline to which new backup tasks will be
422
        // submitted from active links.
423
        err = c.pipeline.Start()
3✔
424
        if err != nil {
3✔
425
                return err
×
426
        }
×
427

428
        c.wg.Add(1)
3✔
429
        go c.backupDispatcher()
3✔
430

3✔
431
        c.log.Infof("Watchtower client started successfully")
3✔
432

3✔
433
        return nil
3✔
434
}
435

436
// stop idempotently initiates a graceful shutdown of the watchtower client.
437
func (c *client) stop() error {
3✔
438
        var returnErr error
3✔
439
        c.log.Debugf("Stopping watchtower client")
3✔
440

3✔
441
        // 1. Stop the session negotiator.
3✔
442
        err := c.negotiator.Stop()
3✔
443
        if err != nil {
3✔
444
                returnErr = err
×
445
        }
×
446

447
        // 2. Stop the backup dispatcher and any other goroutines.
448
        close(c.quit)
3✔
449
        c.wg.Wait()
3✔
450

3✔
451
        // 3. If there was a left over 'prevTask' from the backup
3✔
452
        // dispatcher, replay that onto the pipeline.
3✔
453
        if c.prevTask != nil {
3✔
454
                err = c.pipeline.QueueBackupID(c.prevTask)
×
455
                if err != nil {
×
456
                        returnErr = err
×
457
                }
×
458
        }
459

460
        // 4. Shutdown all active session queues in parallel. These will
461
        // exit once all unhandled updates have been replayed to the
462
        // task pipeline.
463
        c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() {
6✔
464
                return func() {
6✔
465
                        err := s.Stop(false)
3✔
466
                        if err != nil {
3✔
467
                                c.log.Errorf("could not stop session "+
×
468
                                        "queue: %s: %v", s.ID(), err)
×
469

×
470
                                returnErr = err
×
471
                        }
×
472
                }
473
        })
474

475
        // 5. Shutdown the backup queue, which will prevent any further
476
        // updates from being accepted.
477
        if err = c.pipeline.Stop(); err != nil {
3✔
478
                returnErr = err
×
479
        }
×
480

481
        c.log.Debugf("Client successfully stopped, stats: %s", c.stats)
3✔
482

3✔
483
        return returnErr
3✔
484
}
485

486
// backupState initiates a request to back up a particular revoked state. If the
487
// method returns nil, the backup is guaranteed to be successful unless the:
488
//   - justice transaction would create dust outputs when trying to abide by the
489
//     negotiated policy, or
490
//   - breached outputs contain too little value to sweep at the target sweep
491
//     fee rate.
492
func (c *client) backupState(chanID *lnwire.ChannelID,
493
        stateNum uint64) error {
3✔
494

3✔
495
        id := &wtdb.BackupID{
3✔
496
                ChanID:       *chanID,
3✔
497
                CommitHeight: stateNum,
3✔
498
        }
3✔
499

3✔
500
        return c.pipeline.QueueBackupID(id)
3✔
501
}
3✔
502

503
// nextSessionQueue attempts to fetch an active session from our set of
504
// candidate sessions. Candidate sessions with a differing policy from the
505
// active client's advertised policy will be ignored, but may be resumed if the
506
// client is restarted with a matching policy. If no candidates were found, nil
507
// is returned to signal that we need to request a new policy.
508
func (c *client) nextSessionQueue() (*sessionQueue, error) {
3✔
509
        // Select any candidate session at random, and remove it from the set of
3✔
510
        // candidate sessions.
3✔
511
        var candidateSession *ClientSession
3✔
512
        for id, sessionInfo := range c.candidateSessions {
6✔
513
                delete(c.candidateSessions, id)
3✔
514

3✔
515
                // Skip any sessions with policies that don't match the current
3✔
516
                // TxPolicy, as they would result in different justice
3✔
517
                // transactions from what is requested. These can be used again
3✔
518
                // if the client changes their configuration and restarting.
3✔
519
                if sessionInfo.Policy.TxPolicy != c.cfg.Policy.TxPolicy {
3✔
520
                        continue
×
521
                }
522

523
                candidateSession = sessionInfo
3✔
524
                break
3✔
525
        }
526

527
        // If none of the sessions could be used or none were found, we'll
528
        // return nil to signal that we need another session to be negotiated.
529
        if candidateSession == nil {
3✔
530
                return nil, nil
×
531
        }
×
532

533
        updates, err := c.cfg.DB.FetchSessionCommittedUpdates(
3✔
534
                &candidateSession.ID,
3✔
535
        )
3✔
536
        if err != nil {
3✔
537
                return nil, err
×
538
        }
×
539

540
        // Initialize the session queue and spin it up, so it can begin handling
541
        // updates. If the queue was already made active on startup, this will
542
        // simply return the existing session queue from the set.
543
        return c.getOrInitActiveQueue(candidateSession, updates), nil
3✔
544
}
545

546
// stopAndRemoveSession stops the session with the given ID and removes it from
547
// the in-memory active sessions set.
548
func (c *client) stopAndRemoveSession(id wtdb.SessionID, final bool) error {
3✔
549
        return c.activeSessions.StopAndRemove(id, final)
3✔
550
}
3✔
551

552
// deleteSessionFromTower dials the tower that we created the session with and
553
// attempts to send the tower the DeleteSession message.
554
func (c *client) deleteSessionFromTower(sess *wtdb.ClientSession) error {
3✔
555
        // First, we check if we have already loaded this tower in our
3✔
556
        // candidate towers iterator.
3✔
557
        tower, err := c.candidateTowers.GetTower(sess.TowerID)
3✔
558
        if errors.Is(err, ErrTowerNotInIterator) {
3✔
559
                // If not, then we attempt to load it from the DB.
×
560
                dbTower, err := c.cfg.DB.LoadTowerByID(sess.TowerID)
×
561
                if err != nil {
×
562
                        return err
×
563
                }
×
564

565
                tower, err = NewTowerFromDBTower(dbTower)
×
566
                if err != nil {
×
567
                        return err
×
568
                }
×
569
        } else if err != nil {
3✔
570
                return err
×
571
        }
×
572

573
        session, err := NewClientSessionFromDBSession(
3✔
574
                sess, tower, c.cfg.SecretKeyRing,
3✔
575
        )
3✔
576
        if err != nil {
3✔
577
                return err
×
578
        }
×
579

580
        localInit := wtwire.NewInitMessage(
3✔
581
                lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
3✔
582
                c.cfg.ChainHash,
3✔
583
        )
3✔
584

3✔
585
        var (
3✔
586
                conn wtserver.Peer
3✔
587

3✔
588
                // addrIterator is a copy of the tower's address iterator.
3✔
589
                // We use this copy so that iterating through the addresses does
3✔
590
                // not affect any other threads using this iterator.
3✔
591
                addrIterator = tower.Addresses.Copy()
3✔
592
                towerAddr    = addrIterator.Peek()
3✔
593
        )
3✔
594
        // Attempt to dial the tower with its available addresses.
3✔
595
        for {
6✔
596
                conn, err = c.dial(
3✔
597
                        session.SessionKeyECDH, &lnwire.NetAddress{
3✔
598
                                IdentityKey: tower.IdentityKey,
3✔
599
                                Address:     towerAddr,
3✔
600
                        },
3✔
601
                )
3✔
602
                if err != nil {
3✔
603
                        // If there are more addrs available, immediately try
×
604
                        // those.
×
605
                        nextAddr, iteratorErr := addrIterator.Next()
×
606
                        if iteratorErr == nil {
×
607
                                towerAddr = nextAddr
×
608
                                continue
×
609
                        }
610

611
                        // Otherwise, if we have exhausted the address list,
612
                        // exit.
613
                        addrIterator.Reset()
×
614

×
615
                        return fmt.Errorf("failed to dial tower(%x) at any "+
×
616
                                "available addresses",
×
617
                                tower.IdentityKey.SerializeCompressed())
×
618
                }
619

620
                break
3✔
621
        }
622
        defer conn.Close()
3✔
623

3✔
624
        // Send Init to tower.
3✔
625
        err = c.sendMessage(conn, localInit)
3✔
626
        if err != nil {
3✔
627
                return err
×
628
        }
×
629

630
        // Receive Init from tower.
631
        remoteMsg, err := c.readMessage(conn)
3✔
632
        if err != nil {
3✔
633
                return err
×
634
        }
×
635

636
        remoteInit, ok := remoteMsg.(*wtwire.Init)
3✔
637
        if !ok {
3✔
638
                return fmt.Errorf("watchtower %s responded with %T to Init",
×
639
                        towerAddr, remoteMsg)
×
640
        }
×
641

642
        // Validate Init.
643
        err = localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
3✔
644
        if err != nil {
3✔
645
                return err
×
646
        }
×
647

648
        // Send DeleteSession to tower.
649
        err = c.sendMessage(conn, &wtwire.DeleteSession{})
3✔
650
        if err != nil {
3✔
651
                return err
×
652
        }
×
653

654
        // Receive DeleteSessionReply from tower.
655
        remoteMsg, err = c.readMessage(conn)
3✔
656
        if err != nil {
3✔
657
                return err
×
658
        }
×
659

660
        deleteSessionReply, ok := remoteMsg.(*wtwire.DeleteSessionReply)
3✔
661
        if !ok {
3✔
662
                return fmt.Errorf("watchtower %s responded with %T to "+
×
663
                        "DeleteSession", towerAddr, remoteMsg)
×
664
        }
×
665

666
        switch deleteSessionReply.Code {
3✔
667
        case wtwire.CodeOK, wtwire.DeleteSessionCodeNotFound:
3✔
668
                return nil
3✔
669
        default:
×
670
                return fmt.Errorf("received error code %v in "+
×
671
                        "DeleteSessionReply when attempting to delete "+
×
672
                        "session from tower", deleteSessionReply.Code)
×
673
        }
674
}
675

676
// backupDispatcher processes events coming from the taskPipeline and is
677
// responsible for detecting when the client needs to renegotiate a session to
678
// fulfill continuing demand. The event loop exits if the client is quit.
679
//
680
// NOTE: This method MUST be run as a goroutine.
681
func (c *client) backupDispatcher() {
3✔
682
        defer c.wg.Done()
3✔
683

3✔
684
        c.log.Tracef("Starting backup dispatcher")
3✔
685
        defer c.log.Tracef("Stopping backup dispatcher")
3✔
686

3✔
687
        for {
6✔
688
                switch {
3✔
689

690
                // No active session queue and no additional sessions.
691
                case c.sessionQueue == nil && len(c.candidateSessions) == 0:
3✔
692
                        c.log.Infof("Requesting new session.")
3✔
693

3✔
694
                        // Immediately request a new session.
3✔
695
                        c.negotiator.RequestSession()
3✔
696

3✔
697
                        // Wait until we receive the newly negotiated session.
3✔
698
                        // All backups sent in the meantime are queued in the
3✔
699
                        // revoke queue, as we cannot process them.
3✔
700
                awaitSession:
3✔
701
                        select {
3✔
702
                        case session := <-c.negotiator.NewSessions():
3✔
703
                                c.log.Infof("Acquired new session with id=%s",
3✔
704
                                        session.ID)
3✔
705
                                c.candidateSessions[session.ID] = session
3✔
706
                                c.stats.sessionAcquired()
3✔
707

3✔
708
                                // We'll continue to choose the newly negotiated
3✔
709
                                // session as our active session queue.
3✔
710
                                continue
3✔
711

712
                        case <-c.statTicker.C:
×
713
                                c.log.Infof("Client stats: %s", c.stats)
×
714

715
                        // A new tower has been requested to be added. We'll
716
                        // update our persisted and in-memory state and consider
717
                        // its corresponding sessions, if any, as new
718
                        // candidates.
719
                        case msg := <-c.newTowers:
3✔
720
                                msg.errChan <- c.handleNewTower(msg.tower)
3✔
721

722
                        // A tower has been requested to be removed. We'll
723
                        // only allow removal of it if the address in question
724
                        // is not currently being used for session negotiation.
725
                        case msg := <-c.staleTowers:
3✔
726
                                msg.errChan <- c.handleStaleTower(msg)
3✔
727

728
                        // A tower has been requested to be de-activated. We'll
729
                        // only allow this if the tower is not currently being
730
                        // used for session negotiation.
731
                        case msg := <-c.deactivateTowers:
×
732
                                msg.errChan <- c.handleDeactivateTower(msg)
×
733

734
                        // A request has come through to terminate a session.
735
                        case msg := <-c.terminateSessions:
×
736
                                msg.errChan <- c.handleTerminateSession(msg)
×
737

738
                        case <-c.quit:
3✔
739
                                return
3✔
740
                        }
741

742
                        // Instead of looping, we'll jump back into the select
743
                        // case and await the delivery of the session to prevent
744
                        // us from re-requesting additional sessions.
745
                        goto awaitSession
3✔
746

747
                // No active session queue but have additional sessions.
748
                case c.sessionQueue == nil && len(c.candidateSessions) > 0:
3✔
749
                        // We've exhausted the prior session, we'll pop another
3✔
750
                        // from the remaining sessions and continue processing
3✔
751
                        // backup tasks.
3✔
752
                        var err error
3✔
753
                        c.sessionQueue, err = c.nextSessionQueue()
3✔
754
                        if err != nil {
3✔
755
                                c.log.Errorf("error fetching next session "+
×
756
                                        "queue: %v", err)
×
757
                        }
×
758

759
                        if c.sessionQueue != nil {
6✔
760
                                c.log.Debugf("Loaded next candidate session "+
3✔
761
                                        "queue id=%s", c.sessionQueue.ID())
3✔
762
                        }
3✔
763

764
                // Have active session queue, process backups.
765
                case c.sessionQueue != nil:
3✔
766
                        if c.prevTask != nil {
3✔
UNCOV
767
                                c.processTask(c.prevTask)
×
UNCOV
768

×
UNCOV
769
                                // Continue to ensure the sessionQueue is
×
UNCOV
770
                                // properly initialized before attempting to
×
UNCOV
771
                                // process more tasks from the pipeline.
×
UNCOV
772
                                continue
×
773
                        }
774

775
                        // Normal operation where new tasks are read from the
776
                        // pipeline.
777
                        select {
3✔
778

779
                        // If any sessions are negotiated while we have an
780
                        // active session queue, queue them for future use.
781
                        // This shouldn't happen with the current design, so
782
                        // it doesn't hurt to select here just in case. In the
783
                        // future, we will likely allow more asynchrony so that
784
                        // we can request new sessions before the session is
785
                        // fully empty, which this case would handle.
786
                        case session := <-c.negotiator.NewSessions():
×
787
                                c.log.Warnf("Acquired new session with id=%s "+
×
788
                                        "while processing tasks", session.ID)
×
789
                                c.candidateSessions[session.ID] = session
×
790
                                c.stats.sessionAcquired()
×
791

792
                        case <-c.statTicker.C:
×
793
                                c.log.Infof("Client stats: %s", c.stats)
×
794

795
                        // Process each backup task serially from the queue of
796
                        // revoked states.
797
                        case task, ok := <-c.pipeline.NextBackupID():
3✔
798
                                // All backups in the pipeline have been
3✔
799
                                // processed, it is now safe to exit.
3✔
800
                                if !ok {
3✔
801
                                        return
×
802
                                }
×
803

804
                                c.log.Debugf("Processing %v", task)
3✔
805

3✔
806
                                c.stats.taskReceived()
3✔
807
                                c.processTask(task)
3✔
808

809
                        // A new tower has been requested to be added. We'll
810
                        // update our persisted and in-memory state and consider
811
                        // its corresponding sessions, if any, as new
812
                        // candidates.
813
                        case msg := <-c.newTowers:
3✔
814
                                msg.errChan <- c.handleNewTower(msg.tower)
3✔
815

816
                        // A tower has been removed, so we'll remove certain
817
                        // information that's persisted and also in our
818
                        // in-memory state depending on the request, and set any
819
                        // of its corresponding candidate sessions as inactive.
820
                        case msg := <-c.staleTowers:
3✔
821
                                msg.errChan <- c.handleStaleTower(msg)
3✔
822

823
                        // A tower has been requested to be de-activated.
824
                        case msg := <-c.deactivateTowers:
3✔
825
                                msg.errChan <- c.handleDeactivateTower(msg)
3✔
826

827
                        // A request has come through to terminate a session.
828
                        case msg := <-c.terminateSessions:
3✔
829
                                msg.errChan <- c.handleTerminateSession(msg)
3✔
830

831
                        case <-c.quit:
3✔
832
                                return
3✔
833
                        }
834
                }
835
        }
836
}
837

838
// processTask attempts to schedule the given backupTask on the active
839
// sessionQueue. The task will either be accepted or rejected, after which the
840
// appropriate modifications to the client's state machine will be made. After
841
// every invocation of processTask, the caller should ensure that the
842
// sessionQueue hasn't been exhausted before proceeding to the next task. Tasks
843
// that are rejected because the active sessionQueue is full will be cached as
844
// the prevTask, and should be reprocessed after obtaining a new sessionQueue.
845
func (c *client) processTask(task *wtdb.BackupID) {
3✔
846
        script, ok := c.cfg.getSweepScript(task.ChanID)
3✔
847
        if !ok {
3✔
848
                log.Infof("not processing task for unregistered channel: %s",
×
849
                        task.ChanID)
×
850

×
851
                return
×
852
        }
×
853

854
        backupTask := newBackupTask(*task, script)
3✔
855

3✔
856
        status, accepted := c.sessionQueue.AcceptTask(backupTask)
3✔
857
        if accepted {
6✔
858
                c.taskAccepted(task, status)
3✔
859
        } else {
3✔
UNCOV
860
                c.taskRejected(task, status)
×
UNCOV
861
        }
×
862
}
863

864
// taskAccepted processes the acceptance of a task by a sessionQueue depending
865
// on the state the sessionQueue is in *after* the task is added. The client's
866
// prevTask is always removed as a result of this call. The client's
867
// sessionQueue will be removed if accepting the task left the sessionQueue in
868
// an exhausted state.
869
func (c *client) taskAccepted(task *wtdb.BackupID,
870
        newStatus sessionQueueStatus) {
3✔
871

3✔
872
        c.log.Infof("Queued %v successfully for session %v", task,
3✔
873
                c.sessionQueue.ID())
3✔
874

3✔
875
        c.stats.taskAccepted()
3✔
876

3✔
877
        // If this task was accepted, we discard anything held in the prevTask.
3✔
878
        // Either it was nil before, or is the task which was just accepted.
3✔
879
        c.prevTask = nil
3✔
880

3✔
881
        switch newStatus {
3✔
882

883
        // The sessionQueue still has capacity after accepting this task.
884
        case sessionQueueAvailable:
3✔
885

886
        // The sessionQueue is full after accepting this task, so we will need
887
        // to request a new one before proceeding.
888
        case sessionQueueExhausted:
3✔
889
                c.stats.sessionExhausted()
3✔
890

3✔
891
                c.log.Debugf("Session %s exhausted", c.sessionQueue.ID())
3✔
892

3✔
893
                // This task left the session exhausted, set it to nil and
3✔
894
                // proceed to the next loop, so we can consume another
3✔
895
                // pre-negotiated session or request another.
3✔
896
                c.sessionQueue = nil
3✔
897
        }
898
}
899

900
// taskRejected process the rejection of a task by a sessionQueue depending on
901
// the state the was in *before* the task was rejected. The client's prevTask
902
// will cache the task if the sessionQueue was exhausted beforehand, and nil
903
// the sessionQueue to find a new session. If the sessionQueue was not
904
// exhausted and not shutting down, the client marks the task as ineligible, as
905
// this implies we couldn't construct a valid justice transaction given the
906
// session's policy.
907
func (c *client) taskRejected(task *wtdb.BackupID,
UNCOV
908
        curStatus sessionQueueStatus) {
×
UNCOV
909

×
UNCOV
910
        switch curStatus {
×
911

912
        // The sessionQueue has available capacity but the task was rejected,
913
        // this indicates that the task was ineligible for backup.
UNCOV
914
        case sessionQueueAvailable:
×
UNCOV
915
                c.stats.taskIneligible()
×
UNCOV
916

×
UNCOV
917
                c.log.Infof("Ignoring ineligible %v", task)
×
UNCOV
918

×
UNCOV
919
                err := c.cfg.DB.MarkBackupIneligible(
×
UNCOV
920
                        task.ChanID, task.CommitHeight,
×
UNCOV
921
                )
×
UNCOV
922
                if err != nil {
×
923
                        c.log.Errorf("Unable to mark %v ineligible: %v",
×
924
                                task, err)
×
925

×
926
                        // It is safe to not handle this error, even if we could
×
927
                        // not persist the result. At worst, this task may be
×
928
                        // reprocessed on a subsequent start up, and will either
×
929
                        // succeed do a change in session parameters or fail in
×
930
                        // the same manner.
×
931
                }
×
932

933
                // If this task was rejected *and* the session had available
934
                // capacity, we discard anything held in the prevTask. Either it
935
                // was nil before, or is the task which was just rejected.
UNCOV
936
                c.prevTask = nil
×
937

938
        // The sessionQueue rejected the task because it is full, we will stash
939
        // this task and try to add it to the next available sessionQueue.
UNCOV
940
        case sessionQueueExhausted:
×
UNCOV
941
                c.stats.sessionExhausted()
×
UNCOV
942

×
UNCOV
943
                c.log.Debugf("Session %v exhausted, %v queued for next session",
×
UNCOV
944
                        c.sessionQueue.ID(), task)
×
UNCOV
945

×
UNCOV
946
                // Cache the task that we pulled off, so that we can process it
×
UNCOV
947
                // once a new session queue is available.
×
UNCOV
948
                c.sessionQueue = nil
×
UNCOV
949
                c.prevTask = task
×
950

951
        // The sessionQueue rejected the task because it is shutting down. We
952
        // will stash this task and try to add it to the next available
953
        // sessionQueue.
954
        case sessionQueueShuttingDown:
×
955
                c.log.Debugf("Session %v is shutting down, %v queued for "+
×
956
                        "next session", c.sessionQueue.ID(), task)
×
957

×
958
                // Cache the task that we pulled off, so that we can process it
×
959
                // once a new session queue is available.
×
960
                c.sessionQueue = nil
×
961
                c.prevTask = task
×
962
        }
963
}
964

965
// dial connects the peer at addr using privKey as our secret key for the
966
// connection. The connection will use the configured Net's resolver to resolve
967
// the address for either Tor or clear net connections.
968
func (c *client) dial(localKey keychain.SingleKeyECDH,
969
        addr *lnwire.NetAddress) (wtserver.Peer, error) {
3✔
970

3✔
971
        return c.cfg.AuthDial(localKey, addr, c.cfg.Dial)
3✔
972
}
3✔
973

974
// readMessage receives and parses the next message from the given Peer. An
975
// error is returned if a message is not received before the server's read
976
// timeout, the read off the wire failed, or the message could not be
977
// deserialized.
978
func (c *client) readMessage(peer wtserver.Peer) (wtwire.Message, error) {
3✔
979
        // Set a read timeout to ensure we drop the connection if nothing is
3✔
980
        // received in a timely manner.
3✔
981
        err := peer.SetReadDeadline(time.Now().Add(c.cfg.ReadTimeout))
3✔
982
        if err != nil {
3✔
983
                err = fmt.Errorf("unable to set read deadline: %w", err)
×
984
                c.log.Errorf("Unable to read msg: %v", err)
×
985
                return nil, err
×
986
        }
×
987

988
        // Pull the next message off the wire,
989
        rawMsg, err := peer.ReadNextMessage()
3✔
990
        if err != nil {
3✔
UNCOV
991
                err = fmt.Errorf("unable to read message: %w", err)
×
UNCOV
992
                c.log.Errorf("Unable to read msg: %v", err)
×
UNCOV
993
                return nil, err
×
UNCOV
994
        }
×
995

996
        // Parse the received message according to the watchtower wire
997
        // specification.
998
        msgReader := bytes.NewReader(rawMsg)
3✔
999
        msg, err := wtwire.ReadMessage(msgReader, 0)
3✔
1000
        if err != nil {
3✔
1001
                err = fmt.Errorf("unable to parse message: %w", err)
×
1002
                c.log.Errorf("Unable to read msg: %v", err)
×
1003
                return nil, err
×
1004
        }
×
1005

1006
        c.logMessage(peer, msg, true)
3✔
1007

3✔
1008
        return msg, nil
3✔
1009
}
1010

1011
// sendMessage sends a watchtower wire message to the target peer.
1012
func (c *client) sendMessage(peer wtserver.Peer,
1013
        msg wtwire.Message) error {
3✔
1014

3✔
1015
        // Encode the next wire message into the buffer.
3✔
1016
        // TODO(conner): use buffer pool
3✔
1017
        var b bytes.Buffer
3✔
1018
        _, err := wtwire.WriteMessage(&b, msg, 0)
3✔
1019
        if err != nil {
3✔
1020
                err = fmt.Errorf("unable to encode msg: %w", err)
×
1021
                c.log.Errorf("Unable to send msg: %v", err)
×
1022
                return err
×
1023
        }
×
1024

1025
        // Set the write deadline for the connection, ensuring we drop the
1026
        // connection if nothing is sent in a timely manner.
1027
        err = peer.SetWriteDeadline(time.Now().Add(c.cfg.WriteTimeout))
3✔
1028
        if err != nil {
3✔
1029
                err = fmt.Errorf("unable to set write deadline: %w", err)
×
1030
                c.log.Errorf("Unable to send msg: %v", err)
×
1031
                return err
×
1032
        }
×
1033

1034
        c.logMessage(peer, msg, false)
3✔
1035

3✔
1036
        // Write out the full message to the remote peer.
3✔
1037
        _, err = peer.Write(b.Bytes())
3✔
1038
        if err != nil {
3✔
UNCOV
1039
                c.log.Errorf("Unable to send msg: %v", err)
×
UNCOV
1040
        }
×
1041
        return err
3✔
1042
}
1043

1044
// newSessionQueue creates a sessionQueue from a ClientSession loaded from the
1045
// database and supplying it with the resources needed by the client.
1046
func (c *client) newSessionQueue(s *ClientSession,
1047
        updates []wtdb.CommittedUpdate) *sessionQueue {
3✔
1048

3✔
1049
        return newSessionQueue(&sessionQueueConfig{
3✔
1050
                ClientSession:          s,
3✔
1051
                ChainHash:              c.cfg.ChainHash,
3✔
1052
                Dial:                   c.dial,
3✔
1053
                ReadMessage:            c.readMessage,
3✔
1054
                SendMessage:            c.sendMessage,
3✔
1055
                Signer:                 c.cfg.Signer,
3✔
1056
                DB:                     c.cfg.DB,
3✔
1057
                MinBackoff:             c.cfg.MinBackoff,
3✔
1058
                MaxBackoff:             c.cfg.MaxBackoff,
3✔
1059
                Log:                    c.log,
3✔
1060
                BuildBreachRetribution: c.cfg.BuildBreachRetribution,
3✔
1061
                TaskPipeline:           c.pipeline,
3✔
1062
        }, updates)
3✔
1063
}
3✔
1064

1065
// getOrInitActiveQueue checks the activeSessions set for a sessionQueue for the
1066
// passed ClientSession. If it exists, the active sessionQueue is returned.
1067
// Otherwise, a new sessionQueue is initialized and added to the set.
1068
func (c *client) getOrInitActiveQueue(s *ClientSession,
1069
        updates []wtdb.CommittedUpdate) *sessionQueue {
3✔
1070

3✔
1071
        if sq, ok := c.activeSessions.Get(s.ID); ok {
3✔
UNCOV
1072
                return sq
×
UNCOV
1073
        }
×
1074

1075
        return c.initActiveQueue(s, updates)
3✔
1076
}
1077

1078
// initActiveQueue creates a new sessionQueue from the passed ClientSession,
1079
// adds the sessionQueue to the activeSessions set, and starts the sessionQueue
1080
// so that it can deliver any committed updates or begin accepting newly
1081
// assigned tasks.
1082
func (c *client) initActiveQueue(s *ClientSession,
1083
        updates []wtdb.CommittedUpdate) *sessionQueue {
3✔
1084

3✔
1085
        // Initialize the session queue, providing it with all the resources it
3✔
1086
        // requires from the client instance.
3✔
1087
        sq := c.newSessionQueue(s, updates)
3✔
1088

3✔
1089
        // Add the session queue as an active session so that we remember to
3✔
1090
        // stop it on shutdown. This method will also start the queue so that it
3✔
1091
        // can be active in processing newly assigned tasks or to upload
3✔
1092
        // previously committed updates.
3✔
1093
        c.activeSessions.AddAndStart(sq)
3✔
1094

3✔
1095
        return sq
3✔
1096
}
3✔
1097

1098
// terminateSession sets the given session's status to CSessionTerminal meaning
1099
// that it will not be used again.
1100
func (c *client) terminateSession(id wtdb.SessionID) error {
3✔
1101
        errChan := make(chan error, 1)
3✔
1102

3✔
1103
        select {
3✔
1104
        case c.terminateSessions <- &terminateSessMsg{
1105
                id:      id,
1106
                errChan: errChan,
1107
        }:
3✔
1108
        case <-c.pipeline.quit:
×
1109
                return ErrClientExiting
×
1110
        }
1111

1112
        select {
3✔
1113
        case err := <-errChan:
3✔
1114
                return err
3✔
1115
        case <-c.pipeline.quit:
×
1116
                return ErrClientExiting
×
1117
        }
1118
}
1119

1120
// handleTerminateSession handles a request to terminate a session. It will
1121
// first shut down the session if it is part of the active session set, then
1122
// it will ensure that the active session queue is set reset if it is using the
1123
// session in question. Finally, the session's status in the DB will be updated.
1124
func (c *client) handleTerminateSession(msg *terminateSessMsg) error {
3✔
1125
        id := msg.id
3✔
1126

3✔
1127
        delete(c.candidateSessions, id)
3✔
1128

3✔
1129
        err := c.activeSessions.StopAndRemove(id, true)
3✔
1130
        if err != nil {
3✔
1131
                return fmt.Errorf("could not stop session %s: %w", id, err)
×
1132
        }
×
1133

1134
        // If our active session queue corresponds to the session being
1135
        // terminated, then we'll proceed to negotiate a new one.
1136
        if c.sessionQueue != nil {
6✔
1137
                if bytes.Equal(c.sessionQueue.ID()[:], id[:]) {
6✔
1138
                        c.sessionQueue = nil
3✔
1139
                }
3✔
1140
        }
1141

1142
        return nil
3✔
1143
}
1144

1145
// deactivateTower sends a tower deactivation request to the backupDispatcher
1146
// where it will be handled synchronously. The request should result in all the
1147
// sessions that we have with the given tower being shutdown and removed from
1148
// our in-memory set of active sessions.
1149
func (c *client) deactivateTower(id wtdb.TowerID,
1150
        pubKey *btcec.PublicKey) error {
3✔
1151

3✔
1152
        errChan := make(chan error, 1)
3✔
1153

3✔
1154
        select {
3✔
1155
        case c.deactivateTowers <- &deactivateTowerMsg{
1156
                id:      id,
1157
                pubKey:  pubKey,
1158
                errChan: errChan,
1159
        }:
3✔
1160
        case <-c.pipeline.quit:
×
1161
                return ErrClientExiting
×
1162
        }
1163

1164
        select {
3✔
1165
        case err := <-errChan:
3✔
1166
                return err
3✔
1167
        case <-c.pipeline.quit:
×
1168
                return ErrClientExiting
×
1169
        }
1170
}
1171

1172
// handleDeactivateTower handles a request to deactivate a tower. We will remove
1173
// it from the in-memory candidate set, and we will also stop any active
1174
// sessions we have with this tower.
1175
func (c *client) handleDeactivateTower(msg *deactivateTowerMsg) error {
3✔
1176
        // Remove the tower from our in-memory candidate set so that it is not
3✔
1177
        // used for any new session negotiations.
3✔
1178
        err := c.candidateTowers.RemoveCandidate(msg.id, nil)
3✔
1179
        if err != nil {
3✔
1180
                return err
×
1181
        }
×
1182

1183
        pubKey := msg.pubKey.SerializeCompressed()
3✔
1184
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
3✔
1185
        if err != nil {
3✔
1186
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1187
                        "%v", pubKey, err)
×
1188
        }
×
1189

1190
        // Iterate over all the sessions we have for this tower and remove them
1191
        // from our candidate set and also from our set of started, active
1192
        // sessions.
1193
        for sessionID := range sessions {
6✔
1194
                delete(c.candidateSessions, sessionID)
3✔
1195

3✔
1196
                err = c.activeSessions.StopAndRemove(sessionID, false)
3✔
1197
                if err != nil {
3✔
1198
                        return fmt.Errorf("could not stop session %s: %w",
×
1199
                                sessionID, err)
×
1200
                }
×
1201
        }
1202

1203
        // If our active session queue corresponds to the stale tower, we'll
1204
        // proceed to negotiate a new one.
1205
        if c.sessionQueue != nil {
6✔
1206
                towerKey := c.sessionQueue.tower.IdentityKey
3✔
1207

3✔
1208
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
6✔
1209
                        c.sessionQueue = nil
3✔
1210
                }
3✔
1211
        }
1212

1213
        return nil
3✔
1214
}
1215

1216
// addTower adds a new watchtower reachable at the given address and considers
1217
// it for new sessions. If the watchtower already exists, then any new addresses
1218
// included will be considered when dialing it for session negotiations and
1219
// backups.
1220
func (c *client) addTower(tower *Tower) error {
3✔
1221
        errChan := make(chan error, 1)
3✔
1222

3✔
1223
        select {
3✔
1224
        case c.newTowers <- &newTowerMsg{
1225
                tower:   tower,
1226
                errChan: errChan,
1227
        }:
3✔
1228
        case <-c.pipeline.quit:
×
1229
                return ErrClientExiting
×
1230
        }
1231

1232
        select {
3✔
1233
        case err := <-errChan:
3✔
1234
                return err
3✔
1235
        case <-c.pipeline.quit:
×
1236
                return ErrClientExiting
×
1237
        }
1238
}
1239

1240
// handleNewTower handles a request for a new tower to be added. If the tower
1241
// already exists, then its corresponding sessions, if any, will be set
1242
// considered as candidates.
1243
func (c *client) handleNewTower(tower *Tower) error {
3✔
1244
        c.candidateTowers.AddCandidate(tower)
3✔
1245

3✔
1246
        // Include all of its corresponding sessions to our set of candidates.
3✔
1247
        sessions, err := getClientSessions(
3✔
1248
                c.cfg.DB, c.cfg.SecretKeyRing, &tower.ID,
3✔
1249
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
3✔
1250
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
3✔
1251
        )
3✔
1252
        if err != nil {
3✔
1253
                return fmt.Errorf("unable to determine sessions for tower %x: "+
×
1254
                        "%v", tower.IdentityKey.SerializeCompressed(), err)
×
1255
        }
×
1256
        maps.Copy(c.candidateSessions, sessions)
3✔
1257

3✔
1258
        return nil
3✔
1259
}
1260

1261
// removeTower removes a watchtower from being considered for future session
1262
// negotiations and from being used for any subsequent backups until it's added
1263
// again. If an address is provided, then this call only serves as a way of
1264
// removing the address from the watchtower instead.
1265
func (c *client) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey,
1266
        addr net.Addr) error {
3✔
1267

3✔
1268
        errChan := make(chan error, 1)
3✔
1269

3✔
1270
        select {
3✔
1271
        case c.staleTowers <- &staleTowerMsg{
1272
                id:      id,
1273
                pubKey:  pubKey,
1274
                addr:    addr,
1275
                errChan: errChan,
1276
        }:
3✔
1277
        case <-c.pipeline.quit:
×
1278
                return ErrClientExiting
×
1279
        }
1280

1281
        select {
3✔
1282
        case err := <-errChan:
3✔
1283
                return err
3✔
1284
        case <-c.pipeline.quit:
×
1285
                return ErrClientExiting
×
1286
        }
1287
}
1288

1289
// handleStaleTower handles a request for an existing tower to be removed. If
1290
// none of the tower's sessions have pending updates, then they will become
1291
// inactive and removed as candidates. If the active session queue corresponds
1292
// to any of these sessions, a new one will be negotiated.
1293
func (c *client) handleStaleTower(msg *staleTowerMsg) error {
3✔
1294
        // We'll first update our in-memory state.
3✔
1295
        err := c.candidateTowers.RemoveCandidate(msg.id, msg.addr)
3✔
1296
        if err != nil {
3✔
UNCOV
1297
                return err
×
UNCOV
1298
        }
×
1299

1300
        // If an address was provided, then we're only meant to remove the
1301
        // address from the tower.
1302
        if msg.addr != nil {
3✔
UNCOV
1303
                return nil
×
UNCOV
1304
        }
×
1305

1306
        // Otherwise, the tower should no longer be used for future session
1307
        // negotiations and backups.
1308

1309
        pubKey := msg.pubKey.SerializeCompressed()
3✔
1310
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
3✔
1311
        if err != nil {
3✔
1312
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1313
                        "%v", pubKey, err)
×
1314
        }
×
1315
        for sessionID := range sessions {
6✔
1316
                delete(c.candidateSessions, sessionID)
3✔
1317

3✔
1318
                // Shutdown the session so that any pending updates are
3✔
1319
                // replayed back onto the main task pipeline.
3✔
1320
                err = c.activeSessions.StopAndRemove(sessionID, true)
3✔
1321
                if err != nil {
3✔
1322
                        c.log.Errorf("could not stop session %s: %w", sessionID,
×
1323
                                err)
×
1324
                }
×
1325
        }
1326

1327
        // If our active session queue corresponds to the stale tower, we'll
1328
        // proceed to negotiate a new one.
1329
        if c.sessionQueue != nil {
6✔
1330
                towerKey := c.sessionQueue.tower.IdentityKey
3✔
1331

3✔
1332
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
6✔
1333
                        c.sessionQueue = nil
3✔
1334
                }
3✔
1335
        }
1336

1337
        return nil
3✔
1338
}
1339

1340
// registeredTowers retrieves the list of watchtowers registered with the
1341
// client.
1342
func (c *client) registeredTowers(towers []*wtdb.Tower,
1343
        opts ...wtdb.ClientSessionListOption) ([]*RegisteredTower, error) {
×
1344

×
1345
        // Generate a filter that will fetch all the client's sessions
×
1346
        // regardless of if they are active or not.
×
1347
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
×
1348

×
1349
        clientSessions, err := c.cfg.DB.ListClientSessions(nil, opts...)
×
1350
        if err != nil {
×
1351
                return nil, err
×
1352
        }
×
1353

1354
        // Construct a lookup map that coalesces all the sessions for a specific
1355
        // watchtower.
1356
        towerSessions := make(
×
1357
                map[wtdb.TowerID]map[wtdb.SessionID]*wtdb.ClientSession,
×
1358
        )
×
1359
        for id, s := range clientSessions {
×
1360
                sessions, ok := towerSessions[s.TowerID]
×
1361
                if !ok {
×
1362
                        sessions = make(map[wtdb.SessionID]*wtdb.ClientSession)
×
1363
                        towerSessions[s.TowerID] = sessions
×
1364
                }
×
1365
                sessions[id] = s
×
1366
        }
1367

1368
        registeredTowers := make([]*RegisteredTower, 0, len(towerSessions))
×
1369
        for _, tower := range towers {
×
1370
                isActive := c.candidateTowers.IsActive(tower.ID)
×
1371
                registeredTowers = append(registeredTowers, &RegisteredTower{
×
1372
                        Tower:                  tower,
×
1373
                        Sessions:               towerSessions[tower.ID],
×
1374
                        ActiveSessionCandidate: isActive,
×
1375
                })
×
1376
        }
×
1377

1378
        return registeredTowers, nil
×
1379
}
1380

1381
// lookupTower retrieves the info of sessions held with the given tower handled
1382
// by this client.
1383
func (c *client) lookupTower(tower *wtdb.Tower,
1384
        opts ...wtdb.ClientSessionListOption) (*RegisteredTower, error) {
3✔
1385

3✔
1386
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
3✔
1387

3✔
1388
        towerSessions, err := c.cfg.DB.ListClientSessions(&tower.ID, opts...)
3✔
1389
        if err != nil {
3✔
1390
                return nil, err
×
1391
        }
×
1392

1393
        return &RegisteredTower{
3✔
1394
                Tower:                  tower,
3✔
1395
                Sessions:               towerSessions,
3✔
1396
                ActiveSessionCandidate: c.candidateTowers.IsActive(tower.ID),
3✔
1397
        }, nil
3✔
1398
}
1399

1400
// getStats returns the in-memory statistics of the client since startup.
1401
func (c *client) getStats() ClientStats {
3✔
1402
        return c.stats.getStatsCopy()
3✔
1403
}
3✔
1404

1405
// policy returns the active client policy configuration.
1406
func (c *client) policy() wtpolicy.Policy {
3✔
1407
        return c.cfg.Policy
3✔
1408
}
3✔
1409

1410
// logMessage writes information about a message received from a remote peer,
1411
// using directional prepositions to signal whether the message was sent or
1412
// received.
1413
func (c *client) logMessage(
1414
        peer wtserver.Peer, msg wtwire.Message, read bool) {
3✔
1415

3✔
1416
        var action = "Received"
3✔
1417
        var preposition = "from"
3✔
1418
        if !read {
6✔
1419
                action = "Sending"
3✔
1420
                preposition = "to"
3✔
1421
        }
3✔
1422

1423
        summary := wtwire.MessageSummary(msg)
3✔
1424
        if len(summary) > 0 {
6✔
1425
                summary = "(" + summary + ")"
3✔
1426
        }
3✔
1427

1428
        c.log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
3✔
1429
                preposition, peer.RemotePub().SerializeCompressed(),
3✔
1430
                peer.RemoteAddr())
3✔
1431
}
1432

1433
// newRandomDelay returns a random delay drawn uniformly from the half-open
// range [0, max) using crypto/rand. A max of zero describes an empty range, so
// a delay of zero is returned without consulting the random source; rand.Int
// would otherwise panic on a non-positive bound. An error is returned only if
// reading from the random source fails.
func newRandomDelay(max uint32) (uint32, error) {
	if max == 0 {
		return 0, nil
	}

	maxDelay := new(big.Int).SetUint64(uint64(max))

	randDelay, err := rand.Int(rand.Reader, maxDelay)
	if err != nil {
		return 0, err
	}

	// randDelay is strictly below max, which itself fits in a uint32, so
	// the narrowing conversion cannot truncate.
	return uint32(randDelay.Uint64()), nil
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc