• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13593508312

28 Feb 2025 05:41PM UTC coverage: 58.287% (-10.4%) from 68.65%
13593508312

Pull #9458

github

web-flow
Merge d40067c0c into f1182e433
Pull Request #9458: multi+server.go: add initial permissions for some peers

346 of 548 new or added lines in 10 files covered. (63.14%)

27412 existing lines in 442 files now uncovered.

94709 of 162488 relevant lines covered (58.29%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.85
/watchtower/wtclient/manager.go
1
package wtclient
2

3
import (
4
        "errors"
5
        "fmt"
6
        "net"
7
        "sync"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcec/v2"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/lightningnetwork/lnd/chainntnfs"
13
        "github.com/lightningnetwork/lnd/channeldb"
14
        "github.com/lightningnetwork/lnd/channelnotifier"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/input"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/subscribe"
19
        "github.com/lightningnetwork/lnd/tor"
20
        "github.com/lightningnetwork/lnd/watchtower/blob"
21
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
22
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
23
)
24

25
// ClientManager is the primary interface used by the daemon to control a
26
// client's lifecycle and backup revoked states.
27
type ClientManager interface {
28
        // AddTower adds a new watchtower reachable at the given address and
29
        // considers it for new sessions. If the watchtower already exists, then
30
        // any new addresses included will be considered when dialing it for
31
        // session negotiations and backups.
32
        AddTower(*lnwire.NetAddress) error
33

34
        // RemoveTower removes a watchtower from being considered for future
35
        // session negotiations and from being used for any subsequent backups
36
        // until it's added again. If an address is provided, then this call
37
        // only serves as a way of removing the address from the watchtower
38
        // instead.
39
        RemoveTower(*btcec.PublicKey, net.Addr) error
40

41
        // DeactivateTower sets the given tower's status to inactive so that it
42
        // is not considered for session negotiation. Its sessions will also not
43
        // be used while the tower is inactive.
44
        DeactivateTower(pubKey *btcec.PublicKey) error
45

46
        // TerminateSession sets the given session's status to CSessionTerminal
47
        // meaning that it will not be used again.
48
        TerminateSession(id wtdb.SessionID) error
49

50
        // Stats returns the in-memory statistics of the client since startup.
51
        Stats() ClientStats
52

53
        // Policy returns the active client policy configuration.
54
        Policy(blob.Type) (wtpolicy.Policy, error)
55

56
        // RegisteredTowers retrieves the list of watchtowers registered with
57
        // the client. It returns a set of registered towers per client policy
58
        // type.
59
        RegisteredTowers(opts ...wtdb.ClientSessionListOption) (
60
                map[blob.Type][]*RegisteredTower, error)
61

62
        // LookupTower retrieves a registered watchtower through its public key.
63
        LookupTower(*btcec.PublicKey, ...wtdb.ClientSessionListOption) (
64
                map[blob.Type]*RegisteredTower, error)
65

66
        // RegisterChannel persistently initializes any channel-dependent
67
        // parameters within the client. This should be called during link
68
        // startup to ensure that the client is able to support the link during
69
        // operation.
70
        RegisterChannel(lnwire.ChannelID, channeldb.ChannelType) error
71

72
        // BackupState initiates a request to back up a particular revoked
73
        // state. If the method returns nil, the backup is guaranteed to be
74
        // successful unless the justice transaction would create dust outputs
75
        // when trying to abide by the negotiated policy.
76
        BackupState(chanID *lnwire.ChannelID, stateNum uint64) error
77
}
78

79
// Config provides the client with access to the resources it requires to
80
// perform its duty. All nillable fields must be non-nil for the tower to be
81
// initialized properly.
82
type Config struct {
83
        // Signer provides access to the wallet so that the client can sign
84
        // justice transactions that spend from a remote party's commitment
85
        // transaction.
86
        Signer input.Signer
87

88
        // SubscribeChannelEvents can be used to subscribe to channel event
89
        // notifications.
90
        SubscribeChannelEvents func() (subscribe.Subscription, error)
91

92
        // FetchClosedChannel can be used to fetch the info about a closed
93
        // channel. If the channel is not found or not yet closed then
94
        // channeldb.ErrClosedChannelNotFound will be returned.
95
        FetchClosedChannel func(cid lnwire.ChannelID) (
96
                *channeldb.ChannelCloseSummary, error)
97

98
        // ChainNotifier can be used to subscribe to block notifications.
99
        ChainNotifier chainntnfs.ChainNotifier
100

101
        // BuildBreachRetribution is a function closure that allows the client
102
        // fetch the breach retribution info for a certain channel at a certain
103
        // revoked commitment height.
104
        BuildBreachRetribution BreachRetributionBuilder
105

106
        // NewAddress generates a new on-chain sweep pkscript.
107
        NewAddress func() ([]byte, error)
108

109
        // SecretKeyRing is used to derive the session keys used to communicate
110
        // with the tower. The client only stores the KeyLocators internally so
111
        // that we never store private keys on disk.
112
        SecretKeyRing ECDHKeyRing
113

114
        // Dial connects to an addr using the specified net and returns the
115
        // connection object.
116
        Dial tor.DialFunc
117

118
        // AuthDialer establishes a brontide connection over an onion or clear
119
        // network.
120
        AuthDial AuthDialer
121

122
        // DB provides access to the client's stable storage medium.
123
        DB DB
124

125
        // ChainHash identifies the chain that the client is on and for which
126
        // the tower must be watching to monitor for breaches.
127
        ChainHash chainhash.Hash
128

129
        // ReadTimeout is the duration we will wait during a read before
130
        // breaking out of a blocking read. If the value is less than or equal
131
        // to zero, the default will be used instead.
132
        ReadTimeout time.Duration
133

134
        // WriteTimeout is the duration we will wait during a write before
135
        // breaking out of a blocking write. If the value is less than or equal
136
        // to zero, the default will be used instead.
137
        WriteTimeout time.Duration
138

139
        // MinBackoff defines the initial backoff applied to connections with
140
        // watchtowers. Subsequent backoff durations will grow exponentially up
141
        // until MaxBackoff.
142
        MinBackoff time.Duration
143

144
        // MaxBackoff defines the maximum backoff applied to connections with
145
        // watchtowers. If the exponential backoff produces a timeout greater
146
        // than this value, the backoff will be clamped to MaxBackoff.
147
        MaxBackoff time.Duration
148

149
        // SessionCloseRange is the range over which we will generate a random
150
        // number of blocks to delay closing a session after its last channel
151
        // has been closed.
152
        SessionCloseRange uint32
153

154
        // MaxTasksInMemQueue is the maximum number of backup tasks that should
155
        // be kept in-memory. Any more tasks will overflow to disk.
156
        MaxTasksInMemQueue uint64
157
}
158

159
// Manager manages the various tower clients that are active. A client is
160
// required for each different commitment transaction type. The Manager acts as
161
// a tower client multiplexer.
162
type Manager struct {
163
        started sync.Once
164
        stopped sync.Once
165

166
        cfg *Config
167

168
        clients   map[blob.Type]*client
169
        clientsMu sync.Mutex
170

171
        backupMu     sync.Mutex
172
        chanInfos    wtdb.ChannelInfos
173
        chanBlobType map[lnwire.ChannelID]blob.Type
174

175
        closableSessionQueue *sessionCloseMinHeap
176

177
        wg   sync.WaitGroup
178
        quit chan struct{}
179
}
180

181
var _ ClientManager = (*Manager)(nil)
182

183
// NewManager constructs a new Manager.
184
func NewManager(config *Config, policies ...wtpolicy.Policy) (*Manager, error) {
3✔
185
        // Copy the config to prevent side effects from modifying both the
3✔
186
        // internal and external version of the Config.
3✔
187
        cfg := *config
3✔
188

3✔
189
        // Set the read timeout to the default if none was provided.
3✔
190
        if cfg.ReadTimeout <= 0 {
6✔
191
                cfg.ReadTimeout = DefaultReadTimeout
3✔
192
        }
3✔
193

194
        // Set the write timeout to the default if none was provided.
195
        if cfg.WriteTimeout <= 0 {
6✔
196
                cfg.WriteTimeout = DefaultWriteTimeout
3✔
197
        }
3✔
198

199
        chanInfos, err := cfg.DB.FetchChanInfos()
3✔
200
        if err != nil {
3✔
201
                return nil, err
×
202
        }
×
203

204
        m := &Manager{
3✔
205
                cfg:                  &cfg,
3✔
206
                clients:              make(map[blob.Type]*client),
3✔
207
                chanBlobType:         make(map[lnwire.ChannelID]blob.Type),
3✔
208
                chanInfos:            chanInfos,
3✔
209
                closableSessionQueue: newSessionCloseMinHeap(),
3✔
210
                quit:                 make(chan struct{}),
3✔
211
        }
3✔
212

3✔
213
        for _, policy := range policies {
6✔
214
                if err = policy.Validate(); err != nil {
3✔
215
                        return nil, err
×
216
                }
×
217

218
                if err = m.newClient(policy); err != nil {
3✔
219
                        return nil, err
×
220
                }
×
221
        }
222

223
        return m, nil
3✔
224
}
225

226
// newClient constructs a new client and adds it to the set of clients that
227
// the Manager is keeping track of.
228
func (m *Manager) newClient(policy wtpolicy.Policy) error {
3✔
229
        m.clientsMu.Lock()
3✔
230
        defer m.clientsMu.Unlock()
3✔
231

3✔
232
        _, ok := m.clients[policy.BlobType]
3✔
233
        if ok {
3✔
234
                return fmt.Errorf("a client with blob type %s has "+
×
235
                        "already been registered", policy.BlobType)
×
236
        }
×
237

238
        cfg := &clientCfg{
3✔
239
                Config:         m.cfg,
3✔
240
                Policy:         policy,
3✔
241
                getSweepScript: m.getSweepScript,
3✔
242
        }
3✔
243

3✔
244
        client, err := newClient(cfg)
3✔
245
        if err != nil {
3✔
246
                return err
×
247
        }
×
248

249
        m.clients[policy.BlobType] = client
3✔
250

3✔
251
        return nil
3✔
252
}
253

254
// Start starts all the clients that have been registered with the Manager.
255
func (m *Manager) Start() error {
3✔
256
        var returnErr error
3✔
257
        m.started.Do(func() {
6✔
258
                chanSub, err := m.cfg.SubscribeChannelEvents()
3✔
259
                if err != nil {
3✔
260
                        returnErr = err
×
261

×
262
                        return
×
263
                }
×
264

265
                // Iterate over the list of registered channels and check if any
266
                // of them can be marked as closed.
267
                for id := range m.chanInfos {
6✔
268
                        isClosed, closedHeight, err := m.isChannelClosed(id)
3✔
269
                        if err != nil {
3✔
270
                                returnErr = err
×
271

×
272
                                return
×
273
                        }
×
274

275
                        if !isClosed {
6✔
276
                                continue
3✔
277
                        }
278

UNCOV
279
                        _, err = m.cfg.DB.MarkChannelClosed(id, closedHeight)
×
UNCOV
280
                        if err != nil {
×
281
                                log.Errorf("could not mark channel(%s) as "+
×
282
                                        "closed: %v", id, err)
×
283

×
284
                                continue
×
285
                        }
286

287
                        // Since the channel has been marked as closed, we can
288
                        // also remove it from the channel summaries map.
UNCOV
289
                        delete(m.chanInfos, id)
×
290
                }
291

292
                // Load all closable sessions.
293
                closableSessions, err := m.cfg.DB.ListClosableSessions()
3✔
294
                if err != nil {
3✔
295
                        returnErr = err
×
296

×
297
                        return
×
298
                }
×
299

300
                err = m.trackClosableSessions(closableSessions)
3✔
301
                if err != nil {
3✔
302
                        returnErr = err
×
303

×
304
                        return
×
305
                }
×
306

307
                m.wg.Add(1)
3✔
308
                go m.handleChannelCloses(chanSub)
3✔
309

3✔
310
                // Subscribe to new block events.
3✔
311
                blockEvents, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(
3✔
312
                        nil,
3✔
313
                )
3✔
314
                if err != nil {
3✔
315
                        returnErr = err
×
316

×
317
                        return
×
318
                }
×
319

320
                m.wg.Add(1)
3✔
321
                go m.handleClosableSessions(blockEvents)
3✔
322

3✔
323
                m.clientsMu.Lock()
3✔
324
                defer m.clientsMu.Unlock()
3✔
325

3✔
326
                for _, client := range m.clients {
6✔
327
                        if err := client.start(); err != nil {
3✔
328
                                returnErr = err
×
329
                                return
×
330
                        }
×
331
                }
332
        })
333

334
        return returnErr
3✔
335
}
336

337
// Stop stops all the clients that the Manger is managing.
338
func (m *Manager) Stop() error {
3✔
339
        var returnErr error
3✔
340
        m.stopped.Do(func() {
6✔
341
                m.clientsMu.Lock()
3✔
342
                defer m.clientsMu.Unlock()
3✔
343

3✔
344
                close(m.quit)
3✔
345
                m.wg.Wait()
3✔
346

3✔
347
                for _, client := range m.clients {
6✔
348
                        if err := client.stop(); err != nil {
3✔
349
                                returnErr = err
×
350
                        }
×
351
                }
352
        })
353

354
        return returnErr
3✔
355
}
356

357
// AddTower adds a new watchtower reachable at the given address and considers
358
// it for new sessions. If the watchtower already exists, then any new addresses
359
// included will be considered when dialing it for session negotiations and
360
// backups.
361
func (m *Manager) AddTower(address *lnwire.NetAddress) error {
3✔
362
        // We'll start by updating our persisted state, followed by the
3✔
363
        // in-memory state of each client, with the new tower. This might not
3✔
364
        // actually be a new tower, but it might include a new address at which
3✔
365
        // it can be reached.
3✔
366
        dbTower, err := m.cfg.DB.CreateTower(address)
3✔
367
        if err != nil {
3✔
368
                return err
×
369
        }
×
370

371
        tower, err := NewTowerFromDBTower(dbTower)
3✔
372
        if err != nil {
3✔
373
                return err
×
374
        }
×
375

376
        m.clientsMu.Lock()
3✔
377
        defer m.clientsMu.Unlock()
3✔
378

3✔
379
        for blobType, client := range m.clients {
6✔
380
                clientType, err := blobType.Identifier()
3✔
381
                if err != nil {
3✔
382
                        return err
×
383
                }
×
384

385
                if err := client.addTower(tower); err != nil {
3✔
386
                        return fmt.Errorf("could not add tower(%x) to the %s "+
×
387
                                "tower client: %w",
×
388
                                tower.IdentityKey.SerializeCompressed(),
×
389
                                clientType, err)
×
390
                }
×
391
        }
392

393
        return nil
3✔
394
}
395

396
// RemoveTower removes a watchtower from being considered for future session
397
// negotiations and from being used for any subsequent backups until it's added
398
// again. If an address is provided, then this call only serves as a way of
399
// removing the address from the watchtower instead.
400
func (m *Manager) RemoveTower(key *btcec.PublicKey, addr net.Addr) error {
3✔
401
        // We'll load the tower before potentially removing it in order to
3✔
402
        // retrieve its ID within the database.
3✔
403
        dbTower, err := m.cfg.DB.LoadTower(key)
3✔
404
        if err != nil {
3✔
405
                return err
×
406
        }
×
407

408
        m.clientsMu.Lock()
3✔
409
        defer m.clientsMu.Unlock()
3✔
410

3✔
411
        for _, client := range m.clients {
6✔
412
                err := client.removeTower(dbTower.ID, key, addr)
3✔
413
                if err != nil {
3✔
UNCOV
414
                        return err
×
UNCOV
415
                }
×
416
        }
417

418
        if err := m.cfg.DB.RemoveTower(key, addr); err != nil {
3✔
419
                // If the persisted state update fails, re-add the address to
×
420
                // our client's in-memory state.
×
421
                tower, newTowerErr := NewTowerFromDBTower(dbTower)
×
422
                if newTowerErr != nil {
×
423
                        log.Errorf("could not create new in-memory tower: %v",
×
424
                                newTowerErr)
×
425

×
426
                        return err
×
427
                }
×
428

429
                for _, client := range m.clients {
×
430
                        addTowerErr := client.addTower(tower)
×
431
                        if addTowerErr != nil {
×
432
                                log.Errorf("could not re-add tower: %v",
×
433
                                        addTowerErr)
×
434
                        }
×
435
                }
436

437
                return err
×
438
        }
439

440
        return nil
3✔
441
}
442

443
// TerminateSession sets the given session's status to CSessionTerminal meaning
444
// that it will not be used again.
445
func (m *Manager) TerminateSession(id wtdb.SessionID) error {
3✔
446
        m.clientsMu.Lock()
3✔
447
        defer m.clientsMu.Unlock()
3✔
448

3✔
449
        for _, client := range m.clients {
6✔
450
                err := client.terminateSession(id)
3✔
451
                if err != nil {
3✔
452
                        return err
×
453
                }
×
454
        }
455

456
        // Finally, mark the session as terminated in the DB.
457
        return m.cfg.DB.TerminateSession(id)
3✔
458
}
459

460
// DeactivateTower sets the given tower's status to inactive so that it is not
461
// considered for session negotiation. Its sessions will also not be used while
462
// the tower is inactive.
463
func (m *Manager) DeactivateTower(key *btcec.PublicKey) error {
3✔
464
        // We'll load the tower in order to retrieve its ID within the database.
3✔
465
        tower, err := m.cfg.DB.LoadTower(key)
3✔
466
        if err != nil {
3✔
467
                return err
×
468
        }
×
469

470
        m.clientsMu.Lock()
3✔
471
        defer m.clientsMu.Unlock()
3✔
472

3✔
473
        for _, client := range m.clients {
6✔
474
                err := client.deactivateTower(tower.ID, tower.IdentityKey)
3✔
475
                if err != nil {
3✔
476
                        return err
×
477
                }
×
478
        }
479

480
        // Finally, mark the tower as inactive in the DB.
481
        err = m.cfg.DB.DeactivateTower(key)
3✔
482
        if err != nil {
3✔
483
                log.Errorf("Could not deactivate the tower. Re-activating. %v",
×
484
                        err)
×
485

×
486
                // If the persisted state update fails, re-add the address to
×
487
                // our client's in-memory state.
×
488
                tower, newTowerErr := NewTowerFromDBTower(tower)
×
489
                if newTowerErr != nil {
×
490
                        log.Errorf("Could not create new in-memory tower: %v",
×
491
                                newTowerErr)
×
492

×
493
                        return err
×
494
                }
×
495

496
                for _, client := range m.clients {
×
497
                        addTowerErr := client.addTower(tower)
×
498
                        if addTowerErr != nil {
×
499
                                log.Errorf("Could not re-add tower: %v",
×
500
                                        addTowerErr)
×
501
                        }
×
502
                }
503

504
                return err
×
505
        }
506

507
        return nil
3✔
508
}
509

510
// Stats returns the in-memory statistics of the clients managed by the Manager
511
// since startup.
512
func (m *Manager) Stats() ClientStats {
3✔
513
        m.clientsMu.Lock()
3✔
514
        defer m.clientsMu.Unlock()
3✔
515

3✔
516
        var resp ClientStats
3✔
517
        for _, client := range m.clients {
6✔
518
                stats := client.getStats()
3✔
519
                resp.NumTasksAccepted += stats.NumTasksAccepted
3✔
520
                resp.NumTasksIneligible += stats.NumTasksIneligible
3✔
521
                resp.NumTasksPending += stats.NumTasksPending
3✔
522
                resp.NumSessionsAcquired += stats.NumSessionsAcquired
3✔
523
                resp.NumSessionsExhausted += stats.NumSessionsExhausted
3✔
524
        }
3✔
525

526
        return resp
3✔
527
}
528

529
// RegisteredTowers retrieves the list of watchtowers being used by the various
530
// clients.
531
func (m *Manager) RegisteredTowers(opts ...wtdb.ClientSessionListOption) (
532
        map[blob.Type][]*RegisteredTower, error) {
×
533

×
534
        towers, err := m.cfg.DB.ListTowers(nil)
×
535
        if err != nil {
×
536
                return nil, err
×
537
        }
×
538

539
        m.clientsMu.Lock()
×
540
        defer m.clientsMu.Unlock()
×
541

×
542
        resp := make(map[blob.Type][]*RegisteredTower)
×
543
        for _, client := range m.clients {
×
544
                towers, err := client.registeredTowers(towers, opts...)
×
545
                if err != nil {
×
546
                        return nil, err
×
547
                }
×
548

549
                resp[client.policy().BlobType] = towers
×
550
        }
551

552
        return resp, nil
×
553
}
554

555
// LookupTower retrieves a registered watchtower through its public key.
556
func (m *Manager) LookupTower(key *btcec.PublicKey,
557
        opts ...wtdb.ClientSessionListOption) (map[blob.Type]*RegisteredTower,
558
        error) {
3✔
559

3✔
560
        tower, err := m.cfg.DB.LoadTower(key)
3✔
561
        if err != nil {
3✔
562
                return nil, err
×
563
        }
×
564

565
        m.clientsMu.Lock()
3✔
566
        defer m.clientsMu.Unlock()
3✔
567

3✔
568
        resp := make(map[blob.Type]*RegisteredTower)
3✔
569
        for _, client := range m.clients {
6✔
570
                tower, err := client.lookupTower(tower, opts...)
3✔
571
                if err != nil {
3✔
572
                        return nil, err
×
573
                }
×
574

575
                resp[client.policy().BlobType] = tower
3✔
576
        }
577

578
        return resp, nil
3✔
579
}
580

581
// Policy returns the active client policy configuration for the client using
582
// the given blob type.
583
func (m *Manager) Policy(blobType blob.Type) (wtpolicy.Policy, error) {
×
584
        m.clientsMu.Lock()
×
585
        defer m.clientsMu.Unlock()
×
586

×
587
        var policy wtpolicy.Policy
×
588
        client, ok := m.clients[blobType]
×
589
        if !ok {
×
590
                return policy, fmt.Errorf("no client for the given blob type")
×
591
        }
×
592

593
        return client.policy(), nil
×
594
}
595

596
// RegisterChannel persistently initializes any channel-dependent parameters
597
// within the client. This should be called during link startup to ensure that
598
// the client is able to support the link during operation.
599
func (m *Manager) RegisterChannel(id lnwire.ChannelID,
600
        chanType channeldb.ChannelType) error {
3✔
601

3✔
602
        blobType := blob.TypeFromChannel(chanType)
3✔
603

3✔
604
        m.clientsMu.Lock()
3✔
605
        if _, ok := m.clients[blobType]; !ok {
3✔
606
                m.clientsMu.Unlock()
×
607

×
608
                return fmt.Errorf("no client registered for blob type %s",
×
609
                        blobType)
×
610
        }
×
611
        m.clientsMu.Unlock()
3✔
612

3✔
613
        m.backupMu.Lock()
3✔
614
        defer m.backupMu.Unlock()
3✔
615

3✔
616
        // If a pkscript for this channel already exists, the channel has been
3✔
617
        // previously registered.
3✔
618
        if _, ok := m.chanInfos[id]; ok {
6✔
619
                // Keep track of which blob type this channel will use for
3✔
620
                // updates.
3✔
621
                m.chanBlobType[id] = blobType
3✔
622

3✔
623
                return nil
3✔
624
        }
3✔
625

626
        // Otherwise, generate a new sweep pkscript used to sweep funds for this
627
        // channel.
628
        pkScript, err := m.cfg.NewAddress()
3✔
629
        if err != nil {
3✔
630
                return err
×
631
        }
×
632

633
        // Persist the sweep pkscript so that restarts will not introduce
634
        // address inflation when the channel is reregistered after a restart.
635
        err = m.cfg.DB.RegisterChannel(id, pkScript)
3✔
636
        if err != nil {
3✔
637
                return err
×
638
        }
×
639

640
        // Finally, cache the pkscript in our in-memory cache to avoid db
641
        // lookups for the remainder of the daemon's execution.
642
        m.chanInfos[id] = &wtdb.ChannelInfo{
3✔
643
                ClientChanSummary: wtdb.ClientChanSummary{
3✔
644
                        SweepPkScript: pkScript,
3✔
645
                },
3✔
646
        }
3✔
647

3✔
648
        // Keep track of which blob type this channel will use for updates.
3✔
649
        m.chanBlobType[id] = blobType
3✔
650

3✔
651
        return nil
3✔
652
}
653

654
// BackupState initiates a request to back up a particular revoked state. If the
655
// method returns nil, the backup is guaranteed to be successful unless the
656
// justice transaction would create dust outputs when trying to abide by the
657
// negotiated policy.
658
func (m *Manager) BackupState(chanID *lnwire.ChannelID, stateNum uint64) error {
3✔
659
        select {
3✔
UNCOV
660
        case <-m.quit:
×
UNCOV
661
                return ErrClientExiting
×
662
        default:
3✔
663
        }
664

665
        // Make sure that this channel is registered with the tower client.
666
        m.backupMu.Lock()
3✔
667
        info, ok := m.chanInfos[*chanID]
3✔
668
        if !ok {
3✔
UNCOV
669
                m.backupMu.Unlock()
×
UNCOV
670

×
UNCOV
671
                return ErrUnregisteredChannel
×
UNCOV
672
        }
×
673

674
        // Ignore backups that have already been presented to the client.
675
        var duplicate bool
3✔
676
        info.MaxHeight.WhenSome(func(maxHeight uint64) {
6✔
677
                if stateNum <= maxHeight {
3✔
UNCOV
678
                        duplicate = true
×
UNCOV
679
                }
×
680
        })
681
        if duplicate {
3✔
UNCOV
682
                m.backupMu.Unlock()
×
UNCOV
683

×
UNCOV
684
                log.Debugf("Ignoring duplicate backup for chanid=%v at "+
×
UNCOV
685
                        "height=%d", chanID, stateNum)
×
UNCOV
686

×
UNCOV
687
                return nil
×
UNCOV
688
        }
×
689

690
        // This backup has a higher commit height than any known backup for this
691
        // channel. We'll update our tip so that we won't accept it again if the
692
        // link flaps.
693
        m.chanInfos[*chanID].MaxHeight = fn.Some(stateNum)
3✔
694

3✔
695
        blobType, ok := m.chanBlobType[*chanID]
3✔
696
        if !ok {
3✔
697
                m.backupMu.Unlock()
×
698

×
699
                return ErrUnregisteredChannel
×
700
        }
×
701
        m.backupMu.Unlock()
3✔
702

3✔
703
        m.clientsMu.Lock()
3✔
704
        client, ok := m.clients[blobType]
3✔
705
        if !ok {
3✔
706
                m.clientsMu.Unlock()
×
707

×
708
                return fmt.Errorf("no client registered for blob type %s",
×
709
                        blobType)
×
710
        }
×
711
        m.clientsMu.Unlock()
3✔
712

3✔
713
        return client.backupState(chanID, stateNum)
3✔
714
}
715

716
// isChanClosed can be used to check if the channel with the given ID has been
717
// closed. If it has been, the block height in which its closing transaction was
718
// mined will also be returned.
719
func (m *Manager) isChannelClosed(id lnwire.ChannelID) (bool, uint32,
720
        error) {
3✔
721

3✔
722
        chanSum, err := m.cfg.FetchClosedChannel(id)
3✔
723
        if errors.Is(err, channeldb.ErrClosedChannelNotFound) {
6✔
724
                return false, 0, nil
3✔
725
        } else if err != nil {
3✔
726
                return false, 0, err
×
727
        }
×
728

UNCOV
729
        return true, chanSum.CloseHeight, nil
×
730
}
731

732
// trackClosableSessions takes in a map of session IDs to the earliest block
733
// height at which the session should be deleted. For each of the sessions,
734
// a random delay is added to the block height and the session is added to the
735
// closableSessionQueue.
736
func (m *Manager) trackClosableSessions(
737
        sessions map[wtdb.SessionID]uint32) error {
3✔
738

3✔
739
        // For each closable session, add a random delay to its close
3✔
740
        // height and add it to the closableSessionQueue.
3✔
741
        for sID, blockHeight := range sessions {
6✔
742
                delay, err := newRandomDelay(m.cfg.SessionCloseRange)
3✔
743
                if err != nil {
3✔
744
                        return err
×
745
                }
×
746

747
                deleteHeight := blockHeight + delay
3✔
748

3✔
749
                m.closableSessionQueue.Push(&sessionCloseItem{
3✔
750
                        sessionID:    sID,
3✔
751
                        deleteHeight: deleteHeight,
3✔
752
                })
3✔
753
        }
754

755
        return nil
3✔
756
}
757

758
// handleChannelCloses listens for channel close events and marks channels as
759
// closed in the DB.
760
//
761
// NOTE: This method MUST be run as a goroutine.
762
func (m *Manager) handleChannelCloses(chanSub subscribe.Subscription) {
3✔
763
        defer m.wg.Done()
3✔
764

3✔
765
        log.Debugf("Starting channel close handler")
3✔
766
        defer log.Debugf("Stopping channel close handler")
3✔
767

3✔
768
        for {
6✔
769
                select {
3✔
770
                case update, ok := <-chanSub.Updates():
3✔
771
                        if !ok {
3✔
772
                                log.Debugf("Channel notifier has exited")
×
773
                                return
×
774
                        }
×
775

776
                        // We only care about channel-close events.
777
                        event, ok := update.(channelnotifier.ClosedChannelEvent)
3✔
778
                        if !ok {
6✔
779
                                continue
3✔
780
                        }
781

782
                        chanID := lnwire.NewChanIDFromOutPoint(
3✔
783
                                event.CloseSummary.ChanPoint,
3✔
784
                        )
3✔
785

3✔
786
                        log.Debugf("Received ClosedChannelEvent for "+
3✔
787
                                "channel: %s", chanID)
3✔
788

3✔
789
                        err := m.handleClosedChannel(
3✔
790
                                chanID, event.CloseSummary.CloseHeight,
3✔
791
                        )
3✔
792
                        if err != nil {
3✔
793
                                log.Errorf("Could not handle channel close "+
×
794
                                        "event for channel(%s): %v", chanID,
×
795
                                        err)
×
796
                        }
×
797

798
                case <-m.quit:
3✔
799
                        return
3✔
800
                }
801
        }
802
}
803

804
// handleClosedChannel handles the closure of a single channel. It will mark the
805
// channel as closed in the DB, then it will handle all the sessions that are
806
// now closable due to the channel closure.
807
func (m *Manager) handleClosedChannel(chanID lnwire.ChannelID,
808
        closeHeight uint32) error {
3✔
809

3✔
810
        m.backupMu.Lock()
3✔
811
        defer m.backupMu.Unlock()
3✔
812

3✔
813
        // We only care about channels registered with the tower client.
3✔
814
        if _, ok := m.chanInfos[chanID]; !ok {
3✔
815
                return nil
×
816
        }
×
817

818
        log.Debugf("Marking channel(%s) as closed", chanID)
3✔
819

3✔
820
        sessions, err := m.cfg.DB.MarkChannelClosed(chanID, closeHeight)
3✔
821
        if err != nil {
3✔
822
                return fmt.Errorf("could not mark channel(%s) as closed: %w",
×
823
                        chanID, err)
×
824
        }
×
825

826
        closableSessions := make(map[wtdb.SessionID]uint32, len(sessions))
3✔
827
        for _, sess := range sessions {
6✔
828
                closableSessions[sess] = closeHeight
3✔
829
        }
3✔
830

831
        log.Debugf("Tracking %d new closable sessions as a result of "+
3✔
832
                "closing channel %s", len(closableSessions), chanID)
3✔
833

3✔
834
        err = m.trackClosableSessions(closableSessions)
3✔
835
        if err != nil {
3✔
836
                return fmt.Errorf("could not track closable sessions: %w", err)
×
837
        }
×
838

839
        delete(m.chanInfos, chanID)
3✔
840

3✔
841
        return nil
3✔
842
}
843

844
// handleClosableSessions listens for new block notifications. For each block,
845
// it checks the closableSessionQueue to see if there is a closable session with
846
// a delete-height smaller than or equal to the new block, if there is then the
847
// tower is informed that it can delete the session, and then we also delete it
848
// from our DB.
849
func (m *Manager) handleClosableSessions(
850
        blocksChan *chainntnfs.BlockEpochEvent) {
3✔
851

3✔
852
        defer m.wg.Done()
3✔
853

3✔
854
        log.Debug("Starting closable sessions handler")
3✔
855
        defer log.Debug("Stopping closable sessions handler")
3✔
856

3✔
857
        for {
6✔
858
                select {
3✔
859
                case newBlock := <-blocksChan.Epochs:
3✔
860
                        if newBlock == nil {
6✔
861
                                return
3✔
862
                        }
3✔
863

864
                        height := uint32(newBlock.Height)
3✔
865
                        for {
6✔
866
                                select {
3✔
867
                                case <-m.quit:
×
868
                                        return
×
869
                                default:
3✔
870
                                }
871

872
                                // If there are no closable sessions that we
873
                                // need to handle, then we are done and can
874
                                // reevaluate when the next block comes.
875
                                item := m.closableSessionQueue.Top()
3✔
876
                                if item == nil {
6✔
877
                                        break
3✔
878
                                }
879

880
                                // If there is closable session but the delete
881
                                // height we have set for it is after the
882
                                // current block height, then our work is done.
883
                                if item.deleteHeight > height {
3✔
884
                                        break
×
885
                                }
886

887
                                // Otherwise, we pop this item from the heap
888
                                // and handle it.
889
                                m.closableSessionQueue.Pop()
3✔
890

3✔
891
                                // Fetch the session from the DB so that we can
3✔
892
                                // extract the Tower info.
3✔
893
                                sess, err := m.cfg.DB.GetClientSession(
3✔
894
                                        item.sessionID,
3✔
895
                                )
3✔
896
                                if err != nil {
3✔
897
                                        log.Errorf("error calling "+
×
898
                                                "GetClientSession for "+
×
899
                                                "session %s: %v",
×
900
                                                item.sessionID, err)
×
901

×
902
                                        continue
×
903
                                }
904

905
                                // get appropriate client.
906
                                m.clientsMu.Lock()
3✔
907
                                client, ok := m.clients[sess.Policy.BlobType]
3✔
908
                                if !ok {
3✔
909
                                        m.clientsMu.Unlock()
×
910
                                        log.Errorf("no client currently " +
×
911
                                                "active for the session type")
×
912

×
913
                                        continue
×
914
                                }
915
                                m.clientsMu.Unlock()
3✔
916

3✔
917
                                clientName, err := client.policy().BlobType.
3✔
918
                                        Identifier()
3✔
919
                                if err != nil {
3✔
920
                                        log.Errorf("could not get client "+
×
921
                                                "identifier: %v", err)
×
922

×
923
                                        continue
×
924
                                }
925

926
                                // Stop the session and remove it from the
927
                                // in-memory set.
928
                                err = client.stopAndRemoveSession(
3✔
929
                                        item.sessionID, true,
3✔
930
                                )
3✔
931
                                if err != nil {
3✔
932
                                        log.Errorf("could not remove "+
×
933
                                                "session(%s) from in-memory "+
×
934
                                                "set of the %s client: %v",
×
935
                                                item.sessionID, clientName, err)
×
936

×
937
                                        continue
×
938
                                }
939

940
                                err = client.deleteSessionFromTower(sess)
3✔
941
                                if err != nil {
3✔
942
                                        log.Errorf("error deleting "+
×
943
                                                "session %s from tower: %v",
×
944
                                                sess.ID, err)
×
945

×
946
                                        continue
×
947
                                }
948

949
                                err = m.cfg.DB.DeleteSession(item.sessionID)
3✔
950
                                if err != nil {
3✔
951
                                        log.Errorf("could not delete "+
×
952
                                                "session(%s) from DB: %w",
×
953
                                                sess.ID, err)
×
954

×
955
                                        continue
×
956
                                }
957
                        }
958

UNCOV
959
                case <-m.quit:
×
UNCOV
960
                        return
×
961
                }
962
        }
963
}
964

965
func (m *Manager) getSweepScript(id lnwire.ChannelID) ([]byte, bool) {
3✔
966
        m.backupMu.Lock()
3✔
967
        defer m.backupMu.Unlock()
3✔
968

3✔
969
        summary, ok := m.chanInfos[id]
3✔
970
        if !ok {
3✔
971
                return nil, false
×
972
        }
×
973

974
        return summary.SweepPkScript, true
3✔
975
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc