• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13536249039

26 Feb 2025 03:42AM UTC coverage: 57.462% (-1.4%) from 58.835%
13536249039

Pull #8453

github

Roasbeef
peer: update chooseDeliveryScript to gen script if needed

In this commit, we update `chooseDeliveryScript` to generate a new
script if needed. This allows us to fold in a few other lines that
always followed this function into this expanded function.

The tests have been updated accordingly.
Pull Request #8453: [4/4] - multi: integrate new rbf coop close FSM into the existing peer flow

275 of 1318 new or added lines in 22 files covered. (20.86%)

19521 existing lines in 257 files now uncovered.

103858 of 180741 relevant lines covered (57.46%)

24750.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.96
/watchtower/wtclient/client.go
1
package wtclient
2

3
import (
4
        "bytes"
5
        "crypto/rand"
6
        "errors"
7
        "fmt"
8
        "math/big"
9
        "net"
10
        "sync"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcec/v2"
14
        "github.com/btcsuite/btclog/v2"
15
        "github.com/lightningnetwork/lnd/channeldb"
16
        "github.com/lightningnetwork/lnd/keychain"
17
        "github.com/lightningnetwork/lnd/lnwallet"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
20
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
21
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
22
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
23
)
24

25
const (
	// DefaultReadTimeout is the default amount of time a blocking read is
	// allowed to take before it is abandoned.
	DefaultReadTimeout = 15 * time.Second

	// DefaultWriteTimeout is the default amount of time a blocking write
	// is allowed to take before it is abandoned.
	DefaultWriteTimeout = 15 * time.Second

	// DefaultStatInterval is the default period between log lines that
	// report metrics about the client's operation.
	DefaultStatInterval = time.Minute

	// DefaultSessionCloseRange bounds the randomly chosen number of blocks
	// by which closing a session is delayed once its last channel has
	// been closed.
	DefaultSessionCloseRange = 288

	// DefaultMaxTasksInMemQueue caps the number of items that are kept in
	// the in-memory queue.
	DefaultMaxTasksInMemQueue = 2000
)
47

48
// genSessionFilter constructs a filter that can be used to select sessions only
49
// if they match the policy of the client (namely anchor vs legacy). If
50
// activeOnly is set, then only active sessions will be returned.
51
func (c *client) genSessionFilter(
52
        activeOnly bool) wtdb.ClientSessionFilterFn {
85✔
53

85✔
54
        return func(session *wtdb.ClientSession) bool {
113✔
55
                if c.cfg.Policy.TxPolicy != session.Policy.TxPolicy {
28✔
UNCOV
56
                        return false
×
UNCOV
57
                }
×
58

59
                if !activeOnly {
32✔
60
                        return true
4✔
61
                }
4✔
62

63
                return session.Status == wtdb.CSessionActive
24✔
64
        }
65
}
66

67
// ExhaustedSessionFilter constructs a wtdb.ClientSessionFilterFn filter
68
// function that will filter out any sessions that have been exhausted. A
69
// session is considered exhausted only if it has no un-acked updates and the
70
// sequence number of the session is equal to the max updates of the session
71
// policy.
72
func ExhaustedSessionFilter() wtdb.ClientSessWithNumCommittedUpdatesFilterFn {
83✔
73
        return func(session *wtdb.ClientSession, numUnAcked uint16) bool {
107✔
74
                return session.SeqNum < session.Policy.MaxUpdates ||
24✔
75
                        numUnAcked > 0
24✔
76
        }
24✔
77
}
78

79
// RegisteredTower encompasses information about a registered watchtower with
80
// the client.
81
type RegisteredTower struct {
82
        *wtdb.Tower
83

84
        // Sessions is the set of sessions corresponding to the watchtower.
85
        Sessions map[wtdb.SessionID]*wtdb.ClientSession
86

87
        // ActiveSessionCandidate determines whether the watchtower is currently
88
        // being considered for new sessions.
89
        ActiveSessionCandidate bool
90
}
91

92
// BreachRetributionBuilder is a function that can be used to construct a
93
// BreachRetribution from a channel ID and a commitment height.
94
type BreachRetributionBuilder func(id lnwire.ChannelID,
95
        commitHeight uint64) (*lnwallet.BreachRetribution,
96
        channeldb.ChannelType, error)
97

98
// newTowerMsg is an internal message we'll use within the client to signal
99
// that a new tower can be considered.
100
type newTowerMsg struct {
101
        // tower holds the info about the new Tower or new tower address
102
        // required to connect to it.
103
        tower *Tower
104

105
        // errChan is the channel through which we'll send a response back to
106
        // the caller when handling their request.
107
        //
108
        // NOTE: This channel must be buffered.
109
        errChan chan error
110
}
111

112
// staleTowerMsg is an internal message we'll use within the client to
113
// signal that a tower should no longer be considered.
114
type staleTowerMsg struct {
115
        // id is the unique database identifier for the tower.
116
        id wtdb.TowerID
117

118
        // pubKey is the identifying public key of the watchtower.
119
        pubKey *btcec.PublicKey
120

121
        // addr is an optional field that when set signals that the address
122
        // should be removed from the watchtower's set of addresses, indicating
123
        // that it is stale. If it's not set, then the watchtower should be
124
        // no longer be considered for new sessions.
125
        addr net.Addr
126

127
        // errChan is the channel through which we'll send a response back to
128
        // the caller when handling their request.
129
        //
130
        // NOTE: This channel must be buffered.
131
        errChan chan error
132
}
133

134
// deactivateTowerMsg is an internal message we'll use within the TowerClient
135
// to signal that a tower should be marked as inactive.
136
type deactivateTowerMsg struct {
137
        // id is the unique database identifier for the tower.
138
        id wtdb.TowerID
139

140
        // pubKey is the identifying public key of the watchtower.
141
        pubKey *btcec.PublicKey
142

143
        // errChan is the channel through which we'll send a response back to
144
        // the caller when handling their request.
145
        //
146
        // NOTE: This channel must be buffered.
147
        errChan chan error
148
}
149

150
// terminateSessMsg is an internal message we'll use within the TowerClient to
151
// signal that a session should be terminated.
152
type terminateSessMsg struct {
153
        // id is the session identifier.
154
        id wtdb.SessionID
155

156
        // errChan is the channel through which we'll send a response back to
157
        // the caller when handling their request.
158
        //
159
        // NOTE: This channel must be buffered.
160
        errChan chan error
161
}
162

163
// clientCfg holds the configuration values required by a client.
164
type clientCfg struct {
165
        *Config
166

167
        // Policy is the session policy the client will propose when creating
168
        // new sessions with the tower. If the policy differs from any active
169
        // sessions recorded in the database, those sessions will be ignored and
170
        // new sessions will be requested immediately.
171
        Policy wtpolicy.Policy
172

173
        getSweepScript func(lnwire.ChannelID) ([]byte, bool)
174
}
175

176
// client manages backing up revoked states for all states that fall under a
177
// specific policy type.
178
type client struct {
179
        cfg *clientCfg
180

181
        log btclog.Logger
182

183
        pipeline *DiskOverflowQueue[*wtdb.BackupID]
184

185
        negotiator        SessionNegotiator
186
        candidateTowers   TowerCandidateIterator
187
        candidateSessions map[wtdb.SessionID]*ClientSession
188
        activeSessions    *sessionQueueSet
189

190
        sessionQueue *sessionQueue
191
        prevTask     *wtdb.BackupID
192

193
        statTicker *time.Ticker
194
        stats      *clientStats
195

196
        newTowers         chan *newTowerMsg
197
        staleTowers       chan *staleTowerMsg
198
        deactivateTowers  chan *deactivateTowerMsg
199
        terminateSessions chan *terminateSessMsg
200

201
        wg   sync.WaitGroup
202
        quit chan struct{}
203
}
204

205
// newClient initializes a new client from the provided clientCfg. An error is
206
// returned if the client could not be initialized.
207
func newClient(cfg *clientCfg) (*client, error) {
36✔
208
        identifier, err := cfg.Policy.BlobType.Identifier()
36✔
209
        if err != nil {
36✔
210
                return nil, err
×
211
        }
×
212
        plog := log.WithPrefix(fmt.Sprintf("(%s)", identifier))
36✔
213

36✔
214
        queueDB := cfg.DB.GetDBQueue([]byte(identifier))
36✔
215
        queue, err := NewDiskOverflowQueue[*wtdb.BackupID](
36✔
216
                queueDB, cfg.MaxTasksInMemQueue, plog,
36✔
217
        )
36✔
218
        if err != nil {
36✔
219
                return nil, err
×
220
        }
×
221

222
        c := &client{
36✔
223
                cfg:               cfg,
36✔
224
                log:               plog,
36✔
225
                pipeline:          queue,
36✔
226
                activeSessions:    newSessionQueueSet(),
36✔
227
                statTicker:        time.NewTicker(DefaultStatInterval),
36✔
228
                stats:             new(clientStats),
36✔
229
                newTowers:         make(chan *newTowerMsg),
36✔
230
                staleTowers:       make(chan *staleTowerMsg),
36✔
231
                deactivateTowers:  make(chan *deactivateTowerMsg),
36✔
232
                terminateSessions: make(chan *terminateSessMsg),
36✔
233
                quit:              make(chan struct{}),
36✔
234
        }
36✔
235

36✔
236
        candidateTowers := newTowerListIterator()
36✔
237
        perActiveTower := func(tower *Tower) {
45✔
238
                // If the tower has already been marked as active, then there is
9✔
239
                // no need to add it to the iterator again.
9✔
240
                if candidateTowers.IsActive(tower.ID) {
9✔
241
                        return
×
242
                }
×
243

244
                c.log.Infof("Using private watchtower %x, offering policy %s",
9✔
245
                        tower.IdentityKey.SerializeCompressed(), cfg.Policy)
9✔
246

9✔
247
                // Add the tower to the set of candidate towers.
9✔
248
                candidateTowers.AddCandidate(tower)
9✔
249
        }
250

251
        // Load all candidate sessions and towers from the database into the
252
        // client. We will use any of these sessions if their policies match the
253
        // current policy of the client, otherwise they will be ignored and new
254
        // sessions will be requested.
255
        candidateSessions, err := getTowerAndSessionCandidates(
36✔
256
                cfg.DB, cfg.SecretKeyRing, perActiveTower,
36✔
257
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
36✔
258
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
36✔
259
        )
36✔
260
        if err != nil {
36✔
261
                return nil, err
×
262
        }
×
263

264
        c.candidateTowers = candidateTowers
36✔
265
        c.candidateSessions = candidateSessions
36✔
266

36✔
267
        c.negotiator = newSessionNegotiator(&NegotiatorConfig{
36✔
268
                DB:            cfg.DB,
36✔
269
                SecretKeyRing: cfg.SecretKeyRing,
36✔
270
                Policy:        cfg.Policy,
36✔
271
                ChainHash:     cfg.ChainHash,
36✔
272
                SendMessage:   c.sendMessage,
36✔
273
                ReadMessage:   c.readMessage,
36✔
274
                Dial:          c.dial,
36✔
275
                Candidates:    c.candidateTowers,
36✔
276
                MinBackoff:    cfg.MinBackoff,
36✔
277
                MaxBackoff:    cfg.MaxBackoff,
36✔
278
                Log:           plog,
36✔
279
        })
36✔
280

36✔
281
        return c, nil
36✔
282
}
283

284
// getTowerAndSessionCandidates loads all the towers from the DB and then
285
// fetches the sessions for each of tower. Sessions are only collected if they
286
// pass the sessionFilter check. If a tower has a session that does pass the
287
// sessionFilter check then the perActiveTower call-back will be called on that
288
// tower.
289
func getTowerAndSessionCandidates(db DB, keyRing ECDHKeyRing,
290
        perActiveTower func(tower *Tower),
291
        opts ...wtdb.ClientSessionListOption) (
292
        map[wtdb.SessionID]*ClientSession, error) {
36✔
293

36✔
294
        // Fetch all active towers from the DB.
36✔
295
        towers, err := db.ListTowers(func(tower *wtdb.Tower) bool {
45✔
296
                return tower.Status == wtdb.TowerStatusActive
9✔
297
        })
9✔
298
        if err != nil {
36✔
299
                return nil, err
×
300
        }
×
301

302
        candidateSessions := make(map[wtdb.SessionID]*ClientSession)
36✔
303
        for _, dbTower := range towers {
45✔
304
                tower, err := NewTowerFromDBTower(dbTower)
9✔
305
                if err != nil {
9✔
306
                        return nil, err
×
307
                }
×
308

309
                sessions, err := db.ListClientSessions(&tower.ID, opts...)
9✔
310
                if err != nil {
9✔
311
                        return nil, err
×
312
                }
×
313

314
                for _, s := range sessions {
17✔
315
                        cs, err := NewClientSessionFromDBSession(
8✔
316
                                s, tower, keyRing,
8✔
317
                        )
8✔
318
                        if err != nil {
8✔
319
                                return nil, err
×
320
                        }
×
321

322
                        // Add the session to the set of candidate sessions.
323
                        candidateSessions[s.ID] = cs
8✔
324
                }
325

326
                perActiveTower(tower)
9✔
327
        }
328

329
        return candidateSessions, nil
36✔
330
}
331

332
// getClientSessions retrieves the client sessions for a particular tower if
333
// specified, otherwise all client sessions for all towers are retrieved. An
334
// optional filter can be provided to filter out any undesired client sessions.
335
//
336
// NOTE: This method should only be used when deserialization of a
337
// ClientSession's SessionPrivKey field is desired, otherwise, the existing
338
// ListClientSessions method should be used.
339
func getClientSessions(db DB, keyRing ECDHKeyRing, forTower *wtdb.TowerID,
340
        opts ...wtdb.ClientSessionListOption) (
341
        map[wtdb.SessionID]*ClientSession, error) {
47✔
342

47✔
343
        dbSessions, err := db.ListClientSessions(forTower, opts...)
47✔
344
        if err != nil {
47✔
345
                return nil, err
×
346
        }
×
347

348
        // Reload the tower from disk using the tower ID contained in each
349
        // candidate session. We will also rederive any session keys needed to
350
        // be able to communicate with the towers and authenticate session
351
        // requests. This prevents us from having to store the private keys on
352
        // disk.
353
        sessions := make(map[wtdb.SessionID]*ClientSession)
47✔
354
        for _, s := range dbSessions {
58✔
355
                dbTower, err := db.LoadTowerByID(s.TowerID)
11✔
356
                if err != nil {
11✔
357
                        return nil, err
×
358
                }
×
359

360
                towerKeyDesc, err := keyRing.DeriveKey(keychain.KeyLocator{
11✔
361
                        Family: keychain.KeyFamilyTowerSession,
11✔
362
                        Index:  s.KeyIndex,
11✔
363
                })
11✔
364
                if err != nil {
11✔
365
                        return nil, err
×
366
                }
×
367

368
                sessionKeyECDH := keychain.NewPubKeyECDH(towerKeyDesc, keyRing)
11✔
369

11✔
370
                tower, err := NewTowerFromDBTower(dbTower)
11✔
371
                if err != nil {
11✔
372
                        return nil, err
×
373
                }
×
374

375
                sessions[s.ID] = &ClientSession{
11✔
376
                        ID:                s.ID,
11✔
377
                        ClientSessionBody: s.ClientSessionBody,
11✔
378
                        Tower:             tower,
11✔
379
                        SessionKeyECDH:    sessionKeyECDH,
11✔
380
                }
11✔
381
        }
382

383
        return sessions, nil
47✔
384
}
385

386
// start initializes the watchtower client by loading or negotiating an active
387
// session and then begins processing backup tasks from the request pipeline.
388
func (c *client) start() error {
36✔
389
        c.log.Infof("Watchtower client starting")
36✔
390

36✔
391
        // First, restart a session queue for any sessions that have
36✔
392
        // committed but unacked state updates. This ensures that these
36✔
393
        // sessions will be able to flush the committed updates after a
36✔
394
        // restart.
36✔
395
        fetchCommittedUpdates := c.cfg.DB.FetchSessionCommittedUpdates
36✔
396
        for _, session := range c.candidateSessions {
44✔
397
                committedUpdates, err := fetchCommittedUpdates(
8✔
398
                        &session.ID,
8✔
399
                )
8✔
400
                if err != nil {
8✔
401
                        return err
×
402
                }
×
403

404
                if len(committedUpdates) > 0 {
10✔
405
                        c.log.Infof("Starting session=%s to process "+
2✔
406
                                "%d committed backups", session.ID,
2✔
407
                                len(committedUpdates))
2✔
408

2✔
409
                        c.initActiveQueue(session, committedUpdates)
2✔
410
                }
2✔
411
        }
412

413
        // Now start the session negotiator, which will allow us to request new
414
        // session as soon as the backupDispatcher starts up.
415
        err := c.negotiator.Start()
36✔
416
        if err != nil {
36✔
417
                return err
×
418
        }
×
419

420
        // Start the task pipeline to which new backup tasks will be
421
        // submitted from active links.
422
        err = c.pipeline.Start()
36✔
423
        if err != nil {
36✔
424
                return err
×
425
        }
×
426

427
        c.wg.Add(1)
36✔
428
        go c.backupDispatcher()
36✔
429

36✔
430
        c.log.Infof("Watchtower client started successfully")
36✔
431

36✔
432
        return nil
36✔
433
}
434

435
// stop idempotently initiates a graceful shutdown of the watchtower client.
436
func (c *client) stop() error {
36✔
437
        var returnErr error
36✔
438
        c.log.Debugf("Stopping watchtower client")
36✔
439

36✔
440
        // 1. Stop the session negotiator.
36✔
441
        err := c.negotiator.Stop()
36✔
442
        if err != nil {
36✔
443
                returnErr = err
×
444
        }
×
445

446
        // 2. Stop the backup dispatcher and any other goroutines.
447
        close(c.quit)
36✔
448
        c.wg.Wait()
36✔
449

36✔
450
        // 3. If there was a left over 'prevTask' from the backup
36✔
451
        // dispatcher, replay that onto the pipeline.
36✔
452
        if c.prevTask != nil {
36✔
453
                err = c.pipeline.QueueBackupID(c.prevTask)
×
454
                if err != nil {
×
455
                        returnErr = err
×
456
                }
×
457
        }
458

459
        // 4. Shutdown all active session queues in parallel. These will
460
        // exit once all unhandled updates have been replayed to the
461
        // task pipeline.
462
        c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() {
102✔
463
                return func() {
132✔
464
                        err := s.Stop(false)
66✔
465
                        if err != nil {
66✔
466
                                c.log.Errorf("could not stop session "+
×
467
                                        "queue: %s: %v", s.ID(), err)
×
468

×
469
                                returnErr = err
×
470
                        }
×
471
                }
472
        })
473

474
        // 5. Shutdown the backup queue, which will prevent any further
475
        // updates from being accepted.
476
        if err = c.pipeline.Stop(); err != nil {
36✔
477
                returnErr = err
×
478
        }
×
479

480
        c.log.Debugf("Client successfully stopped, stats: %s", c.stats)
36✔
481

36✔
482
        return returnErr
36✔
483
}
484

485
// backupState initiates a request to back up a particular revoked state. If the
486
// method returns nil, the backup is guaranteed to be successful unless the:
487
//   - justice transaction would create dust outputs when trying to abide by the
488
//     negotiated policy, or
489
//   - breached outputs contain too little value to sweep at the target sweep
490
//     fee rate.
491
func (c *client) backupState(chanID *lnwire.ChannelID,
492
        stateNum uint64) error {
469✔
493

469✔
494
        id := &wtdb.BackupID{
469✔
495
                ChanID:       *chanID,
469✔
496
                CommitHeight: stateNum,
469✔
497
        }
469✔
498

469✔
499
        return c.pipeline.QueueBackupID(id)
469✔
500
}
469✔
501

502
// nextSessionQueue attempts to fetch an active session from our set of
503
// candidate sessions. Candidate sessions with a differing policy from the
504
// active client's advertised policy will be ignored, but may be resumed if the
505
// client is restarted with a matching policy. If no candidates were found, nil
506
// is returned to signal that we need to request a new policy.
507
func (c *client) nextSessionQueue() (*sessionQueue, error) {
83✔
508
        // Select any candidate session at random, and remove it from the set of
83✔
509
        // candidate sessions.
83✔
510
        var candidateSession *ClientSession
83✔
511
        for id, sessionInfo := range c.candidateSessions {
166✔
512
                delete(c.candidateSessions, id)
83✔
513

83✔
514
                // Skip any sessions with policies that don't match the current
83✔
515
                // TxPolicy, as they would result in different justice
83✔
516
                // transactions from what is requested. These can be used again
83✔
517
                // if the client changes their configuration and restarting.
83✔
518
                if sessionInfo.Policy.TxPolicy != c.cfg.Policy.TxPolicy {
83✔
519
                        continue
×
520
                }
521

522
                candidateSession = sessionInfo
83✔
523
                break
83✔
524
        }
525

526
        // If none of the sessions could be used or none were found, we'll
527
        // return nil to signal that we need another session to be negotiated.
528
        if candidateSession == nil {
83✔
529
                return nil, nil
×
530
        }
×
531

532
        updates, err := c.cfg.DB.FetchSessionCommittedUpdates(
83✔
533
                &candidateSession.ID,
83✔
534
        )
83✔
535
        if err != nil {
83✔
536
                return nil, err
×
537
        }
×
538

539
        // Initialize the session queue and spin it up, so it can begin handling
540
        // updates. If the queue was already made active on startup, this will
541
        // simply return the existing session queue from the set.
542
        return c.getOrInitActiveQueue(candidateSession, updates), nil
83✔
543
}
544

545
// stopAndRemoveSession stops the session with the given ID and removes it from
546
// the in-memory active sessions set.
547
func (c *client) stopAndRemoveSession(id wtdb.SessionID, final bool) error {
4✔
548
        return c.activeSessions.StopAndRemove(id, final)
4✔
549
}
4✔
550

551
// deleteSessionFromTower dials the tower that we created the session with and
552
// attempts to send the tower the DeleteSession message.
553
func (c *client) deleteSessionFromTower(sess *wtdb.ClientSession) error {
4✔
554
        // First, we check if we have already loaded this tower in our
4✔
555
        // candidate towers iterator.
4✔
556
        tower, err := c.candidateTowers.GetTower(sess.TowerID)
4✔
557
        if errors.Is(err, ErrTowerNotInIterator) {
4✔
558
                // If not, then we attempt to load it from the DB.
×
559
                dbTower, err := c.cfg.DB.LoadTowerByID(sess.TowerID)
×
560
                if err != nil {
×
561
                        return err
×
562
                }
×
563

564
                tower, err = NewTowerFromDBTower(dbTower)
×
565
                if err != nil {
×
566
                        return err
×
567
                }
×
568
        } else if err != nil {
4✔
569
                return err
×
570
        }
×
571

572
        session, err := NewClientSessionFromDBSession(
4✔
573
                sess, tower, c.cfg.SecretKeyRing,
4✔
574
        )
4✔
575
        if err != nil {
4✔
576
                return err
×
577
        }
×
578

579
        localInit := wtwire.NewInitMessage(
4✔
580
                lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
4✔
581
                c.cfg.ChainHash,
4✔
582
        )
4✔
583

4✔
584
        var (
4✔
585
                conn wtserver.Peer
4✔
586

4✔
587
                // addrIterator is a copy of the tower's address iterator.
4✔
588
                // We use this copy so that iterating through the addresses does
4✔
589
                // not affect any other threads using this iterator.
4✔
590
                addrIterator = tower.Addresses.Copy()
4✔
591
                towerAddr    = addrIterator.Peek()
4✔
592
        )
4✔
593
        // Attempt to dial the tower with its available addresses.
4✔
594
        for {
8✔
595
                conn, err = c.dial(
4✔
596
                        session.SessionKeyECDH, &lnwire.NetAddress{
4✔
597
                                IdentityKey: tower.IdentityKey,
4✔
598
                                Address:     towerAddr,
4✔
599
                        },
4✔
600
                )
4✔
601
                if err != nil {
4✔
602
                        // If there are more addrs available, immediately try
×
603
                        // those.
×
604
                        nextAddr, iteratorErr := addrIterator.Next()
×
605
                        if iteratorErr == nil {
×
606
                                towerAddr = nextAddr
×
607
                                continue
×
608
                        }
609

610
                        // Otherwise, if we have exhausted the address list,
611
                        // exit.
612
                        addrIterator.Reset()
×
613

×
614
                        return fmt.Errorf("failed to dial tower(%x) at any "+
×
615
                                "available addresses",
×
616
                                tower.IdentityKey.SerializeCompressed())
×
617
                }
618

619
                break
4✔
620
        }
621
        defer conn.Close()
4✔
622

4✔
623
        // Send Init to tower.
4✔
624
        err = c.sendMessage(conn, localInit)
4✔
625
        if err != nil {
4✔
626
                return err
×
627
        }
×
628

629
        // Receive Init from tower.
630
        remoteMsg, err := c.readMessage(conn)
4✔
631
        if err != nil {
4✔
632
                return err
×
633
        }
×
634

635
        remoteInit, ok := remoteMsg.(*wtwire.Init)
4✔
636
        if !ok {
4✔
637
                return fmt.Errorf("watchtower %s responded with %T to Init",
×
638
                        towerAddr, remoteMsg)
×
639
        }
×
640

641
        // Validate Init.
642
        err = localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
4✔
643
        if err != nil {
4✔
644
                return err
×
645
        }
×
646

647
        // Send DeleteSession to tower.
648
        err = c.sendMessage(conn, &wtwire.DeleteSession{})
4✔
649
        if err != nil {
4✔
650
                return err
×
651
        }
×
652

653
        // Receive DeleteSessionReply from tower.
654
        remoteMsg, err = c.readMessage(conn)
4✔
655
        if err != nil {
4✔
656
                return err
×
657
        }
×
658

659
        deleteSessionReply, ok := remoteMsg.(*wtwire.DeleteSessionReply)
4✔
660
        if !ok {
4✔
661
                return fmt.Errorf("watchtower %s responded with %T to "+
×
662
                        "DeleteSession", towerAddr, remoteMsg)
×
663
        }
×
664

665
        switch deleteSessionReply.Code {
4✔
666
        case wtwire.CodeOK, wtwire.DeleteSessionCodeNotFound:
4✔
667
                return nil
4✔
668
        default:
×
669
                return fmt.Errorf("received error code %v in "+
×
670
                        "DeleteSessionReply when attempting to delete "+
×
671
                        "session from tower", deleteSessionReply.Code)
×
672
        }
673
}
674

675
// backupDispatcher processes events coming from the taskPipeline and is
676
// responsible for detecting when the client needs to renegotiate a session to
677
// fulfill continuing demand. The event loop exits if the client is quit.
678
//
679
// NOTE: This method MUST be run as a goroutine.
680
func (c *client) backupDispatcher() {
36✔
681
        defer c.wg.Done()
36✔
682

36✔
683
        c.log.Tracef("Starting backup dispatcher")
36✔
684
        defer c.log.Tracef("Stopping backup dispatcher")
36✔
685

36✔
686
        for {
729✔
687
                switch {
693✔
688

689
                // No active session queue and no additional sessions.
690
                case c.sessionQueue == nil && len(c.candidateSessions) == 0:
79✔
691
                        c.log.Infof("Requesting new session.")
79✔
692

79✔
693
                        // Immediately request a new session.
79✔
694
                        c.negotiator.RequestSession()
79✔
695

79✔
696
                        // Wait until we receive the newly negotiated session.
79✔
697
                        // All backups sent in the meantime are queued in the
79✔
698
                        // revoke queue, as we cannot process them.
79✔
699
                awaitSession:
79✔
700
                        select {
79✔
701
                        case session := <-c.negotiator.NewSessions():
73✔
702
                                c.log.Infof("Acquired new session with id=%s",
73✔
703
                                        session.ID)
73✔
704
                                c.candidateSessions[session.ID] = session
73✔
705
                                c.stats.sessionAcquired()
73✔
706

73✔
707
                                // We'll continue to choose the newly negotiated
73✔
708
                                // session as our active session queue.
73✔
709
                                continue
73✔
710

711
                        case <-c.statTicker.C:
×
712
                                c.log.Infof("Client stats: %s", c.stats)
×
713

714
                        // A new tower has been requested to be added. We'll
715
                        // update our persisted and in-memory state and consider
716
                        // its corresponding sessions, if any, as new
717
                        // candidates.
718
                        case msg := <-c.newTowers:
37✔
719
                                msg.errChan <- c.handleNewTower(msg.tower)
37✔
720

721
                        // A tower has been requested to be removed. We'll
722
                        // only allow removal of it if the address in question
723
                        // is not currently being used for session negotiation.
724
                        case msg := <-c.staleTowers:
4✔
725
                                msg.errChan <- c.handleStaleTower(msg)
4✔
726

727
                        // A tower has been requested to be de-activated. We'll
728
                        // only allow this if the tower is not currently being
729
                        // used for session negotiation.
730
                        case msg := <-c.deactivateTowers:
×
731
                                msg.errChan <- c.handleDeactivateTower(msg)
×
732

733
                        // A request has come through to terminate a session.
734
                        case msg := <-c.terminateSessions:
×
735
                                msg.errChan <- c.handleTerminateSession(msg)
×
736

737
                        case <-c.quit:
6✔
738
                                return
6✔
739
                        }
740

741
                        // Instead of looping, we'll jump back into the select
742
                        // case and await the delivery of the session to prevent
743
                        // us from re-requesting additional sessions.
744
                        goto awaitSession
41✔
745

746
                // No active session queue but have additional sessions.
747
                case c.sessionQueue == nil && len(c.candidateSessions) > 0:
83✔
748
                        // We've exhausted the prior session, we'll pop another
83✔
749
                        // from the remaining sessions and continue processing
83✔
750
                        // backup tasks.
83✔
751
                        var err error
83✔
752
                        c.sessionQueue, err = c.nextSessionQueue()
83✔
753
                        if err != nil {
83✔
754
                                c.log.Errorf("error fetching next session "+
×
755
                                        "queue: %v", err)
×
756
                        }
×
757

758
                        if c.sessionQueue != nil {
166✔
759
                                c.log.Debugf("Loaded next candidate session "+
83✔
760
                                        "queue id=%s", c.sessionQueue.ID())
83✔
761
                        }
83✔
762

763
                // Have active session queue, process backups.
764
                case c.sessionQueue != nil:
531✔
765
                        if c.prevTask != nil {
532✔
766
                                c.processTask(c.prevTask)
1✔
767

1✔
768
                                // Continue to ensure the sessionQueue is
1✔
769
                                // properly initialized before attempting to
1✔
770
                                // process more tasks from the pipeline.
1✔
771
                                continue
1✔
772
                        }
773

774
                        // Normal operation where new tasks are read from the
775
                        // pipeline.
776
                        select {
530✔
777

778
                        // If any sessions are negotiated while we have an
779
                        // active session queue, queue them for future use.
780
                        // This shouldn't happen with the current design, so
781
                        // it doesn't hurt to select here just in case. In the
782
                        // future, we will likely allow more asynchrony so that
783
                        // we can request new sessions before the session is
784
                        // fully empty, which this case would handle.
785
                        case session := <-c.negotiator.NewSessions():
×
786
                                c.log.Warnf("Acquired new session with id=%s "+
×
787
                                        "while processing tasks", session.ID)
×
788
                                c.candidateSessions[session.ID] = session
×
789
                                c.stats.sessionAcquired()
×
790

791
                        case <-c.statTicker.C:
×
792
                                c.log.Infof("Client stats: %s", c.stats)
×
793

794
                        // Process each backup task serially from the queue of
795
                        // revoked states.
796
                        case task, ok := <-c.pipeline.NextBackupID():
480✔
797
                                // All backups in the pipeline have been
480✔
798
                                // processed, it is now safe to exit.
480✔
799
                                if !ok {
480✔
800
                                        return
×
801
                                }
×
802

803
                                c.log.Debugf("Processing %v", task)
480✔
804

480✔
805
                                c.stats.taskReceived()
480✔
806
                                c.processTask(task)
480✔
807

808
                        // A new tower has been requested to be added. We'll
809
                        // update our persisted and in-memory state and consider
810
                        // its corresponding sessions, if any, as new
811
                        // candidates.
812
                        case msg := <-c.newTowers:
10✔
813
                                msg.errChan <- c.handleNewTower(msg.tower)
10✔
814

815
                        // A tower has been removed, so we'll remove certain
816
                        // information that's persisted and also in our
817
                        // in-memory state depending on the request, and set any
818
                        // of its corresponding candidate sessions as inactive.
819
                        case msg := <-c.staleTowers:
7✔
820
                                msg.errChan <- c.handleStaleTower(msg)
7✔
821

822
                        // A tower has been requested to be de-activated.
823
                        case msg := <-c.deactivateTowers:
2✔
824
                                msg.errChan <- c.handleDeactivateTower(msg)
2✔
825

826
                        // A request has come through to terminate a session.
827
                        case msg := <-c.terminateSessions:
1✔
828
                                msg.errChan <- c.handleTerminateSession(msg)
1✔
829

830
                        case <-c.quit:
30✔
831
                                return
30✔
832
                        }
833
                }
834
        }
835
}
836

837
// processTask attempts to schedule the given backupTask on the active
838
// sessionQueue. The task will either be accepted or rejected, after which the
839
// appropriate modifications to the client's state machine will be made. After
840
// every invocation of processTask, the caller should ensure that the
841
// sessionQueue hasn't been exhausted before proceeding to the next task. Tasks
842
// that are rejected because the active sessionQueue is full will be cached as
843
// the prevTask, and should be reprocessed after obtaining a new sessionQueue.
844
func (c *client) processTask(task *wtdb.BackupID) {
481✔
845
        script, ok := c.cfg.getSweepScript(task.ChanID)
481✔
846
        if !ok {
481✔
847
                log.Infof("not processing task for unregistered channel: %s",
×
848
                        task.ChanID)
×
849

×
850
                return
×
851
        }
×
852

853
        backupTask := newBackupTask(*task, script)
481✔
854

481✔
855
        status, accepted := c.sessionQueue.AcceptTask(backupTask)
481✔
856
        if accepted {
956✔
857
                c.taskAccepted(task, status)
475✔
858
        } else {
481✔
859
                c.taskRejected(task, status)
6✔
860
        }
6✔
861
}
862

863
// taskAccepted processes the acceptance of a task by a sessionQueue depending
864
// on the state the sessionQueue is in *after* the task is added. The client's
865
// prevTask is always removed as a result of this call. The client's
866
// sessionQueue will be removed if accepting the task left the sessionQueue in
867
// an exhausted state.
868
func (c *client) taskAccepted(task *wtdb.BackupID,
869
        newStatus sessionQueueStatus) {
475✔
870

475✔
871
        c.log.Infof("Queued %v successfully for session %v", task,
475✔
872
                c.sessionQueue.ID())
475✔
873

475✔
874
        c.stats.taskAccepted()
475✔
875

475✔
876
        // If this task was accepted, we discard anything held in the prevTask.
475✔
877
        // Either it was nil before, or is the task which was just accepted.
475✔
878
        c.prevTask = nil
475✔
879

475✔
880
        switch newStatus {
475✔
881

882
        // The sessionQueue still has capacity after accepting this task.
883
        case sessionQueueAvailable:
432✔
884

885
        // The sessionQueue is full after accepting this task, so we will need
886
        // to request a new one before proceeding.
887
        case sessionQueueExhausted:
43✔
888
                c.stats.sessionExhausted()
43✔
889

43✔
890
                c.log.Debugf("Session %s exhausted", c.sessionQueue.ID())
43✔
891

43✔
892
                // This task left the session exhausted, set it to nil and
43✔
893
                // proceed to the next loop, so we can consume another
43✔
894
                // pre-negotiated session or request another.
43✔
895
                c.sessionQueue = nil
43✔
896
        }
897
}
898

899
// taskRejected process the rejection of a task by a sessionQueue depending on
900
// the state the was in *before* the task was rejected. The client's prevTask
901
// will cache the task if the sessionQueue was exhausted beforehand, and nil
902
// the sessionQueue to find a new session. If the sessionQueue was not
903
// exhausted and not shutting down, the client marks the task as ineligible, as
904
// this implies we couldn't construct a valid justice transaction given the
905
// session's policy.
906
func (c *client) taskRejected(task *wtdb.BackupID,
907
        curStatus sessionQueueStatus) {
6✔
908

6✔
909
        switch curStatus {
6✔
910

911
        // The sessionQueue has available capacity but the task was rejected,
912
        // this indicates that the task was ineligible for backup.
913
        case sessionQueueAvailable:
5✔
914
                c.stats.taskIneligible()
5✔
915

5✔
916
                c.log.Infof("Ignoring ineligible %v", task)
5✔
917

5✔
918
                err := c.cfg.DB.MarkBackupIneligible(
5✔
919
                        task.ChanID, task.CommitHeight,
5✔
920
                )
5✔
921
                if err != nil {
5✔
922
                        c.log.Errorf("Unable to mark %v ineligible: %v",
×
923
                                task, err)
×
924

×
925
                        // It is safe to not handle this error, even if we could
×
926
                        // not persist the result. At worst, this task may be
×
927
                        // reprocessed on a subsequent start up, and will either
×
928
                        // succeed do a change in session parameters or fail in
×
929
                        // the same manner.
×
930
                }
×
931

932
                // If this task was rejected *and* the session had available
933
                // capacity, we discard anything held in the prevTask. Either it
934
                // was nil before, or is the task which was just rejected.
935
                c.prevTask = nil
5✔
936

937
        // The sessionQueue rejected the task because it is full, we will stash
938
        // this task and try to add it to the next available sessionQueue.
939
        case sessionQueueExhausted:
1✔
940
                c.stats.sessionExhausted()
1✔
941

1✔
942
                c.log.Debugf("Session %v exhausted, %v queued for next session",
1✔
943
                        c.sessionQueue.ID(), task)
1✔
944

1✔
945
                // Cache the task that we pulled off, so that we can process it
1✔
946
                // once a new session queue is available.
1✔
947
                c.sessionQueue = nil
1✔
948
                c.prevTask = task
1✔
949

950
        // The sessionQueue rejected the task because it is shutting down. We
951
        // will stash this task and try to add it to the next available
952
        // sessionQueue.
953
        case sessionQueueShuttingDown:
×
954
                c.log.Debugf("Session %v is shutting down, %v queued for "+
×
955
                        "next session", c.sessionQueue.ID(), task)
×
956

×
957
                // Cache the task that we pulled off, so that we can process it
×
958
                // once a new session queue is available.
×
959
                c.sessionQueue = nil
×
960
                c.prevTask = task
×
961
        }
962
}
963

964
// dial connects the peer at addr using privKey as our secret key for the
965
// connection. The connection will use the configured Net's resolver to resolve
966
// the address for either Tor or clear net connections.
967
func (c *client) dial(localKey keychain.SingleKeyECDH,
968
        addr *lnwire.NetAddress) (wtserver.Peer, error) {
440✔
969

440✔
970
        return c.cfg.AuthDial(localKey, addr, c.cfg.Dial)
440✔
971
}
440✔
972

973
// readMessage receives and parses the next message from the given Peer. An
974
// error is returned if a message is not received before the server's read
975
// timeout, the read off the wire failed, or the message could not be
976
// deserialized.
977
func (c *client) readMessage(peer wtserver.Peer) (wtwire.Message, error) {
1,220✔
978
        // Set a read timeout to ensure we drop the connection if nothing is
1,220✔
979
        // received in a timely manner.
1,220✔
980
        err := peer.SetReadDeadline(time.Now().Add(c.cfg.ReadTimeout))
1,220✔
981
        if err != nil {
1,220✔
982
                err = fmt.Errorf("unable to set read deadline: %w", err)
×
983
                c.log.Errorf("Unable to read msg: %v", err)
×
984
                return nil, err
×
985
        }
×
986

987
        // Pull the next message off the wire,
988
        rawMsg, err := peer.ReadNextMessage()
1,220✔
989
        if err != nil {
1,470✔
990
                err = fmt.Errorf("unable to read message: %w", err)
250✔
991
                c.log.Errorf("Unable to read msg: %v", err)
250✔
992
                return nil, err
250✔
993
        }
250✔
994

995
        // Parse the received message according to the watchtower wire
996
        // specification.
997
        msgReader := bytes.NewReader(rawMsg)
970✔
998
        msg, err := wtwire.ReadMessage(msgReader, 0)
970✔
999
        if err != nil {
970✔
1000
                err = fmt.Errorf("unable to parse message: %w", err)
×
1001
                c.log.Errorf("Unable to read msg: %v", err)
×
1002
                return nil, err
×
1003
        }
×
1004

1005
        c.logMessage(peer, msg, true)
970✔
1006

970✔
1007
        return msg, nil
970✔
1008
}
1009

1010
// sendMessage sends a watchtower wire message to the target peer.
1011
func (c *client) sendMessage(peer wtserver.Peer,
1012
        msg wtwire.Message) error {
1,221✔
1013

1,221✔
1014
        // Encode the next wire message into the buffer.
1,221✔
1015
        // TODO(conner): use buffer pool
1,221✔
1016
        var b bytes.Buffer
1,221✔
1017
        _, err := wtwire.WriteMessage(&b, msg, 0)
1,221✔
1018
        if err != nil {
1,221✔
1019
                err = fmt.Errorf("unable to encode msg: %w", err)
×
1020
                c.log.Errorf("Unable to send msg: %v", err)
×
1021
                return err
×
1022
        }
×
1023

1024
        // Set the write deadline for the connection, ensuring we drop the
1025
        // connection if nothing is sent in a timely manner.
1026
        err = peer.SetWriteDeadline(time.Now().Add(c.cfg.WriteTimeout))
1,221✔
1027
        if err != nil {
1,221✔
1028
                err = fmt.Errorf("unable to set write deadline: %w", err)
×
1029
                c.log.Errorf("Unable to send msg: %v", err)
×
1030
                return err
×
1031
        }
×
1032

1033
        c.logMessage(peer, msg, false)
1,221✔
1034

1,221✔
1035
        // Write out the full message to the remote peer.
1,221✔
1036
        _, err = peer.Write(b.Bytes())
1,221✔
1037
        if err != nil {
1,222✔
1038
                c.log.Errorf("Unable to send msg: %v", err)
1✔
1039
        }
1✔
1040
        return err
1,221✔
1041
}
1042

1043
// newSessionQueue creates a sessionQueue from a ClientSession loaded from the
1044
// database and supplying it with the resources needed by the client.
1045
func (c *client) newSessionQueue(s *ClientSession,
1046
        updates []wtdb.CommittedUpdate) *sessionQueue {
81✔
1047

81✔
1048
        return newSessionQueue(&sessionQueueConfig{
81✔
1049
                ClientSession:          s,
81✔
1050
                ChainHash:              c.cfg.ChainHash,
81✔
1051
                Dial:                   c.dial,
81✔
1052
                ReadMessage:            c.readMessage,
81✔
1053
                SendMessage:            c.sendMessage,
81✔
1054
                Signer:                 c.cfg.Signer,
81✔
1055
                DB:                     c.cfg.DB,
81✔
1056
                MinBackoff:             c.cfg.MinBackoff,
81✔
1057
                MaxBackoff:             c.cfg.MaxBackoff,
81✔
1058
                Log:                    c.log,
81✔
1059
                BuildBreachRetribution: c.cfg.BuildBreachRetribution,
81✔
1060
                TaskPipeline:           c.pipeline,
81✔
1061
        }, updates)
81✔
1062
}
81✔
1063

1064
// getOrInitActiveQueue checks the activeSessions set for a sessionQueue for the
1065
// passed ClientSession. If it exists, the active sessionQueue is returned.
1066
// Otherwise, a new sessionQueue is initialized and added to the set.
1067
func (c *client) getOrInitActiveQueue(s *ClientSession,
1068
        updates []wtdb.CommittedUpdate) *sessionQueue {
83✔
1069

83✔
1070
        if sq, ok := c.activeSessions.Get(s.ID); ok {
87✔
1071
                return sq
4✔
1072
        }
4✔
1073

1074
        return c.initActiveQueue(s, updates)
79✔
1075
}
1076

1077
// initActiveQueue creates a new sessionQueue from the passed ClientSession,
1078
// adds the sessionQueue to the activeSessions set, and starts the sessionQueue
1079
// so that it can deliver any committed updates or begin accepting newly
1080
// assigned tasks.
1081
func (c *client) initActiveQueue(s *ClientSession,
1082
        updates []wtdb.CommittedUpdate) *sessionQueue {
81✔
1083

81✔
1084
        // Initialize the session queue, providing it with all the resources it
81✔
1085
        // requires from the client instance.
81✔
1086
        sq := c.newSessionQueue(s, updates)
81✔
1087

81✔
1088
        // Add the session queue as an active session so that we remember to
81✔
1089
        // stop it on shutdown. This method will also start the queue so that it
81✔
1090
        // can be active in processing newly assigned tasks or to upload
81✔
1091
        // previously committed updates.
81✔
1092
        c.activeSessions.AddAndStart(sq)
81✔
1093

81✔
1094
        return sq
81✔
1095
}
81✔
1096

1097
// terminateSession sets the given session's status to CSessionTerminal meaning
1098
// that it will not be used again.
1099
func (c *client) terminateSession(id wtdb.SessionID) error {
1✔
1100
        errChan := make(chan error, 1)
1✔
1101

1✔
1102
        select {
1✔
1103
        case c.terminateSessions <- &terminateSessMsg{
1104
                id:      id,
1105
                errChan: errChan,
1106
        }:
1✔
1107
        case <-c.pipeline.quit:
×
1108
                return ErrClientExiting
×
1109
        }
1110

1111
        select {
1✔
1112
        case err := <-errChan:
1✔
1113
                return err
1✔
1114
        case <-c.pipeline.quit:
×
1115
                return ErrClientExiting
×
1116
        }
1117
}
1118

1119
// handleTerminateSession handles a request to terminate a session. It will
1120
// first shut down the session if it is part of the active session set, then
1121
// it will ensure that the active session queue is set reset if it is using the
1122
// session in question. Finally, the session's status in the DB will be updated.
1123
func (c *client) handleTerminateSession(msg *terminateSessMsg) error {
1✔
1124
        id := msg.id
1✔
1125

1✔
1126
        delete(c.candidateSessions, id)
1✔
1127

1✔
1128
        err := c.activeSessions.StopAndRemove(id, true)
1✔
1129
        if err != nil {
1✔
1130
                return fmt.Errorf("could not stop session %s: %w", id, err)
×
1131
        }
×
1132

1133
        // If our active session queue corresponds to the session being
1134
        // terminated, then we'll proceed to negotiate a new one.
1135
        if c.sessionQueue != nil {
2✔
1136
                if bytes.Equal(c.sessionQueue.ID()[:], id[:]) {
2✔
1137
                        c.sessionQueue = nil
1✔
1138
                }
1✔
1139
        }
1140

1141
        return nil
1✔
1142
}
1143

1144
// deactivateTower sends a tower deactivation request to the backupDispatcher
1145
// where it will be handled synchronously. The request should result in all the
1146
// sessions that we have with the given tower being shutdown and removed from
1147
// our in-memory set of active sessions.
1148
func (c *client) deactivateTower(id wtdb.TowerID,
1149
        pubKey *btcec.PublicKey) error {
2✔
1150

2✔
1151
        errChan := make(chan error, 1)
2✔
1152

2✔
1153
        select {
2✔
1154
        case c.deactivateTowers <- &deactivateTowerMsg{
1155
                id:      id,
1156
                pubKey:  pubKey,
1157
                errChan: errChan,
1158
        }:
2✔
1159
        case <-c.pipeline.quit:
×
1160
                return ErrClientExiting
×
1161
        }
1162

1163
        select {
2✔
1164
        case err := <-errChan:
2✔
1165
                return err
2✔
1166
        case <-c.pipeline.quit:
×
1167
                return ErrClientExiting
×
1168
        }
1169
}
1170

1171
// handleDeactivateTower handles a request to deactivate a tower. We will remove
1172
// it from the in-memory candidate set, and we will also stop any active
1173
// sessions we have with this tower.
1174
func (c *client) handleDeactivateTower(msg *deactivateTowerMsg) error {
2✔
1175
        // Remove the tower from our in-memory candidate set so that it is not
2✔
1176
        // used for any new session negotiations.
2✔
1177
        err := c.candidateTowers.RemoveCandidate(msg.id, nil)
2✔
1178
        if err != nil {
2✔
1179
                return err
×
1180
        }
×
1181

1182
        pubKey := msg.pubKey.SerializeCompressed()
2✔
1183
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
2✔
1184
        if err != nil {
2✔
1185
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1186
                        "%v", pubKey, err)
×
1187
        }
×
1188

1189
        // Iterate over all the sessions we have for this tower and remove them
1190
        // from our candidate set and also from our set of started, active
1191
        // sessions.
1192
        for sessionID := range sessions {
5✔
1193
                delete(c.candidateSessions, sessionID)
3✔
1194

3✔
1195
                err = c.activeSessions.StopAndRemove(sessionID, false)
3✔
1196
                if err != nil {
3✔
1197
                        return fmt.Errorf("could not stop session %s: %w",
×
1198
                                sessionID, err)
×
1199
                }
×
1200
        }
1201

1202
        // If our active session queue corresponds to the stale tower, we'll
1203
        // proceed to negotiate a new one.
1204
        if c.sessionQueue != nil {
4✔
1205
                towerKey := c.sessionQueue.tower.IdentityKey
2✔
1206

2✔
1207
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
4✔
1208
                        c.sessionQueue = nil
2✔
1209
                }
2✔
1210
        }
1211

1212
        return nil
2✔
1213
}
1214

1215
// addTower adds a new watchtower reachable at the given address and considers
1216
// it for new sessions. If the watchtower already exists, then any new addresses
1217
// included will be considered when dialing it for session negotiations and
1218
// backups.
1219
func (c *client) addTower(tower *Tower) error {
47✔
1220
        errChan := make(chan error, 1)
47✔
1221

47✔
1222
        select {
47✔
1223
        case c.newTowers <- &newTowerMsg{
1224
                tower:   tower,
1225
                errChan: errChan,
1226
        }:
47✔
1227
        case <-c.pipeline.quit:
×
1228
                return ErrClientExiting
×
1229
        }
1230

1231
        select {
47✔
1232
        case err := <-errChan:
47✔
1233
                return err
47✔
1234
        case <-c.pipeline.quit:
×
1235
                return ErrClientExiting
×
1236
        }
1237
}
1238

1239
// handleNewTower handles a request for a new tower to be added. If the tower
1240
// already exists, then its corresponding sessions, if any, will be set
1241
// considered as candidates.
1242
func (c *client) handleNewTower(tower *Tower) error {
47✔
1243
        c.candidateTowers.AddCandidate(tower)
47✔
1244

47✔
1245
        // Include all of its corresponding sessions to our set of candidates.
47✔
1246
        sessions, err := getClientSessions(
47✔
1247
                c.cfg.DB, c.cfg.SecretKeyRing, &tower.ID,
47✔
1248
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
47✔
1249
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
47✔
1250
        )
47✔
1251
        if err != nil {
47✔
1252
                return fmt.Errorf("unable to determine sessions for tower %x: "+
×
1253
                        "%v", tower.IdentityKey.SerializeCompressed(), err)
×
1254
        }
×
1255
        for id, session := range sessions {
58✔
1256
                c.candidateSessions[id] = session
11✔
1257
        }
11✔
1258

1259
        return nil
47✔
1260
}
1261

1262
// removeTower removes a watchtower from being considered for future session
1263
// negotiations and from being used for any subsequent backups until it's added
1264
// again. If an address is provided, then this call only serves as a way of
1265
// removing the address from the watchtower instead.
1266
func (c *client) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey,
1267
        addr net.Addr) error {
11✔
1268

11✔
1269
        errChan := make(chan error, 1)
11✔
1270

11✔
1271
        select {
11✔
1272
        case c.staleTowers <- &staleTowerMsg{
1273
                id:      id,
1274
                pubKey:  pubKey,
1275
                addr:    addr,
1276
                errChan: errChan,
1277
        }:
11✔
1278
        case <-c.pipeline.quit:
×
1279
                return ErrClientExiting
×
1280
        }
1281

1282
        select {
11✔
1283
        case err := <-errChan:
11✔
1284
                return err
11✔
1285
        case <-c.pipeline.quit:
×
1286
                return ErrClientExiting
×
1287
        }
1288
}
1289

1290
// handleStaleTower handles a request for an existing tower to be removed. If
1291
// none of the tower's sessions have pending updates, then they will become
1292
// inactive and removed as candidates. If the active session queue corresponds
1293
// to any of these sessions, a new one will be negotiated.
1294
func (c *client) handleStaleTower(msg *staleTowerMsg) error {
11✔
1295
        // We'll first update our in-memory state.
11✔
1296
        err := c.candidateTowers.RemoveCandidate(msg.id, msg.addr)
11✔
1297
        if err != nil {
12✔
1298
                return err
1✔
1299
        }
1✔
1300

1301
        // If an address was provided, then we're only meant to remove the
1302
        // address from the tower.
1303
        if msg.addr != nil {
12✔
1304
                return nil
2✔
1305
        }
2✔
1306

1307
        // Otherwise, the tower should no longer be used for future session
1308
        // negotiations and backups.
1309

1310
        pubKey := msg.pubKey.SerializeCompressed()
8✔
1311
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
8✔
1312
        if err != nil {
8✔
1313
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1314
                        "%v", pubKey, err)
×
1315
        }
×
1316
        for sessionID := range sessions {
17✔
1317
                delete(c.candidateSessions, sessionID)
9✔
1318

9✔
1319
                // Shutdown the session so that any pending updates are
9✔
1320
                // replayed back onto the main task pipeline.
9✔
1321
                err = c.activeSessions.StopAndRemove(sessionID, true)
9✔
1322
                if err != nil {
9✔
1323
                        c.log.Errorf("could not stop session %s: %w", sessionID,
×
1324
                                err)
×
1325
                }
×
1326
        }
1327

1328
        // If our active session queue corresponds to the stale tower, we'll
1329
        // proceed to negotiate a new one.
1330
        if c.sessionQueue != nil {
14✔
1331
                towerKey := c.sessionQueue.tower.IdentityKey
6✔
1332

6✔
1333
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
12✔
1334
                        c.sessionQueue = nil
6✔
1335
                }
6✔
1336
        }
1337

1338
        return nil
8✔
1339
}
1340

1341
// registeredTowers retrieves the list of watchtowers registered with the
1342
// client.
1343
func (c *client) registeredTowers(towers []*wtdb.Tower,
1344
        opts ...wtdb.ClientSessionListOption) ([]*RegisteredTower, error) {
×
1345

×
1346
        // Generate a filter that will fetch all the client's sessions
×
1347
        // regardless of if they are active or not.
×
1348
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
×
1349

×
1350
        clientSessions, err := c.cfg.DB.ListClientSessions(nil, opts...)
×
1351
        if err != nil {
×
1352
                return nil, err
×
1353
        }
×
1354

1355
        // Construct a lookup map that coalesces all the sessions for a specific
1356
        // watchtower.
1357
        towerSessions := make(
×
1358
                map[wtdb.TowerID]map[wtdb.SessionID]*wtdb.ClientSession,
×
1359
        )
×
1360
        for id, s := range clientSessions {
×
1361
                sessions, ok := towerSessions[s.TowerID]
×
1362
                if !ok {
×
1363
                        sessions = make(map[wtdb.SessionID]*wtdb.ClientSession)
×
1364
                        towerSessions[s.TowerID] = sessions
×
1365
                }
×
1366
                sessions[id] = s
×
1367
        }
1368

1369
        registeredTowers := make([]*RegisteredTower, 0, len(towerSessions))
×
1370
        for _, tower := range towers {
×
1371
                isActive := c.candidateTowers.IsActive(tower.ID)
×
1372
                registeredTowers = append(registeredTowers, &RegisteredTower{
×
1373
                        Tower:                  tower,
×
1374
                        Sessions:               towerSessions[tower.ID],
×
1375
                        ActiveSessionCandidate: isActive,
×
1376
                })
×
1377
        }
×
1378

1379
        return registeredTowers, nil
×
1380
}
1381

1382
// lookupTower retrieves the info of sessions held with the given tower handled
1383
// by this client.
1384
func (c *client) lookupTower(tower *wtdb.Tower,
1385
        opts ...wtdb.ClientSessionListOption) (*RegisteredTower, error) {
2✔
1386

2✔
1387
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
2✔
1388

2✔
1389
        towerSessions, err := c.cfg.DB.ListClientSessions(&tower.ID, opts...)
2✔
1390
        if err != nil {
2✔
1391
                return nil, err
×
1392
        }
×
1393

1394
        return &RegisteredTower{
2✔
1395
                Tower:                  tower,
2✔
1396
                Sessions:               towerSessions,
2✔
1397
                ActiveSessionCandidate: c.candidateTowers.IsActive(tower.ID),
2✔
1398
        }, nil
2✔
1399
}
1400

1401
// getStats returns the in-memory statistics of the client since startup.
UNCOV
1402
func (c *client) getStats() ClientStats {
×
UNCOV
1403
        return c.stats.getStatsCopy()
×
UNCOV
1404
}
×
1405

1406
// policy returns the active client policy configuration.
1407
func (c *client) policy() wtpolicy.Policy {
6✔
1408
        return c.cfg.Policy
6✔
1409
}
6✔
1410

1411
// logMessage writes information about a message received from a remote peer,
1412
// using directional prepositions to signal whether the message was sent or
1413
// received.
1414
func (c *client) logMessage(
1415
        peer wtserver.Peer, msg wtwire.Message, read bool) {
2,191✔
1416

2,191✔
1417
        var action = "Received"
2,191✔
1418
        var preposition = "from"
2,191✔
1419
        if !read {
3,412✔
1420
                action = "Sending"
1,221✔
1421
                preposition = "to"
1,221✔
1422
        }
1,221✔
1423

1424
        summary := wtwire.MessageSummary(msg)
2,191✔
1425
        if len(summary) > 0 {
3,519✔
1426
                summary = "(" + summary + ")"
1,328✔
1427
        }
1,328✔
1428

1429
        c.log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
2,191✔
1430
                preposition, peer.RemotePub().SerializeCompressed(),
2,191✔
1431
                peer.RemoteAddr())
2,191✔
1432
}
1433

1434
// newRandomDelay returns a uniformly distributed random value in the
// half-open range [0, max), drawn from the crypto/rand reader.
//
// A max of zero describes an empty range; in that case a delay of zero is
// returned with a nil error instead of invoking rand.Int, which panics when
// its upper bound is not strictly positive.
//
// An error is returned only if reading from the random source fails.
func newRandomDelay(max uint32) (uint32, error) {
	// Guard the empty range: rand.Int panics for a bound <= 0.
	if max == 0 {
		return 0, nil
	}

	var maxDelay big.Int
	maxDelay.SetUint64(uint64(max))

	randDelay, err := rand.Int(rand.Reader, &maxDelay)
	if err != nil {
		return 0, err
	}

	// randDelay is strictly below max, which itself fits in a uint32, so
	// the narrowing conversion cannot truncate.
	return uint32(randDelay.Uint64()), nil
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc