• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.03
/watchtower/wtclient/client.go
1
package wtclient
2

3
import (
4
        "bytes"
5
        "crypto/rand"
6
        "errors"
7
        "fmt"
8
        "math/big"
9
        "net"
10
        "sync"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcec/v2"
14
        "github.com/btcsuite/btclog"
15
        "github.com/lightningnetwork/lnd/build"
16
        "github.com/lightningnetwork/lnd/channeldb"
17
        "github.com/lightningnetwork/lnd/keychain"
18
        "github.com/lightningnetwork/lnd/lnwallet"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
21
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
22
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
23
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
24
)
25

26
const (
27
        // DefaultReadTimeout specifies the default duration we will wait during
28
        // a read before breaking out of a blocking read.
29
        DefaultReadTimeout = 15 * time.Second
30

31
        // DefaultWriteTimeout specifies the default duration we will wait
32
        // during a write before breaking out of a blocking write.
33
        DefaultWriteTimeout = 15 * time.Second
34

35
        // DefaultStatInterval specifies the default interval between logging
36
        // metrics about the client's operation.
37
        DefaultStatInterval = time.Minute
38

39
        // DefaultSessionCloseRange is the range over which we will generate a
40
        // random number of blocks to delay closing a session after its last
41
        // channel has been closed.
42
        DefaultSessionCloseRange = 288
43

44
        // DefaultMaxTasksInMemQueue is the maximum number of items to be held
45
        // in the in-memory queue.
46
        DefaultMaxTasksInMemQueue = 2000
47
)
48

49
// genSessionFilter constructs a filter that can be used to select sessions only
50
// if they match the policy of the client (namely anchor vs legacy). If
51
// activeOnly is set, then only active sessions will be returned.
52
func (c *client) genSessionFilter(
53
        activeOnly bool) wtdb.ClientSessionFilterFn {
85✔
54

85✔
55
        return func(session *wtdb.ClientSession) bool {
113✔
56
                if c.cfg.Policy.TxPolicy != session.Policy.TxPolicy {
28✔
UNCOV
57
                        return false
×
UNCOV
58
                }
×
59

60
                if !activeOnly {
32✔
61
                        return true
4✔
62
                }
4✔
63

64
                return session.Status == wtdb.CSessionActive
24✔
65
        }
66
}
67

68
// ExhaustedSessionFilter constructs a wtdb.ClientSessionFilterFn filter
69
// function that will filter out any sessions that have been exhausted. A
70
// session is considered exhausted only if it has no un-acked updates and the
71
// sequence number of the session is equal to the max updates of the session
72
// policy.
73
func ExhaustedSessionFilter() wtdb.ClientSessWithNumCommittedUpdatesFilterFn {
83✔
74
        return func(session *wtdb.ClientSession, numUnAcked uint16) bool {
107✔
75
                return session.SeqNum < session.Policy.MaxUpdates ||
24✔
76
                        numUnAcked > 0
24✔
77
        }
24✔
78
}
79

80
// RegisteredTower encompasses information about a registered watchtower with
81
// the client.
82
type RegisteredTower struct {
83
        *wtdb.Tower
84

85
        // Sessions is the set of sessions corresponding to the watchtower.
86
        Sessions map[wtdb.SessionID]*wtdb.ClientSession
87

88
        // ActiveSessionCandidate determines whether the watchtower is currently
89
        // being considered for new sessions.
90
        ActiveSessionCandidate bool
91
}
92

93
// BreachRetributionBuilder is a function that can be used to construct a
94
// BreachRetribution from a channel ID and a commitment height.
95
type BreachRetributionBuilder func(id lnwire.ChannelID,
96
        commitHeight uint64) (*lnwallet.BreachRetribution,
97
        channeldb.ChannelType, error)
98

99
// newTowerMsg is an internal message we'll use within the client to signal
100
// that a new tower can be considered.
101
type newTowerMsg struct {
102
        // tower holds the info about the new Tower or new tower address
103
        // required to connect to it.
104
        tower *Tower
105

106
        // errChan is the channel through which we'll send a response back to
107
        // the caller when handling their request.
108
        //
109
        // NOTE: This channel must be buffered.
110
        errChan chan error
111
}
112

113
// staleTowerMsg is an internal message we'll use within the client to
114
// signal that a tower should no longer be considered.
115
type staleTowerMsg struct {
116
        // id is the unique database identifier for the tower.
117
        id wtdb.TowerID
118

119
        // pubKey is the identifying public key of the watchtower.
120
        pubKey *btcec.PublicKey
121

122
        // addr is an optional field that when set signals that the address
123
        // should be removed from the watchtower's set of addresses, indicating
124
        // that it is stale. If it's not set, then the watchtower should be
125
        // no longer be considered for new sessions.
126
        addr net.Addr
127

128
        // errChan is the channel through which we'll send a response back to
129
        // the caller when handling their request.
130
        //
131
        // NOTE: This channel must be buffered.
132
        errChan chan error
133
}
134

135
// deactivateTowerMsg is an internal message we'll use within the TowerClient
136
// to signal that a tower should be marked as inactive.
137
type deactivateTowerMsg struct {
138
        // id is the unique database identifier for the tower.
139
        id wtdb.TowerID
140

141
        // pubKey is the identifying public key of the watchtower.
142
        pubKey *btcec.PublicKey
143

144
        // errChan is the channel through which we'll send a response back to
145
        // the caller when handling their request.
146
        //
147
        // NOTE: This channel must be buffered.
148
        errChan chan error
149
}
150

151
// terminateSessMsg is an internal message we'll use within the TowerClient to
152
// signal that a session should be terminated.
153
type terminateSessMsg struct {
154
        // id is the session identifier.
155
        id wtdb.SessionID
156

157
        // errChan is the channel through which we'll send a response back to
158
        // the caller when handling their request.
159
        //
160
        // NOTE: This channel must be buffered.
161
        errChan chan error
162
}
163

164
// clientCfg holds the configuration values required by a client.
165
type clientCfg struct {
166
        *Config
167

168
        // Policy is the session policy the client will propose when creating
169
        // new sessions with the tower. If the policy differs from any active
170
        // sessions recorded in the database, those sessions will be ignored and
171
        // new sessions will be requested immediately.
172
        Policy wtpolicy.Policy
173

174
        getSweepScript func(lnwire.ChannelID) ([]byte, bool)
175
}
176

177
// client manages backing up revoked states for all states that fall under a
178
// specific policy type.
179
type client struct {
180
        cfg *clientCfg
181

182
        log btclog.Logger
183

184
        pipeline *DiskOverflowQueue[*wtdb.BackupID]
185

186
        negotiator        SessionNegotiator
187
        candidateTowers   TowerCandidateIterator
188
        candidateSessions map[wtdb.SessionID]*ClientSession
189
        activeSessions    *sessionQueueSet
190

191
        sessionQueue *sessionQueue
192
        prevTask     *wtdb.BackupID
193

194
        statTicker *time.Ticker
195
        stats      *clientStats
196

197
        newTowers         chan *newTowerMsg
198
        staleTowers       chan *staleTowerMsg
199
        deactivateTowers  chan *deactivateTowerMsg
200
        terminateSessions chan *terminateSessMsg
201

202
        wg   sync.WaitGroup
203
        quit chan struct{}
204
}
205

206
// newClient initializes a new client from the provided clientCfg. An error is
207
// returned if the client could not be initialized.
208
func newClient(cfg *clientCfg) (*client, error) {
36✔
209
        identifier, err := cfg.Policy.BlobType.Identifier()
36✔
210
        if err != nil {
36✔
211
                return nil, err
×
212
        }
×
213
        prefix := fmt.Sprintf("(%s)", identifier)
36✔
214

36✔
215
        plog := build.NewPrefixLog(prefix, log)
36✔
216

36✔
217
        queueDB := cfg.DB.GetDBQueue([]byte(identifier))
36✔
218
        queue, err := NewDiskOverflowQueue[*wtdb.BackupID](
36✔
219
                queueDB, cfg.MaxTasksInMemQueue, plog,
36✔
220
        )
36✔
221
        if err != nil {
36✔
222
                return nil, err
×
223
        }
×
224

225
        c := &client{
36✔
226
                cfg:               cfg,
36✔
227
                log:               plog,
36✔
228
                pipeline:          queue,
36✔
229
                activeSessions:    newSessionQueueSet(),
36✔
230
                statTicker:        time.NewTicker(DefaultStatInterval),
36✔
231
                stats:             new(clientStats),
36✔
232
                newTowers:         make(chan *newTowerMsg),
36✔
233
                staleTowers:       make(chan *staleTowerMsg),
36✔
234
                deactivateTowers:  make(chan *deactivateTowerMsg),
36✔
235
                terminateSessions: make(chan *terminateSessMsg),
36✔
236
                quit:              make(chan struct{}),
36✔
237
        }
36✔
238

36✔
239
        candidateTowers := newTowerListIterator()
36✔
240
        perActiveTower := func(tower *Tower) {
45✔
241
                // If the tower has already been marked as active, then there is
9✔
242
                // no need to add it to the iterator again.
9✔
243
                if candidateTowers.IsActive(tower.ID) {
9✔
244
                        return
×
245
                }
×
246

247
                c.log.Infof("Using private watchtower %x, offering policy %s",
9✔
248
                        tower.IdentityKey.SerializeCompressed(), cfg.Policy)
9✔
249

9✔
250
                // Add the tower to the set of candidate towers.
9✔
251
                candidateTowers.AddCandidate(tower)
9✔
252
        }
253

254
        // Load all candidate sessions and towers from the database into the
255
        // client. We will use any of these sessions if their policies match the
256
        // current policy of the client, otherwise they will be ignored and new
257
        // sessions will be requested.
258
        candidateSessions, err := getTowerAndSessionCandidates(
36✔
259
                cfg.DB, cfg.SecretKeyRing, perActiveTower,
36✔
260
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
36✔
261
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
36✔
262
        )
36✔
263
        if err != nil {
36✔
264
                return nil, err
×
265
        }
×
266

267
        c.candidateTowers = candidateTowers
36✔
268
        c.candidateSessions = candidateSessions
36✔
269

36✔
270
        c.negotiator = newSessionNegotiator(&NegotiatorConfig{
36✔
271
                DB:            cfg.DB,
36✔
272
                SecretKeyRing: cfg.SecretKeyRing,
36✔
273
                Policy:        cfg.Policy,
36✔
274
                ChainHash:     cfg.ChainHash,
36✔
275
                SendMessage:   c.sendMessage,
36✔
276
                ReadMessage:   c.readMessage,
36✔
277
                Dial:          c.dial,
36✔
278
                Candidates:    c.candidateTowers,
36✔
279
                MinBackoff:    cfg.MinBackoff,
36✔
280
                MaxBackoff:    cfg.MaxBackoff,
36✔
281
                Log:           plog,
36✔
282
        })
36✔
283

36✔
284
        return c, nil
36✔
285
}
286

287
// getTowerAndSessionCandidates loads all the towers from the DB and then
288
// fetches the sessions for each of tower. Sessions are only collected if they
289
// pass the sessionFilter check. If a tower has a session that does pass the
290
// sessionFilter check then the perActiveTower call-back will be called on that
291
// tower.
292
func getTowerAndSessionCandidates(db DB, keyRing ECDHKeyRing,
293
        perActiveTower func(tower *Tower),
294
        opts ...wtdb.ClientSessionListOption) (
295
        map[wtdb.SessionID]*ClientSession, error) {
36✔
296

36✔
297
        // Fetch all active towers from the DB.
36✔
298
        towers, err := db.ListTowers(func(tower *wtdb.Tower) bool {
45✔
299
                return tower.Status == wtdb.TowerStatusActive
9✔
300
        })
9✔
301
        if err != nil {
36✔
302
                return nil, err
×
303
        }
×
304

305
        candidateSessions := make(map[wtdb.SessionID]*ClientSession)
36✔
306
        for _, dbTower := range towers {
45✔
307
                tower, err := NewTowerFromDBTower(dbTower)
9✔
308
                if err != nil {
9✔
309
                        return nil, err
×
310
                }
×
311

312
                sessions, err := db.ListClientSessions(&tower.ID, opts...)
9✔
313
                if err != nil {
9✔
314
                        return nil, err
×
315
                }
×
316

317
                for _, s := range sessions {
17✔
318
                        cs, err := NewClientSessionFromDBSession(
8✔
319
                                s, tower, keyRing,
8✔
320
                        )
8✔
321
                        if err != nil {
8✔
322
                                return nil, err
×
323
                        }
×
324

325
                        // Add the session to the set of candidate sessions.
326
                        candidateSessions[s.ID] = cs
8✔
327
                }
328

329
                perActiveTower(tower)
9✔
330
        }
331

332
        return candidateSessions, nil
36✔
333
}
334

335
// getClientSessions retrieves the client sessions for a particular tower if
336
// specified, otherwise all client sessions for all towers are retrieved. An
337
// optional filter can be provided to filter out any undesired client sessions.
338
//
339
// NOTE: This method should only be used when deserialization of a
340
// ClientSession's SessionPrivKey field is desired, otherwise, the existing
341
// ListClientSessions method should be used.
342
func getClientSessions(db DB, keyRing ECDHKeyRing, forTower *wtdb.TowerID,
343
        opts ...wtdb.ClientSessionListOption) (
344
        map[wtdb.SessionID]*ClientSession, error) {
47✔
345

47✔
346
        dbSessions, err := db.ListClientSessions(forTower, opts...)
47✔
347
        if err != nil {
47✔
348
                return nil, err
×
349
        }
×
350

351
        // Reload the tower from disk using the tower ID contained in each
352
        // candidate session. We will also rederive any session keys needed to
353
        // be able to communicate with the towers and authenticate session
354
        // requests. This prevents us from having to store the private keys on
355
        // disk.
356
        sessions := make(map[wtdb.SessionID]*ClientSession)
47✔
357
        for _, s := range dbSessions {
58✔
358
                dbTower, err := db.LoadTowerByID(s.TowerID)
11✔
359
                if err != nil {
11✔
360
                        return nil, err
×
361
                }
×
362

363
                towerKeyDesc, err := keyRing.DeriveKey(keychain.KeyLocator{
11✔
364
                        Family: keychain.KeyFamilyTowerSession,
11✔
365
                        Index:  s.KeyIndex,
11✔
366
                })
11✔
367
                if err != nil {
11✔
368
                        return nil, err
×
369
                }
×
370

371
                sessionKeyECDH := keychain.NewPubKeyECDH(towerKeyDesc, keyRing)
11✔
372

11✔
373
                tower, err := NewTowerFromDBTower(dbTower)
11✔
374
                if err != nil {
11✔
375
                        return nil, err
×
376
                }
×
377

378
                sessions[s.ID] = &ClientSession{
11✔
379
                        ID:                s.ID,
11✔
380
                        ClientSessionBody: s.ClientSessionBody,
11✔
381
                        Tower:             tower,
11✔
382
                        SessionKeyECDH:    sessionKeyECDH,
11✔
383
                }
11✔
384
        }
385

386
        return sessions, nil
47✔
387
}
388

389
// start initializes the watchtower client by loading or negotiating an active
390
// session and then begins processing backup tasks from the request pipeline.
391
func (c *client) start() error {
36✔
392
        c.log.Infof("Watchtower client starting")
36✔
393

36✔
394
        // First, restart a session queue for any sessions that have
36✔
395
        // committed but unacked state updates. This ensures that these
36✔
396
        // sessions will be able to flush the committed updates after a
36✔
397
        // restart.
36✔
398
        fetchCommittedUpdates := c.cfg.DB.FetchSessionCommittedUpdates
36✔
399
        for _, session := range c.candidateSessions {
44✔
400
                committedUpdates, err := fetchCommittedUpdates(
8✔
401
                        &session.ID,
8✔
402
                )
8✔
403
                if err != nil {
8✔
404
                        return err
×
405
                }
×
406

407
                if len(committedUpdates) > 0 {
10✔
408
                        c.log.Infof("Starting session=%s to process "+
2✔
409
                                "%d committed backups", session.ID,
2✔
410
                                len(committedUpdates))
2✔
411

2✔
412
                        c.initActiveQueue(session, committedUpdates)
2✔
413
                }
2✔
414
        }
415

416
        // Now start the session negotiator, which will allow us to request new
417
        // session as soon as the backupDispatcher starts up.
418
        err := c.negotiator.Start()
36✔
419
        if err != nil {
36✔
420
                return err
×
421
        }
×
422

423
        // Start the task pipeline to which new backup tasks will be
424
        // submitted from active links.
425
        err = c.pipeline.Start()
36✔
426
        if err != nil {
36✔
427
                return err
×
428
        }
×
429

430
        c.wg.Add(1)
36✔
431
        go c.backupDispatcher()
36✔
432

36✔
433
        c.log.Infof("Watchtower client started successfully")
36✔
434

36✔
435
        return nil
36✔
436
}
437

438
// stop idempotently initiates a graceful shutdown of the watchtower client.
439
func (c *client) stop() error {
36✔
440
        var returnErr error
36✔
441
        c.log.Debugf("Stopping watchtower client")
36✔
442

36✔
443
        // 1. Stop the session negotiator.
36✔
444
        err := c.negotiator.Stop()
36✔
445
        if err != nil {
36✔
446
                returnErr = err
×
447
        }
×
448

449
        // 2. Stop the backup dispatcher and any other goroutines.
450
        close(c.quit)
36✔
451
        c.wg.Wait()
36✔
452

36✔
453
        // 3. If there was a left over 'prevTask' from the backup
36✔
454
        // dispatcher, replay that onto the pipeline.
36✔
455
        if c.prevTask != nil {
36✔
456
                err = c.pipeline.QueueBackupID(c.prevTask)
×
457
                if err != nil {
×
458
                        returnErr = err
×
459
                }
×
460
        }
461

462
        // 4. Shutdown all active session queues in parallel. These will
463
        // exit once all unhandled updates have been replayed to the
464
        // task pipeline.
465
        c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() {
102✔
466
                return func() {
132✔
467
                        err := s.Stop(false)
66✔
468
                        if err != nil {
66✔
469
                                c.log.Errorf("could not stop session "+
×
470
                                        "queue: %s: %v", s.ID(), err)
×
471

×
472
                                returnErr = err
×
473
                        }
×
474
                }
475
        })
476

477
        // 5. Shutdown the backup queue, which will prevent any further
478
        // updates from being accepted.
479
        if err = c.pipeline.Stop(); err != nil {
36✔
480
                returnErr = err
×
481
        }
×
482

483
        c.log.Debugf("Client successfully stopped, stats: %s", c.stats)
36✔
484

36✔
485
        return returnErr
36✔
486
}
487

488
// backupState initiates a request to back up a particular revoked state. If the
489
// method returns nil, the backup is guaranteed to be successful unless the:
490
//   - justice transaction would create dust outputs when trying to abide by the
491
//     negotiated policy, or
492
//   - breached outputs contain too little value to sweep at the target sweep
493
//     fee rate.
494
func (c *client) backupState(chanID *lnwire.ChannelID,
495
        stateNum uint64) error {
469✔
496

469✔
497
        id := &wtdb.BackupID{
469✔
498
                ChanID:       *chanID,
469✔
499
                CommitHeight: stateNum,
469✔
500
        }
469✔
501

469✔
502
        return c.pipeline.QueueBackupID(id)
469✔
503
}
469✔
504

505
// nextSessionQueue attempts to fetch an active session from our set of
506
// candidate sessions. Candidate sessions with a differing policy from the
507
// active client's advertised policy will be ignored, but may be resumed if the
508
// client is restarted with a matching policy. If no candidates were found, nil
509
// is returned to signal that we need to request a new policy.
510
func (c *client) nextSessionQueue() (*sessionQueue, error) {
83✔
511
        // Select any candidate session at random, and remove it from the set of
83✔
512
        // candidate sessions.
83✔
513
        var candidateSession *ClientSession
83✔
514
        for id, sessionInfo := range c.candidateSessions {
166✔
515
                delete(c.candidateSessions, id)
83✔
516

83✔
517
                // Skip any sessions with policies that don't match the current
83✔
518
                // TxPolicy, as they would result in different justice
83✔
519
                // transactions from what is requested. These can be used again
83✔
520
                // if the client changes their configuration and restarting.
83✔
521
                if sessionInfo.Policy.TxPolicy != c.cfg.Policy.TxPolicy {
83✔
522
                        continue
×
523
                }
524

525
                candidateSession = sessionInfo
83✔
526
                break
83✔
527
        }
528

529
        // If none of the sessions could be used or none were found, we'll
530
        // return nil to signal that we need another session to be negotiated.
531
        if candidateSession == nil {
83✔
532
                return nil, nil
×
533
        }
×
534

535
        updates, err := c.cfg.DB.FetchSessionCommittedUpdates(
83✔
536
                &candidateSession.ID,
83✔
537
        )
83✔
538
        if err != nil {
83✔
539
                return nil, err
×
540
        }
×
541

542
        // Initialize the session queue and spin it up, so it can begin handling
543
        // updates. If the queue was already made active on startup, this will
544
        // simply return the existing session queue from the set.
545
        return c.getOrInitActiveQueue(candidateSession, updates), nil
83✔
546
}
547

548
// stopAndRemoveSession stops the session with the given ID and removes it from
549
// the in-memory active sessions set.
550
func (c *client) stopAndRemoveSession(id wtdb.SessionID, final bool) error {
4✔
551
        return c.activeSessions.StopAndRemove(id, final)
4✔
552
}
4✔
553

554
// deleteSessionFromTower dials the tower that we created the session with and
555
// attempts to send the tower the DeleteSession message.
556
func (c *client) deleteSessionFromTower(sess *wtdb.ClientSession) error {
4✔
557
        // First, we check if we have already loaded this tower in our
4✔
558
        // candidate towers iterator.
4✔
559
        tower, err := c.candidateTowers.GetTower(sess.TowerID)
4✔
560
        if errors.Is(err, ErrTowerNotInIterator) {
4✔
561
                // If not, then we attempt to load it from the DB.
×
562
                dbTower, err := c.cfg.DB.LoadTowerByID(sess.TowerID)
×
563
                if err != nil {
×
564
                        return err
×
565
                }
×
566

567
                tower, err = NewTowerFromDBTower(dbTower)
×
568
                if err != nil {
×
569
                        return err
×
570
                }
×
571
        } else if err != nil {
4✔
572
                return err
×
573
        }
×
574

575
        session, err := NewClientSessionFromDBSession(
4✔
576
                sess, tower, c.cfg.SecretKeyRing,
4✔
577
        )
4✔
578
        if err != nil {
4✔
579
                return err
×
580
        }
×
581

582
        localInit := wtwire.NewInitMessage(
4✔
583
                lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
4✔
584
                c.cfg.ChainHash,
4✔
585
        )
4✔
586

4✔
587
        var (
4✔
588
                conn wtserver.Peer
4✔
589

4✔
590
                // addrIterator is a copy of the tower's address iterator.
4✔
591
                // We use this copy so that iterating through the addresses does
4✔
592
                // not affect any other threads using this iterator.
4✔
593
                addrIterator = tower.Addresses.Copy()
4✔
594
                towerAddr    = addrIterator.Peek()
4✔
595
        )
4✔
596
        // Attempt to dial the tower with its available addresses.
4✔
597
        for {
8✔
598
                conn, err = c.dial(
4✔
599
                        session.SessionKeyECDH, &lnwire.NetAddress{
4✔
600
                                IdentityKey: tower.IdentityKey,
4✔
601
                                Address:     towerAddr,
4✔
602
                        },
4✔
603
                )
4✔
604
                if err != nil {
4✔
605
                        // If there are more addrs available, immediately try
×
606
                        // those.
×
607
                        nextAddr, iteratorErr := addrIterator.Next()
×
608
                        if iteratorErr == nil {
×
609
                                towerAddr = nextAddr
×
610
                                continue
×
611
                        }
612

613
                        // Otherwise, if we have exhausted the address list,
614
                        // exit.
615
                        addrIterator.Reset()
×
616

×
617
                        return fmt.Errorf("failed to dial tower(%x) at any "+
×
618
                                "available addresses",
×
619
                                tower.IdentityKey.SerializeCompressed())
×
620
                }
621

622
                break
4✔
623
        }
624
        defer conn.Close()
4✔
625

4✔
626
        // Send Init to tower.
4✔
627
        err = c.sendMessage(conn, localInit)
4✔
628
        if err != nil {
4✔
629
                return err
×
630
        }
×
631

632
        // Receive Init from tower.
633
        remoteMsg, err := c.readMessage(conn)
4✔
634
        if err != nil {
4✔
635
                return err
×
636
        }
×
637

638
        remoteInit, ok := remoteMsg.(*wtwire.Init)
4✔
639
        if !ok {
4✔
640
                return fmt.Errorf("watchtower %s responded with %T to Init",
×
641
                        towerAddr, remoteMsg)
×
642
        }
×
643

644
        // Validate Init.
645
        err = localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
4✔
646
        if err != nil {
4✔
647
                return err
×
648
        }
×
649

650
        // Send DeleteSession to tower.
651
        err = c.sendMessage(conn, &wtwire.DeleteSession{})
4✔
652
        if err != nil {
4✔
653
                return err
×
654
        }
×
655

656
        // Receive DeleteSessionReply from tower.
657
        remoteMsg, err = c.readMessage(conn)
4✔
658
        if err != nil {
4✔
659
                return err
×
660
        }
×
661

662
        deleteSessionReply, ok := remoteMsg.(*wtwire.DeleteSessionReply)
4✔
663
        if !ok {
4✔
664
                return fmt.Errorf("watchtower %s responded with %T to "+
×
665
                        "DeleteSession", towerAddr, remoteMsg)
×
666
        }
×
667

668
        switch deleteSessionReply.Code {
4✔
669
        case wtwire.CodeOK, wtwire.DeleteSessionCodeNotFound:
4✔
670
                return nil
4✔
671
        default:
×
672
                return fmt.Errorf("received error code %v in "+
×
673
                        "DeleteSessionReply when attempting to delete "+
×
674
                        "session from tower", deleteSessionReply.Code)
×
675
        }
676
}
677

678
// backupDispatcher processes events coming from the taskPipeline and is
679
// responsible for detecting when the client needs to renegotiate a session to
680
// fulfill continuing demand. The event loop exits if the client is quit.
681
//
682
// NOTE: This method MUST be run as a goroutine.
683
func (c *client) backupDispatcher() {
36✔
684
        defer c.wg.Done()
36✔
685

36✔
686
        c.log.Tracef("Starting backup dispatcher")
36✔
687
        defer c.log.Tracef("Stopping backup dispatcher")
36✔
688

36✔
689
        for {
728✔
690
                switch {
692✔
691

692
                // No active session queue and no additional sessions.
693
                case c.sessionQueue == nil && len(c.candidateSessions) == 0:
79✔
694
                        c.log.Infof("Requesting new session.")
79✔
695

79✔
696
                        // Immediately request a new session.
79✔
697
                        c.negotiator.RequestSession()
79✔
698

79✔
699
                        // Wait until we receive the newly negotiated session.
79✔
700
                        // All backups sent in the meantime are queued in the
79✔
701
                        // revoke queue, as we cannot process them.
79✔
702
                awaitSession:
79✔
703
                        select {
79✔
704
                        case session := <-c.negotiator.NewSessions():
73✔
705
                                c.log.Infof("Acquired new session with id=%s",
73✔
706
                                        session.ID)
73✔
707
                                c.candidateSessions[session.ID] = session
73✔
708
                                c.stats.sessionAcquired()
73✔
709

73✔
710
                                // We'll continue to choose the newly negotiated
73✔
711
                                // session as our active session queue.
73✔
712
                                continue
73✔
713

714
                        case <-c.statTicker.C:
×
715
                                c.log.Infof("Client stats: %s", c.stats)
×
716

717
                        // A new tower has been requested to be added. We'll
718
                        // update our persisted and in-memory state and consider
719
                        // its corresponding sessions, if any, as new
720
                        // candidates.
721
                        case msg := <-c.newTowers:
37✔
722
                                msg.errChan <- c.handleNewTower(msg.tower)
37✔
723

724
                        // A tower has been requested to be removed. We'll
725
                        // only allow removal of it if the address in question
726
                        // is not currently being used for session negotiation.
727
                        case msg := <-c.staleTowers:
4✔
728
                                msg.errChan <- c.handleStaleTower(msg)
4✔
729

730
                        // A tower has been requested to be de-activated. We'll
731
                        // only allow this if the tower is not currently being
732
                        // used for session negotiation.
733
                        case msg := <-c.deactivateTowers:
×
734
                                msg.errChan <- c.handleDeactivateTower(msg)
×
735

736
                        // A request has come through to terminate a session.
737
                        case msg := <-c.terminateSessions:
×
738
                                msg.errChan <- c.handleTerminateSession(msg)
×
739

740
                        case <-c.quit:
6✔
741
                                return
6✔
742
                        }
743

744
                        // Instead of looping, we'll jump back into the select
745
                        // case and await the delivery of the session to prevent
746
                        // us from re-requesting additional sessions.
747
                        goto awaitSession
41✔
748

749
                // No active session queue but have additional sessions.
750
                case c.sessionQueue == nil && len(c.candidateSessions) > 0:
83✔
751
                        // We've exhausted the prior session, we'll pop another
83✔
752
                        // from the remaining sessions and continue processing
83✔
753
                        // backup tasks.
83✔
754
                        var err error
83✔
755
                        c.sessionQueue, err = c.nextSessionQueue()
83✔
756
                        if err != nil {
83✔
757
                                c.log.Errorf("error fetching next session "+
×
758
                                        "queue: %v", err)
×
759
                        }
×
760

761
                        if c.sessionQueue != nil {
166✔
762
                                c.log.Debugf("Loaded next candidate session "+
83✔
763
                                        "queue id=%s", c.sessionQueue.ID())
83✔
764
                        }
83✔
765

766
                // Have active session queue, process backups.
767
                case c.sessionQueue != nil:
530✔
768
                        if c.prevTask != nil {
531✔
769
                                c.processTask(c.prevTask)
1✔
770

1✔
771
                                // Continue to ensure the sessionQueue is
1✔
772
                                // properly initialized before attempting to
1✔
773
                                // process more tasks from the pipeline.
1✔
774
                                continue
1✔
775
                        }
776

777
                        // Normal operation where new tasks are read from the
778
                        // pipeline.
779
                        select {
529✔
780

781
                        // If any sessions are negotiated while we have an
782
                        // active session queue, queue them for future use.
783
                        // This shouldn't happen with the current design, so
784
                        // it doesn't hurt to select here just in case. In the
785
                        // future, we will likely allow more asynchrony so that
786
                        // we can request new sessions before the session is
787
                        // fully empty, which this case would handle.
788
                        case session := <-c.negotiator.NewSessions():
×
789
                                c.log.Warnf("Acquired new session with id=%s "+
×
790
                                        "while processing tasks", session.ID)
×
791
                                c.candidateSessions[session.ID] = session
×
792
                                c.stats.sessionAcquired()
×
793

794
                        case <-c.statTicker.C:
×
795
                                c.log.Infof("Client stats: %s", c.stats)
×
796

797
                        // Process each backup task serially from the queue of
798
                        // revoked states.
799
                        case task, ok := <-c.pipeline.NextBackupID():
479✔
800
                                // All backups in the pipeline have been
479✔
801
                                // processed, it is now safe to exit.
479✔
802
                                if !ok {
479✔
803
                                        return
×
804
                                }
×
805

806
                                c.log.Debugf("Processing %v", task)
479✔
807

479✔
808
                                c.stats.taskReceived()
479✔
809
                                c.processTask(task)
479✔
810

811
                        // A new tower has been requested to be added. We'll
812
                        // update our persisted and in-memory state and consider
813
                        // its corresponding sessions, if any, as new
814
                        // candidates.
815
                        case msg := <-c.newTowers:
10✔
816
                                msg.errChan <- c.handleNewTower(msg.tower)
10✔
817

818
                        // A tower has been removed, so we'll remove certain
819
                        // information that's persisted and also in our
820
                        // in-memory state depending on the request, and set any
821
                        // of its corresponding candidate sessions as inactive.
822
                        case msg := <-c.staleTowers:
7✔
823
                                msg.errChan <- c.handleStaleTower(msg)
7✔
824

825
                        // A tower has been requested to be de-activated.
826
                        case msg := <-c.deactivateTowers:
2✔
827
                                msg.errChan <- c.handleDeactivateTower(msg)
2✔
828

829
                        // A request has come through to terminate a session.
830
                        case msg := <-c.terminateSessions:
1✔
831
                                msg.errChan <- c.handleTerminateSession(msg)
1✔
832

833
                        case <-c.quit:
30✔
834
                                return
30✔
835
                        }
836
                }
837
        }
838
}
839

840
// processTask attempts to schedule the given backupTask on the active
841
// sessionQueue. The task will either be accepted or rejected, after which the
842
// appropriate modifications to the client's state machine will be made. After
843
// every invocation of processTask, the caller should ensure that the
844
// sessionQueue hasn't been exhausted before proceeding to the next task. Tasks
845
// that are rejected because the active sessionQueue is full will be cached as
846
// the prevTask, and should be reprocessed after obtaining a new sessionQueue.
847
func (c *client) processTask(task *wtdb.BackupID) {
480✔
848
        script, ok := c.cfg.getSweepScript(task.ChanID)
480✔
849
        if !ok {
480✔
850
                log.Infof("not processing task for unregistered channel: %s",
×
851
                        task.ChanID)
×
852

×
853
                return
×
854
        }
×
855

856
        backupTask := newBackupTask(*task, script)
480✔
857

480✔
858
        status, accepted := c.sessionQueue.AcceptTask(backupTask)
480✔
859
        if accepted {
954✔
860
                c.taskAccepted(task, status)
474✔
861
        } else {
480✔
862
                c.taskRejected(task, status)
6✔
863
        }
6✔
864
}
865

866
// taskAccepted processes the acceptance of a task by a sessionQueue depending
867
// on the state the sessionQueue is in *after* the task is added. The client's
868
// prevTask is always removed as a result of this call. The client's
869
// sessionQueue will be removed if accepting the task left the sessionQueue in
870
// an exhausted state.
871
func (c *client) taskAccepted(task *wtdb.BackupID,
872
        newStatus sessionQueueStatus) {
474✔
873

474✔
874
        c.log.Infof("Queued %v successfully for session %v", task,
474✔
875
                c.sessionQueue.ID())
474✔
876

474✔
877
        c.stats.taskAccepted()
474✔
878

474✔
879
        // If this task was accepted, we discard anything held in the prevTask.
474✔
880
        // Either it was nil before, or is the task which was just accepted.
474✔
881
        c.prevTask = nil
474✔
882

474✔
883
        switch newStatus {
474✔
884

885
        // The sessionQueue still has capacity after accepting this task.
886
        case sessionQueueAvailable:
431✔
887

888
        // The sessionQueue is full after accepting this task, so we will need
889
        // to request a new one before proceeding.
890
        case sessionQueueExhausted:
43✔
891
                c.stats.sessionExhausted()
43✔
892

43✔
893
                c.log.Debugf("Session %s exhausted", c.sessionQueue.ID())
43✔
894

43✔
895
                // This task left the session exhausted, set it to nil and
43✔
896
                // proceed to the next loop, so we can consume another
43✔
897
                // pre-negotiated session or request another.
43✔
898
                c.sessionQueue = nil
43✔
899
        }
900
}
901

902
// taskRejected process the rejection of a task by a sessionQueue depending on
903
// the state the was in *before* the task was rejected. The client's prevTask
904
// will cache the task if the sessionQueue was exhausted beforehand, and nil
905
// the sessionQueue to find a new session. If the sessionQueue was not
906
// exhausted and not shutting down, the client marks the task as ineligible, as
907
// this implies we couldn't construct a valid justice transaction given the
908
// session's policy.
909
func (c *client) taskRejected(task *wtdb.BackupID,
910
        curStatus sessionQueueStatus) {
6✔
911

6✔
912
        switch curStatus {
6✔
913

914
        // The sessionQueue has available capacity but the task was rejected,
915
        // this indicates that the task was ineligible for backup.
916
        case sessionQueueAvailable:
5✔
917
                c.stats.taskIneligible()
5✔
918

5✔
919
                c.log.Infof("Ignoring ineligible %v", task)
5✔
920

5✔
921
                err := c.cfg.DB.MarkBackupIneligible(
5✔
922
                        task.ChanID, task.CommitHeight,
5✔
923
                )
5✔
924
                if err != nil {
5✔
925
                        c.log.Errorf("Unable to mark %v ineligible: %v",
×
926
                                task, err)
×
927

×
928
                        // It is safe to not handle this error, even if we could
×
929
                        // not persist the result. At worst, this task may be
×
930
                        // reprocessed on a subsequent start up, and will either
×
931
                        // succeed do a change in session parameters or fail in
×
932
                        // the same manner.
×
933
                }
×
934

935
                // If this task was rejected *and* the session had available
936
                // capacity, we discard anything held in the prevTask. Either it
937
                // was nil before, or is the task which was just rejected.
938
                c.prevTask = nil
5✔
939

940
        // The sessionQueue rejected the task because it is full, we will stash
941
        // this task and try to add it to the next available sessionQueue.
942
        case sessionQueueExhausted:
1✔
943
                c.stats.sessionExhausted()
1✔
944

1✔
945
                c.log.Debugf("Session %v exhausted, %v queued for next session",
1✔
946
                        c.sessionQueue.ID(), task)
1✔
947

1✔
948
                // Cache the task that we pulled off, so that we can process it
1✔
949
                // once a new session queue is available.
1✔
950
                c.sessionQueue = nil
1✔
951
                c.prevTask = task
1✔
952

953
        // The sessionQueue rejected the task because it is shutting down. We
954
        // will stash this task and try to add it to the next available
955
        // sessionQueue.
956
        case sessionQueueShuttingDown:
×
957
                c.log.Debugf("Session %v is shutting down, %v queued for "+
×
958
                        "next session", c.sessionQueue.ID(), task)
×
959

×
960
                // Cache the task that we pulled off, so that we can process it
×
961
                // once a new session queue is available.
×
962
                c.sessionQueue = nil
×
963
                c.prevTask = task
×
964
        }
965
}
966

967
// dial connects the peer at addr using privKey as our secret key for the
968
// connection. The connection will use the configured Net's resolver to resolve
969
// the address for either Tor or clear net connections.
970
func (c *client) dial(localKey keychain.SingleKeyECDH,
971
        addr *lnwire.NetAddress) (wtserver.Peer, error) {
444✔
972

444✔
973
        return c.cfg.AuthDial(localKey, addr, c.cfg.Dial)
444✔
974
}
444✔
975

976
// readMessage receives and parses the next message from the given Peer. An
977
// error is returned if a message is not received before the server's read
978
// timeout, the read off the wire failed, or the message could not be
979
// deserialized.
980
func (c *client) readMessage(peer wtserver.Peer) (wtwire.Message, error) {
1,224✔
981
        // Set a read timeout to ensure we drop the connection if nothing is
1,224✔
982
        // received in a timely manner.
1,224✔
983
        err := peer.SetReadDeadline(time.Now().Add(c.cfg.ReadTimeout))
1,224✔
984
        if err != nil {
1,224✔
985
                err = fmt.Errorf("unable to set read deadline: %w", err)
×
986
                c.log.Errorf("Unable to read msg: %v", err)
×
987
                return nil, err
×
988
        }
×
989

990
        // Pull the next message off the wire,
991
        rawMsg, err := peer.ReadNextMessage()
1,224✔
992
        if err != nil {
1,473✔
993
                err = fmt.Errorf("unable to read message: %w", err)
249✔
994
                c.log.Errorf("Unable to read msg: %v", err)
249✔
995
                return nil, err
249✔
996
        }
249✔
997

998
        // Parse the received message according to the watchtower wire
999
        // specification.
1000
        msgReader := bytes.NewReader(rawMsg)
975✔
1001
        msg, err := wtwire.ReadMessage(msgReader, 0)
975✔
1002
        if err != nil {
975✔
1003
                err = fmt.Errorf("unable to parse message: %w", err)
×
1004
                c.log.Errorf("Unable to read msg: %v", err)
×
1005
                return nil, err
×
1006
        }
×
1007

1008
        c.logMessage(peer, msg, true)
975✔
1009

975✔
1010
        return msg, nil
975✔
1011
}
1012

1013
// sendMessage sends a watchtower wire message to the target peer.
1014
func (c *client) sendMessage(peer wtserver.Peer,
1015
        msg wtwire.Message) error {
1,225✔
1016

1,225✔
1017
        // Encode the next wire message into the buffer.
1,225✔
1018
        // TODO(conner): use buffer pool
1,225✔
1019
        var b bytes.Buffer
1,225✔
1020
        _, err := wtwire.WriteMessage(&b, msg, 0)
1,225✔
1021
        if err != nil {
1,225✔
1022
                err = fmt.Errorf("unable to encode msg: %w", err)
×
1023
                c.log.Errorf("Unable to send msg: %v", err)
×
1024
                return err
×
1025
        }
×
1026

1027
        // Set the write deadline for the connection, ensuring we drop the
1028
        // connection if nothing is sent in a timely manner.
1029
        err = peer.SetWriteDeadline(time.Now().Add(c.cfg.WriteTimeout))
1,225✔
1030
        if err != nil {
1,225✔
1031
                err = fmt.Errorf("unable to set write deadline: %w", err)
×
1032
                c.log.Errorf("Unable to send msg: %v", err)
×
1033
                return err
×
1034
        }
×
1035

1036
        c.logMessage(peer, msg, false)
1,225✔
1037

1,225✔
1038
        // Write out the full message to the remote peer.
1,225✔
1039
        _, err = peer.Write(b.Bytes())
1,225✔
1040
        if err != nil {
1,226✔
1041
                c.log.Errorf("Unable to send msg: %v", err)
1✔
1042
        }
1✔
1043
        return err
1,225✔
1044
}
1045

1046
// newSessionQueue creates a sessionQueue from a ClientSession loaded from the
1047
// database and supplying it with the resources needed by the client.
1048
func (c *client) newSessionQueue(s *ClientSession,
1049
        updates []wtdb.CommittedUpdate) *sessionQueue {
80✔
1050

80✔
1051
        return newSessionQueue(&sessionQueueConfig{
80✔
1052
                ClientSession:          s,
80✔
1053
                ChainHash:              c.cfg.ChainHash,
80✔
1054
                Dial:                   c.dial,
80✔
1055
                ReadMessage:            c.readMessage,
80✔
1056
                SendMessage:            c.sendMessage,
80✔
1057
                Signer:                 c.cfg.Signer,
80✔
1058
                DB:                     c.cfg.DB,
80✔
1059
                MinBackoff:             c.cfg.MinBackoff,
80✔
1060
                MaxBackoff:             c.cfg.MaxBackoff,
80✔
1061
                Log:                    c.log,
80✔
1062
                BuildBreachRetribution: c.cfg.BuildBreachRetribution,
80✔
1063
                TaskPipeline:           c.pipeline,
80✔
1064
        }, updates)
80✔
1065
}
80✔
1066

1067
// getOrInitActiveQueue checks the activeSessions set for a sessionQueue for the
1068
// passed ClientSession. If it exists, the active sessionQueue is returned.
1069
// Otherwise, a new sessionQueue is initialized and added to the set.
1070
func (c *client) getOrInitActiveQueue(s *ClientSession,
1071
        updates []wtdb.CommittedUpdate) *sessionQueue {
83✔
1072

83✔
1073
        if sq, ok := c.activeSessions.Get(s.ID); ok {
88✔
1074
                return sq
5✔
1075
        }
5✔
1076

1077
        return c.initActiveQueue(s, updates)
78✔
1078
}
1079

1080
// initActiveQueue creates a new sessionQueue from the passed ClientSession,
1081
// adds the sessionQueue to the activeSessions set, and starts the sessionQueue
1082
// so that it can deliver any committed updates or begin accepting newly
1083
// assigned tasks.
1084
func (c *client) initActiveQueue(s *ClientSession,
1085
        updates []wtdb.CommittedUpdate) *sessionQueue {
80✔
1086

80✔
1087
        // Initialize the session queue, providing it with all the resources it
80✔
1088
        // requires from the client instance.
80✔
1089
        sq := c.newSessionQueue(s, updates)
80✔
1090

80✔
1091
        // Add the session queue as an active session so that we remember to
80✔
1092
        // stop it on shutdown. This method will also start the queue so that it
80✔
1093
        // can be active in processing newly assigned tasks or to upload
80✔
1094
        // previously committed updates.
80✔
1095
        c.activeSessions.AddAndStart(sq)
80✔
1096

80✔
1097
        return sq
80✔
1098
}
80✔
1099

1100
// terminateSession sets the given session's status to CSessionTerminal meaning
1101
// that it will not be used again.
1102
func (c *client) terminateSession(id wtdb.SessionID) error {
1✔
1103
        errChan := make(chan error, 1)
1✔
1104

1✔
1105
        select {
1✔
1106
        case c.terminateSessions <- &terminateSessMsg{
1107
                id:      id,
1108
                errChan: errChan,
1109
        }:
1✔
1110
        case <-c.pipeline.quit:
×
1111
                return ErrClientExiting
×
1112
        }
1113

1114
        select {
1✔
1115
        case err := <-errChan:
1✔
1116
                return err
1✔
1117
        case <-c.pipeline.quit:
×
1118
                return ErrClientExiting
×
1119
        }
1120
}
1121

1122
// handleTerminateSession handles a request to terminate a session. It will
1123
// first shut down the session if it is part of the active session set, then
1124
// it will ensure that the active session queue is set reset if it is using the
1125
// session in question. Finally, the session's status in the DB will be updated.
1126
func (c *client) handleTerminateSession(msg *terminateSessMsg) error {
1✔
1127
        id := msg.id
1✔
1128

1✔
1129
        delete(c.candidateSessions, id)
1✔
1130

1✔
1131
        err := c.activeSessions.StopAndRemove(id, true)
1✔
1132
        if err != nil {
1✔
1133
                return fmt.Errorf("could not stop session %s: %w", id, err)
×
1134
        }
×
1135

1136
        // If our active session queue corresponds to the session being
1137
        // terminated, then we'll proceed to negotiate a new one.
1138
        if c.sessionQueue != nil {
2✔
1139
                if bytes.Equal(c.sessionQueue.ID()[:], id[:]) {
2✔
1140
                        c.sessionQueue = nil
1✔
1141
                }
1✔
1142
        }
1143

1144
        return nil
1✔
1145
}
1146

1147
// deactivateTower sends a tower deactivation request to the backupDispatcher
1148
// where it will be handled synchronously. The request should result in all the
1149
// sessions that we have with the given tower being shutdown and removed from
1150
// our in-memory set of active sessions.
1151
func (c *client) deactivateTower(id wtdb.TowerID,
1152
        pubKey *btcec.PublicKey) error {
2✔
1153

2✔
1154
        errChan := make(chan error, 1)
2✔
1155

2✔
1156
        select {
2✔
1157
        case c.deactivateTowers <- &deactivateTowerMsg{
1158
                id:      id,
1159
                pubKey:  pubKey,
1160
                errChan: errChan,
1161
        }:
2✔
1162
        case <-c.pipeline.quit:
×
1163
                return ErrClientExiting
×
1164
        }
1165

1166
        select {
2✔
1167
        case err := <-errChan:
2✔
1168
                return err
2✔
1169
        case <-c.pipeline.quit:
×
1170
                return ErrClientExiting
×
1171
        }
1172
}
1173

1174
// handleDeactivateTower handles a request to deactivate a tower. We will remove
1175
// it from the in-memory candidate set, and we will also stop any active
1176
// sessions we have with this tower.
1177
func (c *client) handleDeactivateTower(msg *deactivateTowerMsg) error {
2✔
1178
        // Remove the tower from our in-memory candidate set so that it is not
2✔
1179
        // used for any new session negotiations.
2✔
1180
        err := c.candidateTowers.RemoveCandidate(msg.id, nil)
2✔
1181
        if err != nil {
2✔
1182
                return err
×
1183
        }
×
1184

1185
        pubKey := msg.pubKey.SerializeCompressed()
2✔
1186
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
2✔
1187
        if err != nil {
2✔
1188
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1189
                        "%v", pubKey, err)
×
1190
        }
×
1191

1192
        // Iterate over all the sessions we have for this tower and remove them
1193
        // from our candidate set and also from our set of started, active
1194
        // sessions.
1195
        for sessionID := range sessions {
5✔
1196
                delete(c.candidateSessions, sessionID)
3✔
1197

3✔
1198
                err = c.activeSessions.StopAndRemove(sessionID, false)
3✔
1199
                if err != nil {
3✔
1200
                        return fmt.Errorf("could not stop session %s: %w",
×
1201
                                sessionID, err)
×
1202
                }
×
1203
        }
1204

1205
        // If our active session queue corresponds to the stale tower, we'll
1206
        // proceed to negotiate a new one.
1207
        if c.sessionQueue != nil {
4✔
1208
                towerKey := c.sessionQueue.tower.IdentityKey
2✔
1209

2✔
1210
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
4✔
1211
                        c.sessionQueue = nil
2✔
1212
                }
2✔
1213
        }
1214

1215
        return nil
2✔
1216
}
1217

1218
// addTower adds a new watchtower reachable at the given address and considers
1219
// it for new sessions. If the watchtower already exists, then any new addresses
1220
// included will be considered when dialing it for session negotiations and
1221
// backups.
1222
func (c *client) addTower(tower *Tower) error {
47✔
1223
        errChan := make(chan error, 1)
47✔
1224

47✔
1225
        select {
47✔
1226
        case c.newTowers <- &newTowerMsg{
1227
                tower:   tower,
1228
                errChan: errChan,
1229
        }:
47✔
1230
        case <-c.pipeline.quit:
×
1231
                return ErrClientExiting
×
1232
        }
1233

1234
        select {
47✔
1235
        case err := <-errChan:
47✔
1236
                return err
47✔
1237
        case <-c.pipeline.quit:
×
1238
                return ErrClientExiting
×
1239
        }
1240
}
1241

1242
// handleNewTower handles a request for a new tower to be added. If the tower
1243
// already exists, then its corresponding sessions, if any, will be set
1244
// considered as candidates.
1245
func (c *client) handleNewTower(tower *Tower) error {
47✔
1246
        c.candidateTowers.AddCandidate(tower)
47✔
1247

47✔
1248
        // Include all of its corresponding sessions to our set of candidates.
47✔
1249
        sessions, err := getClientSessions(
47✔
1250
                c.cfg.DB, c.cfg.SecretKeyRing, &tower.ID,
47✔
1251
                wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
47✔
1252
                wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
47✔
1253
        )
47✔
1254
        if err != nil {
47✔
1255
                return fmt.Errorf("unable to determine sessions for tower %x: "+
×
1256
                        "%v", tower.IdentityKey.SerializeCompressed(), err)
×
1257
        }
×
1258
        for id, session := range sessions {
58✔
1259
                c.candidateSessions[id] = session
11✔
1260
        }
11✔
1261

1262
        return nil
47✔
1263
}
1264

1265
// removeTower removes a watchtower from being considered for future session
1266
// negotiations and from being used for any subsequent backups until it's added
1267
// again. If an address is provided, then this call only serves as a way of
1268
// removing the address from the watchtower instead.
1269
func (c *client) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey,
1270
        addr net.Addr) error {
11✔
1271

11✔
1272
        errChan := make(chan error, 1)
11✔
1273

11✔
1274
        select {
11✔
1275
        case c.staleTowers <- &staleTowerMsg{
1276
                id:      id,
1277
                pubKey:  pubKey,
1278
                addr:    addr,
1279
                errChan: errChan,
1280
        }:
11✔
1281
        case <-c.pipeline.quit:
×
1282
                return ErrClientExiting
×
1283
        }
1284

1285
        select {
11✔
1286
        case err := <-errChan:
11✔
1287
                return err
11✔
1288
        case <-c.pipeline.quit:
×
1289
                return ErrClientExiting
×
1290
        }
1291
}
1292

1293
// handleStaleTower handles a request for an existing tower to be removed. If
1294
// none of the tower's sessions have pending updates, then they will become
1295
// inactive and removed as candidates. If the active session queue corresponds
1296
// to any of these sessions, a new one will be negotiated.
1297
func (c *client) handleStaleTower(msg *staleTowerMsg) error {
11✔
1298
        // We'll first update our in-memory state.
11✔
1299
        err := c.candidateTowers.RemoveCandidate(msg.id, msg.addr)
11✔
1300
        if err != nil {
12✔
1301
                return err
1✔
1302
        }
1✔
1303

1304
        // If an address was provided, then we're only meant to remove the
1305
        // address from the tower.
1306
        if msg.addr != nil {
12✔
1307
                return nil
2✔
1308
        }
2✔
1309

1310
        // Otherwise, the tower should no longer be used for future session
1311
        // negotiations and backups.
1312

1313
        pubKey := msg.pubKey.SerializeCompressed()
8✔
1314
        sessions, err := c.cfg.DB.ListClientSessions(&msg.id)
8✔
1315
        if err != nil {
8✔
1316
                return fmt.Errorf("unable to retrieve sessions for tower %x: "+
×
1317
                        "%v", pubKey, err)
×
1318
        }
×
1319
        for sessionID := range sessions {
17✔
1320
                delete(c.candidateSessions, sessionID)
9✔
1321

9✔
1322
                // Shutdown the session so that any pending updates are
9✔
1323
                // replayed back onto the main task pipeline.
9✔
1324
                err = c.activeSessions.StopAndRemove(sessionID, true)
9✔
1325
                if err != nil {
9✔
1326
                        c.log.Errorf("could not stop session %s: %w", sessionID,
×
1327
                                err)
×
1328
                }
×
1329
        }
1330

1331
        // If our active session queue corresponds to the stale tower, we'll
1332
        // proceed to negotiate a new one.
1333
        if c.sessionQueue != nil {
14✔
1334
                towerKey := c.sessionQueue.tower.IdentityKey
6✔
1335

6✔
1336
                if bytes.Equal(pubKey, towerKey.SerializeCompressed()) {
12✔
1337
                        c.sessionQueue = nil
6✔
1338
                }
6✔
1339
        }
1340

1341
        return nil
8✔
1342
}
1343

1344
// registeredTowers retrieves the list of watchtowers registered with the
1345
// client.
1346
func (c *client) registeredTowers(towers []*wtdb.Tower,
1347
        opts ...wtdb.ClientSessionListOption) ([]*RegisteredTower, error) {
×
1348

×
1349
        // Generate a filter that will fetch all the client's sessions
×
1350
        // regardless of if they are active or not.
×
1351
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
×
1352

×
1353
        clientSessions, err := c.cfg.DB.ListClientSessions(nil, opts...)
×
1354
        if err != nil {
×
1355
                return nil, err
×
1356
        }
×
1357

1358
        // Construct a lookup map that coalesces all the sessions for a specific
1359
        // watchtower.
1360
        towerSessions := make(
×
1361
                map[wtdb.TowerID]map[wtdb.SessionID]*wtdb.ClientSession,
×
1362
        )
×
1363
        for id, s := range clientSessions {
×
1364
                sessions, ok := towerSessions[s.TowerID]
×
1365
                if !ok {
×
1366
                        sessions = make(map[wtdb.SessionID]*wtdb.ClientSession)
×
1367
                        towerSessions[s.TowerID] = sessions
×
1368
                }
×
1369
                sessions[id] = s
×
1370
        }
1371

1372
        registeredTowers := make([]*RegisteredTower, 0, len(towerSessions))
×
1373
        for _, tower := range towers {
×
1374
                isActive := c.candidateTowers.IsActive(tower.ID)
×
1375
                registeredTowers = append(registeredTowers, &RegisteredTower{
×
1376
                        Tower:                  tower,
×
1377
                        Sessions:               towerSessions[tower.ID],
×
1378
                        ActiveSessionCandidate: isActive,
×
1379
                })
×
1380
        }
×
1381

1382
        return registeredTowers, nil
×
1383
}
1384

1385
// lookupTower retrieves the info of sessions held with the given tower handled
1386
// by this client.
1387
func (c *client) lookupTower(tower *wtdb.Tower,
1388
        opts ...wtdb.ClientSessionListOption) (*RegisteredTower, error) {
2✔
1389

2✔
1390
        opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
2✔
1391

2✔
1392
        towerSessions, err := c.cfg.DB.ListClientSessions(&tower.ID, opts...)
2✔
1393
        if err != nil {
2✔
1394
                return nil, err
×
1395
        }
×
1396

1397
        return &RegisteredTower{
2✔
1398
                Tower:                  tower,
2✔
1399
                Sessions:               towerSessions,
2✔
1400
                ActiveSessionCandidate: c.candidateTowers.IsActive(tower.ID),
2✔
1401
        }, nil
2✔
1402
}
1403

1404
// getStats returns the in-memory statistics of the client since startup.
UNCOV
1405
func (c *client) getStats() ClientStats {
×
UNCOV
1406
        return c.stats.getStatsCopy()
×
UNCOV
1407
}
×
1408

1409
// policy returns the active client policy configuration.
1410
func (c *client) policy() wtpolicy.Policy {
6✔
1411
        return c.cfg.Policy
6✔
1412
}
6✔
1413

1414
// logMessage writes information about a message received from a remote peer,
1415
// using directional prepositions to signal whether the message was sent or
1416
// received.
1417
func (c *client) logMessage(
1418
        peer wtserver.Peer, msg wtwire.Message, read bool) {
2,200✔
1419

2,200✔
1420
        var action = "Received"
2,200✔
1421
        var preposition = "from"
2,200✔
1422
        if !read {
3,425✔
1423
                action = "Sending"
1,225✔
1424
                preposition = "to"
1,225✔
1425
        }
1,225✔
1426

1427
        summary := wtwire.MessageSummary(msg)
2,200✔
1428
        if len(summary) > 0 {
3,529✔
1429
                summary = "(" + summary + ")"
1,329✔
1430
        }
1,329✔
1431

1432
        c.log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
2,200✔
1433
                preposition, peer.RemotePub().SerializeCompressed(),
2,200✔
1434
                peer.RemoteAddr())
2,200✔
1435
}
1436

1437
func newRandomDelay(max uint32) (uint32, error) {
4✔
1438
        var maxDelay big.Int
4✔
1439
        maxDelay.SetUint64(uint64(max))
4✔
1440

4✔
1441
        randDelay, err := rand.Int(rand.Reader, &maxDelay)
4✔
1442
        if err != nil {
4✔
1443
                return 0, err
×
1444
        }
×
1445

1446
        return uint32(randDelay.Uint64()), nil
4✔
1447
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc