• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.85
/watchtower/wtclient/manager.go
1
package wtclient
2

3
import (
4
        "errors"
5
        "fmt"
6
        "net"
7
        "sync"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcec/v2"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/lightningnetwork/lnd/chainntnfs"
13
        "github.com/lightningnetwork/lnd/channeldb"
14
        "github.com/lightningnetwork/lnd/channelnotifier"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/input"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/subscribe"
19
        "github.com/lightningnetwork/lnd/tor"
20
        "github.com/lightningnetwork/lnd/watchtower/blob"
21
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
22
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
23
)
24

25
// ClientManager is the primary interface used by the daemon to control a
26
// client's lifecycle and backup revoked states.
27
type ClientManager interface {
28
        // AddTower adds a new watchtower reachable at the given address and
29
        // considers it for new sessions. If the watchtower already exists, then
30
        // any new addresses included will be considered when dialing it for
31
        // session negotiations and backups.
32
        AddTower(*lnwire.NetAddress) error
33

34
        // RemoveTower removes a watchtower from being considered for future
35
        // session negotiations and from being used for any subsequent backups
36
        // until it's added again. If an address is provided, then this call
37
        // only serves as a way of removing the address from the watchtower
38
        // instead.
39
        RemoveTower(*btcec.PublicKey, net.Addr) error
40

41
        // DeactivateTower sets the given tower's status to inactive so that it
42
        // is not considered for session negotiation. Its sessions will also not
43
        // be used while the tower is inactive.
44
        DeactivateTower(pubKey *btcec.PublicKey) error
45

46
        // TerminateSession sets the given session's status to CSessionTerminal
47
        // meaning that it will not be used again.
48
        TerminateSession(id wtdb.SessionID) error
49

50
        // Stats returns the in-memory statistics of the client since startup.
51
        Stats() ClientStats
52

53
        // Policy returns the active client policy configuration.
54
        Policy(blob.Type) (wtpolicy.Policy, error)
55

56
        // RegisteredTowers retrieves the list of watchtowers registered with
57
        // the client. It returns a set of registered towers per client policy
58
        // type.
59
        RegisteredTowers(opts ...wtdb.ClientSessionListOption) (
60
                map[blob.Type][]*RegisteredTower, error)
61

62
        // LookupTower retrieves a registered watchtower through its public key.
63
        LookupTower(*btcec.PublicKey, ...wtdb.ClientSessionListOption) (
64
                map[blob.Type]*RegisteredTower, error)
65

66
        // RegisterChannel persistently initializes any channel-dependent
67
        // parameters within the client. This should be called during link
68
        // startup to ensure that the client is able to support the link during
69
        // operation.
70
        RegisterChannel(lnwire.ChannelID, channeldb.ChannelType) error
71

72
        // BackupState initiates a request to back up a particular revoked
73
        // state. If the method returns nil, the backup is guaranteed to be
74
        // successful unless the justice transaction would create dust outputs
75
        // when trying to abide by the negotiated policy.
76
        BackupState(chanID *lnwire.ChannelID, stateNum uint64) error
77
}
78

79
// Config provides the client with access to the resources it requires to
80
// perform its duty. All nillable fields must be non-nil for the tower to be
81
// initialized properly.
82
type Config struct {
83
        // Signer provides access to the wallet so that the client can sign
84
        // justice transactions that spend from a remote party's commitment
85
        // transaction.
86
        Signer input.Signer
87

88
        // SubscribeChannelEvents can be used to subscribe to channel event
89
        // notifications.
90
        SubscribeChannelEvents func() (subscribe.Subscription, error)
91

92
        // FetchClosedChannel can be used to fetch the info about a closed
93
        // channel. If the channel is not found or not yet closed then
94
        // channeldb.ErrClosedChannelNotFound will be returned.
95
        FetchClosedChannel func(cid lnwire.ChannelID) (
96
                *channeldb.ChannelCloseSummary, error)
97

98
        // ChainNotifier can be used to subscribe to block notifications.
99
        ChainNotifier chainntnfs.ChainNotifier
100

101
        // BuildBreachRetribution is a function closure that allows the client
102
        // fetch the breach retribution info for a certain channel at a certain
103
        // revoked commitment height.
104
        BuildBreachRetribution BreachRetributionBuilder
105

106
        // NewAddress generates a new on-chain sweep pkscript.
107
        NewAddress func() ([]byte, error)
108

109
        // SecretKeyRing is used to derive the session keys used to communicate
110
        // with the tower. The client only stores the KeyLocators internally so
111
        // that we never store private keys on disk.
112
        SecretKeyRing ECDHKeyRing
113

114
        // Dial connects to an addr using the specified net and returns the
115
        // connection object.
116
        Dial tor.DialFunc
117

118
        // AuthDialer establishes a brontide connection over an onion or clear
119
        // network.
120
        AuthDial AuthDialer
121

122
        // DB provides access to the client's stable storage medium.
123
        DB DB
124

125
        // ChainHash identifies the chain that the client is on and for which
126
        // the tower must be watching to monitor for breaches.
127
        ChainHash chainhash.Hash
128

129
        // ReadTimeout is the duration we will wait during a read before
130
        // breaking out of a blocking read. If the value is less than or equal
131
        // to zero, the default will be used instead.
132
        ReadTimeout time.Duration
133

134
        // WriteTimeout is the duration we will wait during a write before
135
        // breaking out of a blocking write. If the value is less than or equal
136
        // to zero, the default will be used instead.
137
        WriteTimeout time.Duration
138

139
        // MinBackoff defines the initial backoff applied to connections with
140
        // watchtowers. Subsequent backoff durations will grow exponentially up
141
        // until MaxBackoff.
142
        MinBackoff time.Duration
143

144
        // MaxBackoff defines the maximum backoff applied to connections with
145
        // watchtowers. If the exponential backoff produces a timeout greater
146
        // than this value, the backoff will be clamped to MaxBackoff.
147
        MaxBackoff time.Duration
148

149
        // SessionCloseRange is the range over which we will generate a random
150
        // number of blocks to delay closing a session after its last channel
151
        // has been closed.
152
        SessionCloseRange uint32
153

154
        // MaxTasksInMemQueue is the maximum number of backup tasks that should
155
        // be kept in-memory. Any more tasks will overflow to disk.
156
        MaxTasksInMemQueue uint64
157
}
158

159
// Manager manages the various tower clients that are active. A client is
160
// required for each different commitment transaction type. The Manager acts as
161
// a tower client multiplexer.
162
type Manager struct {
163
        started sync.Once
164
        stopped sync.Once
165

166
        cfg *Config
167

168
        clients   map[blob.Type]*client
169
        clientsMu sync.Mutex
170

171
        backupMu     sync.Mutex
172
        chanInfos    wtdb.ChannelInfos
173
        chanBlobType map[lnwire.ChannelID]blob.Type
174

175
        closableSessionQueue *sessionCloseMinHeap
176

177
        wg   sync.WaitGroup
178
        quit chan struct{}
179
}
180

181
var _ ClientManager = (*Manager)(nil)
182

183
// NewManager constructs a new Manager.
184
func NewManager(config *Config, policies ...wtpolicy.Policy) (*Manager, error) {
3✔
185
        // Copy the config to prevent side effects from modifying both the
3✔
186
        // internal and external version of the Config.
3✔
187
        cfg := *config
3✔
188

3✔
189
        // Set the read timeout to the default if none was provided.
3✔
190
        if cfg.ReadTimeout <= 0 {
6✔
191
                cfg.ReadTimeout = DefaultReadTimeout
3✔
192
        }
3✔
193

194
        // Set the write timeout to the default if none was provided.
195
        if cfg.WriteTimeout <= 0 {
6✔
196
                cfg.WriteTimeout = DefaultWriteTimeout
3✔
197
        }
3✔
198

199
        chanInfos, err := cfg.DB.FetchChanInfos()
3✔
200
        if err != nil {
3✔
201
                return nil, err
×
202
        }
×
203

204
        m := &Manager{
3✔
205
                cfg:                  &cfg,
3✔
206
                clients:              make(map[blob.Type]*client),
3✔
207
                chanBlobType:         make(map[lnwire.ChannelID]blob.Type),
3✔
208
                chanInfos:            chanInfos,
3✔
209
                closableSessionQueue: newSessionCloseMinHeap(),
3✔
210
                quit:                 make(chan struct{}),
3✔
211
        }
3✔
212

3✔
213
        for _, policy := range policies {
6✔
214
                if err = policy.Validate(); err != nil {
3✔
215
                        return nil, err
×
216
                }
×
217

218
                if err = m.newClient(policy); err != nil {
3✔
219
                        return nil, err
×
220
                }
×
221
        }
222

223
        return m, nil
3✔
224
}
225

226
// newClient constructs a new client and adds it to the set of clients that
227
// the Manager is keeping track of.
228
func (m *Manager) newClient(policy wtpolicy.Policy) error {
3✔
229
        m.clientsMu.Lock()
3✔
230
        defer m.clientsMu.Unlock()
3✔
231

3✔
232
        _, ok := m.clients[policy.BlobType]
3✔
233
        if ok {
3✔
234
                return fmt.Errorf("a client with blob type %s has "+
×
235
                        "already been registered", policy.BlobType)
×
236
        }
×
237

238
        cfg := &clientCfg{
3✔
239
                Config:         m.cfg,
3✔
240
                Policy:         policy,
3✔
241
                getSweepScript: m.getSweepScript,
3✔
242
        }
3✔
243

3✔
244
        client, err := newClient(cfg)
3✔
245
        if err != nil {
3✔
246
                return err
×
247
        }
×
248

249
        m.clients[policy.BlobType] = client
3✔
250

3✔
251
        return nil
3✔
252
}
253

254
// Start starts all the clients that have been registered with the Manager.
255
func (m *Manager) Start() error {
3✔
256
        var returnErr error
3✔
257
        m.started.Do(func() {
6✔
258
                chanSub, err := m.cfg.SubscribeChannelEvents()
3✔
259
                if err != nil {
3✔
260
                        returnErr = err
×
261

×
262
                        return
×
263
                }
×
264

265
                // Iterate over the list of registered channels and check if any
266
                // of them can be marked as closed.
267
                for id := range m.chanInfos {
6✔
268
                        isClosed, closedHeight, err := m.isChannelClosed(id)
3✔
269
                        if err != nil {
3✔
270
                                returnErr = err
×
271

×
272
                                return
×
273
                        }
×
274

275
                        if !isClosed {
6✔
276
                                continue
3✔
277
                        }
278

UNCOV
279
                        _, err = m.cfg.DB.MarkChannelClosed(id, closedHeight)
×
UNCOV
280
                        if err != nil {
×
281
                                log.Errorf("could not mark channel(%s) as "+
×
282
                                        "closed: %v", id, err)
×
283

×
284
                                continue
×
285
                        }
286

287
                        // Since the channel has been marked as closed, we can
288
                        // also remove it from the channel summaries map.
UNCOV
289
                        delete(m.chanInfos, id)
×
290
                }
291

292
                // Load all closable sessions.
293
                closableSessions, err := m.cfg.DB.ListClosableSessions()
3✔
294
                if err != nil {
3✔
295
                        returnErr = err
×
296

×
297
                        return
×
298
                }
×
299

300
                err = m.trackClosableSessions(closableSessions)
3✔
301
                if err != nil {
3✔
302
                        returnErr = err
×
303

×
304
                        return
×
305
                }
×
306

307
                m.wg.Add(1)
3✔
308
                go m.handleChannelCloses(chanSub)
3✔
309

3✔
310
                // Subscribe to new block events.
3✔
311
                blockEvents, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(
3✔
312
                        nil,
3✔
313
                )
3✔
314
                if err != nil {
3✔
315
                        returnErr = err
×
316

×
317
                        return
×
318
                }
×
319

320
                m.wg.Add(1)
3✔
321
                go m.handleClosableSessions(blockEvents)
3✔
322

3✔
323
                m.clientsMu.Lock()
3✔
324
                defer m.clientsMu.Unlock()
3✔
325

3✔
326
                for _, client := range m.clients {
6✔
327
                        if err := client.start(); err != nil {
3✔
328
                                returnErr = err
×
329
                                return
×
330
                        }
×
331
                }
332
        })
333

334
        return returnErr
3✔
335
}
336

337
// Stop stops all the clients that the Manger is managing.
338
func (m *Manager) Stop() error {
3✔
339
        var returnErr error
3✔
340
        m.stopped.Do(func() {
6✔
341
                m.clientsMu.Lock()
3✔
342
                defer m.clientsMu.Unlock()
3✔
343

3✔
344
                close(m.quit)
3✔
345
                m.wg.Wait()
3✔
346

3✔
347
                for _, client := range m.clients {
6✔
348
                        if err := client.stop(); err != nil {
3✔
349
                                returnErr = err
×
350
                        }
×
351
                }
352
        })
353

354
        return returnErr
3✔
355
}
356

357
// AddTower adds a new watchtower reachable at the given address and considers
358
// it for new sessions. If the watchtower already exists, then any new addresses
359
// included will be considered when dialing it for session negotiations and
360
// backups.
361
func (m *Manager) AddTower(address *lnwire.NetAddress) error {
3✔
362
        // We'll start by updating our persisted state, followed by the
3✔
363
        // in-memory state of each client, with the new tower. This might not
3✔
364
        // actually be a new tower, but it might include a new address at which
3✔
365
        // it can be reached.
3✔
366
        dbTower, err := m.cfg.DB.CreateTower(address)
3✔
367
        if err != nil {
3✔
368
                return err
×
369
        }
×
370

371
        tower, err := NewTowerFromDBTower(dbTower)
3✔
372
        if err != nil {
3✔
373
                return err
×
374
        }
×
375

376
        m.clientsMu.Lock()
3✔
377
        defer m.clientsMu.Unlock()
3✔
378

3✔
379
        for blobType, client := range m.clients {
6✔
380
                clientType, err := blobType.Identifier()
3✔
381
                if err != nil {
3✔
382
                        return err
×
383
                }
×
384

385
                if err := client.addTower(tower); err != nil {
3✔
386
                        return fmt.Errorf("could not add tower(%x) to the %s "+
×
387
                                "tower client: %w",
×
388
                                tower.IdentityKey.SerializeCompressed(),
×
389
                                clientType, err)
×
390
                }
×
391
        }
392

393
        return nil
3✔
394
}
395

396
// RemoveTower removes a watchtower from being considered for future session
397
// negotiations and from being used for any subsequent backups until it's added
398
// again. If an address is provided, then this call only serves as a way of
399
// removing the address from the watchtower instead.
400
func (m *Manager) RemoveTower(key *btcec.PublicKey, addr net.Addr) error {
3✔
401
        // We'll load the tower before potentially removing it in order to
3✔
402
        // retrieve its ID within the database.
3✔
403
        dbTower, err := m.cfg.DB.LoadTower(key)
3✔
404
        if err != nil {
3✔
405
                return err
×
406
        }
×
407

408
        m.clientsMu.Lock()
3✔
409
        defer m.clientsMu.Unlock()
3✔
410

3✔
411
        for _, client := range m.clients {
6✔
412
                err := client.removeTower(dbTower.ID, key, addr)
3✔
413
                if err != nil {
3✔
UNCOV
414
                        return err
×
UNCOV
415
                }
×
416
        }
417

418
        if err := m.cfg.DB.RemoveTower(key, addr); err != nil {
3✔
419
                // If the persisted state update fails, re-add the address to
×
420
                // our client's in-memory state.
×
421
                tower, newTowerErr := NewTowerFromDBTower(dbTower)
×
422
                if newTowerErr != nil {
×
423
                        log.Errorf("could not create new in-memory tower: %v",
×
424
                                newTowerErr)
×
425

×
426
                        return err
×
427
                }
×
428

429
                for _, client := range m.clients {
×
430
                        addTowerErr := client.addTower(tower)
×
431
                        if addTowerErr != nil {
×
432
                                log.Errorf("could not re-add tower: %v",
×
433
                                        addTowerErr)
×
434
                        }
×
435
                }
436

437
                return err
×
438
        }
439

440
        return nil
3✔
441
}
442

443
// TerminateSession sets the given session's status to CSessionTerminal meaning
444
// that it will not be used again.
445
func (m *Manager) TerminateSession(id wtdb.SessionID) error {
3✔
446
        m.clientsMu.Lock()
3✔
447
        defer m.clientsMu.Unlock()
3✔
448

3✔
449
        for _, client := range m.clients {
6✔
450
                err := client.terminateSession(id)
3✔
451
                if err != nil {
3✔
452
                        return err
×
453
                }
×
454
        }
455

456
        // Finally, mark the session as terminated in the DB.
457
        return m.cfg.DB.TerminateSession(id)
3✔
458
}
459

460
// DeactivateTower sets the given tower's status to inactive so that it is not
461
// considered for session negotiation. Its sessions will also not be used while
462
// the tower is inactive.
463
func (m *Manager) DeactivateTower(key *btcec.PublicKey) error {
3✔
464
        // We'll load the tower in order to retrieve its ID within the database.
3✔
465
        tower, err := m.cfg.DB.LoadTower(key)
3✔
466
        if err != nil {
3✔
467
                return err
×
468
        }
×
469

470
        m.clientsMu.Lock()
3✔
471
        defer m.clientsMu.Unlock()
3✔
472

3✔
473
        for _, client := range m.clients {
6✔
474
                err := client.deactivateTower(tower.ID, tower.IdentityKey)
3✔
475
                if err != nil {
3✔
476
                        return err
×
477
                }
×
478
        }
479

480
        // Finally, mark the tower as inactive in the DB.
481
        err = m.cfg.DB.DeactivateTower(key)
3✔
482
        if err != nil {
3✔
483
                log.Errorf("Could not deactivate the tower. Re-activating. %v",
×
484
                        err)
×
485

×
486
                // If the persisted state update fails, re-add the address to
×
487
                // our client's in-memory state.
×
488
                tower, newTowerErr := NewTowerFromDBTower(tower)
×
489
                if newTowerErr != nil {
×
490
                        log.Errorf("Could not create new in-memory tower: %v",
×
491
                                newTowerErr)
×
492

×
493
                        return err
×
494
                }
×
495

496
                for _, client := range m.clients {
×
497
                        addTowerErr := client.addTower(tower)
×
498
                        if addTowerErr != nil {
×
499
                                log.Errorf("Could not re-add tower: %v",
×
500
                                        addTowerErr)
×
501
                        }
×
502
                }
503

504
                return err
×
505
        }
506

507
        return nil
3✔
508
}
509

510
// Stats returns the in-memory statistics of the clients managed by the Manager
511
// since startup.
512
func (m *Manager) Stats() ClientStats {
3✔
513
        m.clientsMu.Lock()
3✔
514
        defer m.clientsMu.Unlock()
3✔
515

3✔
516
        var resp ClientStats
3✔
517
        for _, client := range m.clients {
6✔
518
                stats := client.getStats()
3✔
519
                resp.NumTasksAccepted += stats.NumTasksAccepted
3✔
520
                resp.NumTasksIneligible += stats.NumTasksIneligible
3✔
521
                resp.NumTasksPending += stats.NumTasksPending
3✔
522
                resp.NumSessionsAcquired += stats.NumSessionsAcquired
3✔
523
                resp.NumSessionsExhausted += stats.NumSessionsExhausted
3✔
524
        }
3✔
525

526
        return resp
3✔
527
}
528

529
// RegisteredTowers retrieves the list of watchtowers being used by the various
530
// clients.
531
func (m *Manager) RegisteredTowers(opts ...wtdb.ClientSessionListOption) (
532
        map[blob.Type][]*RegisteredTower, error) {
×
533

×
534
        towers, err := m.cfg.DB.ListTowers(nil)
×
535
        if err != nil {
×
536
                return nil, err
×
537
        }
×
538

539
        m.clientsMu.Lock()
×
540
        defer m.clientsMu.Unlock()
×
541

×
542
        resp := make(map[blob.Type][]*RegisteredTower)
×
543
        for _, client := range m.clients {
×
544
                towers, err := client.registeredTowers(towers, opts...)
×
545
                if err != nil {
×
546
                        return nil, err
×
547
                }
×
548

549
                resp[client.policy().BlobType] = towers
×
550
        }
551

552
        return resp, nil
×
553
}
554

555
// LookupTower retrieves a registered watchtower through its public key.
556
func (m *Manager) LookupTower(key *btcec.PublicKey,
557
        opts ...wtdb.ClientSessionListOption) (map[blob.Type]*RegisteredTower,
558
        error) {
3✔
559

3✔
560
        tower, err := m.cfg.DB.LoadTower(key)
3✔
561
        if err != nil {
3✔
562
                return nil, err
×
563
        }
×
564

565
        m.clientsMu.Lock()
3✔
566
        defer m.clientsMu.Unlock()
3✔
567

3✔
568
        resp := make(map[blob.Type]*RegisteredTower)
3✔
569
        for _, client := range m.clients {
6✔
570
                tower, err := client.lookupTower(tower, opts...)
3✔
571
                if err != nil {
3✔
572
                        return nil, err
×
573
                }
×
574

575
                resp[client.policy().BlobType] = tower
3✔
576
        }
577

578
        return resp, nil
3✔
579
}
580

581
// Policy returns the active client policy configuration for the client using
582
// the given blob type.
583
func (m *Manager) Policy(blobType blob.Type) (wtpolicy.Policy, error) {
×
584
        m.clientsMu.Lock()
×
585
        defer m.clientsMu.Unlock()
×
586

×
587
        var policy wtpolicy.Policy
×
588
        client, ok := m.clients[blobType]
×
589
        if !ok {
×
590
                return policy, fmt.Errorf("no client for the given blob type")
×
591
        }
×
592

593
        return client.policy(), nil
×
594
}
595

596
// RegisterChannel persistently initializes any channel-dependent parameters
597
// within the client. This should be called during link startup to ensure that
598
// the client is able to support the link during operation.
599
func (m *Manager) RegisterChannel(id lnwire.ChannelID,
600
        chanType channeldb.ChannelType) error {
3✔
601

3✔
602
        blobType := blob.TypeFromChannel(chanType)
3✔
603

3✔
604
        m.clientsMu.Lock()
3✔
605
        if _, ok := m.clients[blobType]; !ok {
3✔
606
                m.clientsMu.Unlock()
×
607

×
608
                return fmt.Errorf("no client registered for blob type %s",
×
609
                        blobType)
×
610
        }
×
611
        m.clientsMu.Unlock()
3✔
612

3✔
613
        m.backupMu.Lock()
3✔
614
        defer m.backupMu.Unlock()
3✔
615

3✔
616
        // If a pkscript for this channel already exists, the channel has been
3✔
617
        // previously registered.
3✔
618
        if _, ok := m.chanInfos[id]; ok {
6✔
619
                // Keep track of which blob type this channel will use for
3✔
620
                // updates.
3✔
621
                m.chanBlobType[id] = blobType
3✔
622

3✔
623
                return nil
3✔
624
        }
3✔
625

626
        // Otherwise, generate a new sweep pkscript used to sweep funds for this
627
        // channel.
628
        pkScript, err := m.cfg.NewAddress()
3✔
629
        if err != nil {
3✔
630
                return err
×
631
        }
×
632

633
        // Persist the sweep pkscript so that restarts will not introduce
634
        // address inflation when the channel is reregistered after a restart.
635
        err = m.cfg.DB.RegisterChannel(id, pkScript)
3✔
636
        if err != nil {
3✔
637
                return err
×
638
        }
×
639

640
        // Finally, cache the pkscript in our in-memory cache to avoid db
641
        // lookups for the remainder of the daemon's execution.
642
        m.chanInfos[id] = &wtdb.ChannelInfo{
3✔
643
                ClientChanSummary: wtdb.ClientChanSummary{
3✔
644
                        SweepPkScript: pkScript,
3✔
645
                },
3✔
646
        }
3✔
647

3✔
648
        // Keep track of which blob type this channel will use for updates.
3✔
649
        m.chanBlobType[id] = blobType
3✔
650

3✔
651
        return nil
3✔
652
}
653

654
// BackupState initiates a request to back up a particular revoked state. If the
655
// method returns nil, the backup is guaranteed to be successful unless the
656
// justice transaction would create dust outputs when trying to abide by the
657
// negotiated policy.
658
func (m *Manager) BackupState(chanID *lnwire.ChannelID, stateNum uint64) error {
3✔
659
        select {
3✔
UNCOV
660
        case <-m.quit:
×
UNCOV
661
                return ErrClientExiting
×
662
        default:
3✔
663
        }
664

665
        // Make sure that this channel is registered with the tower client.
666
        m.backupMu.Lock()
3✔
667
        info, ok := m.chanInfos[*chanID]
3✔
668
        if !ok {
3✔
UNCOV
669
                m.backupMu.Unlock()
×
UNCOV
670

×
UNCOV
671
                return ErrUnregisteredChannel
×
UNCOV
672
        }
×
673

674
        // Ignore backups that have already been presented to the client.
675
        var duplicate bool
3✔
676
        info.MaxHeight.WhenSome(func(maxHeight uint64) {
6✔
677
                if stateNum <= maxHeight {
3✔
UNCOV
678
                        duplicate = true
×
UNCOV
679
                }
×
680
        })
681
        if duplicate {
3✔
UNCOV
682
                m.backupMu.Unlock()
×
UNCOV
683

×
UNCOV
684
                log.Debugf("Ignoring duplicate backup for chanid=%v at "+
×
UNCOV
685
                        "height=%d", chanID, stateNum)
×
UNCOV
686

×
UNCOV
687
                return nil
×
UNCOV
688
        }
×
689

690
        // This backup has a higher commit height than any known backup for this
691
        // channel. We'll update our tip so that we won't accept it again if the
692
        // link flaps.
693
        m.chanInfos[*chanID].MaxHeight = fn.Some(stateNum)
3✔
694

3✔
695
        blobType, ok := m.chanBlobType[*chanID]
3✔
696
        if !ok {
3✔
697
                m.backupMu.Unlock()
×
698

×
699
                return ErrUnregisteredChannel
×
700
        }
×
701
        m.backupMu.Unlock()
3✔
702

3✔
703
        m.clientsMu.Lock()
3✔
704
        client, ok := m.clients[blobType]
3✔
705
        if !ok {
3✔
706
                m.clientsMu.Unlock()
×
707

×
708
                return fmt.Errorf("no client registered for blob type %s",
×
709
                        blobType)
×
710
        }
×
711
        m.clientsMu.Unlock()
3✔
712

3✔
713
        return client.backupState(chanID, stateNum)
3✔
714
}
715

716
// isChanClosed can be used to check if the channel with the given ID has been
717
// closed. If it has been, the block height in which its closing transaction was
718
// mined will also be returned.
719
func (m *Manager) isChannelClosed(id lnwire.ChannelID) (bool, uint32,
720
        error) {
3✔
721

3✔
722
        chanSum, err := m.cfg.FetchClosedChannel(id)
3✔
723
        if errors.Is(err, channeldb.ErrClosedChannelNotFound) {
6✔
724
                return false, 0, nil
3✔
725
        } else if err != nil {
3✔
726
                return false, 0, err
×
727
        }
×
728

UNCOV
729
        return true, chanSum.CloseHeight, nil
×
730
}
731

732
// trackClosableSessions takes in a map of session IDs to the earliest block
733
// height at which the session should be deleted. For each of the sessions,
734
// a random delay is added to the block height and the session is added to the
735
// closableSessionQueue.
736
func (m *Manager) trackClosableSessions(
737
        sessions map[wtdb.SessionID]uint32) error {
3✔
738

3✔
739
        // For each closable session, add a random delay to its close
3✔
740
        // height and add it to the closableSessionQueue.
3✔
741
        for sID, blockHeight := range sessions {
6✔
742
                delay, err := newRandomDelay(m.cfg.SessionCloseRange)
3✔
743
                if err != nil {
3✔
744
                        return err
×
745
                }
×
746

747
                deleteHeight := blockHeight + delay
3✔
748

3✔
749
                m.closableSessionQueue.Push(&sessionCloseItem{
3✔
750
                        sessionID:    sID,
3✔
751
                        deleteHeight: deleteHeight,
3✔
752
                })
3✔
753
        }
754

755
        return nil
3✔
756
}
757

758
// handleChannelCloses listens for channel close events and marks channels as
759
// closed in the DB.
760
//
761
// NOTE: This method MUST be run as a goroutine.
762
func (m *Manager) handleChannelCloses(chanSub subscribe.Subscription) {
3✔
763
        defer m.wg.Done()
3✔
764

3✔
765
        log.Debugf("Starting channel close handler")
3✔
766
        defer log.Debugf("Stopping channel close handler")
3✔
767

3✔
768
        for {
6✔
769
                select {
3✔
770
                case update, ok := <-chanSub.Updates():
3✔
771
                        if !ok {
3✔
772
                                log.Debugf("Channel notifier has exited")
×
773
                                return
×
774
                        }
×
775

776
                        // We only care about channel-close events.
777
                        event, ok := update.(channelnotifier.ClosedChannelEvent)
3✔
778
                        if !ok {
6✔
779
                                continue
3✔
780
                        }
781

782
                        chanID := lnwire.NewChanIDFromOutPoint(
3✔
783
                                event.CloseSummary.ChanPoint,
3✔
784
                        )
3✔
785

3✔
786
                        log.Debugf("Received ClosedChannelEvent for "+
3✔
787
                                "channel: %s", chanID)
3✔
788

3✔
789
                        err := m.handleClosedChannel(
3✔
790
                                chanID, event.CloseSummary.CloseHeight,
3✔
791
                        )
3✔
792
                        if err != nil {
3✔
793
                                log.Errorf("Could not handle channel close "+
×
794
                                        "event for channel(%s): %v", chanID,
×
795
                                        err)
×
796
                        }
×
797

798
                case <-m.quit:
3✔
799
                        return
3✔
800
                }
801
        }
802
}
803

804
// handleClosedChannel handles the closure of a single channel. It will mark the
805
// channel as closed in the DB, then it will handle all the sessions that are
806
// now closable due to the channel closure.
807
func (m *Manager) handleClosedChannel(chanID lnwire.ChannelID,
808
        closeHeight uint32) error {
3✔
809

3✔
810
        m.backupMu.Lock()
3✔
811
        defer m.backupMu.Unlock()
3✔
812

3✔
813
        // We only care about channels registered with the tower client.
3✔
814
        if _, ok := m.chanInfos[chanID]; !ok {
3✔
815
                return nil
×
816
        }
×
817

818
        log.Debugf("Marking channel(%s) as closed", chanID)
3✔
819

3✔
820
        sessions, err := m.cfg.DB.MarkChannelClosed(chanID, closeHeight)
3✔
821
        if err != nil {
3✔
822
                return fmt.Errorf("could not mark channel(%s) as closed: %w",
×
823
                        chanID, err)
×
824
        }
×
825

826
        closableSessions := make(map[wtdb.SessionID]uint32, len(sessions))
3✔
827
        for _, sess := range sessions {
6✔
828
                closableSessions[sess] = closeHeight
3✔
829
        }
3✔
830

831
        log.Debugf("Tracking %d new closable sessions as a result of "+
3✔
832
                "closing channel %s", len(closableSessions), chanID)
3✔
833

3✔
834
        err = m.trackClosableSessions(closableSessions)
3✔
835
        if err != nil {
3✔
836
                return fmt.Errorf("could not track closable sessions: %w", err)
×
837
        }
×
838

839
        delete(m.chanInfos, chanID)
3✔
840

3✔
841
        return nil
3✔
842
}
843

844
// handleClosableSessions listens for new block notifications. For each block,
845
// it checks the closableSessionQueue to see if there is a closable session with
846
// a delete-height smaller than or equal to the new block, if there is then the
847
// tower is informed that it can delete the session, and then we also delete it
848
// from our DB.
849
func (m *Manager) handleClosableSessions(
850
        blocksChan *chainntnfs.BlockEpochEvent) {
3✔
851

3✔
852
        defer m.wg.Done()
3✔
853

3✔
854
        log.Debug("Starting closable sessions handler")
3✔
855
        defer log.Debug("Stopping closable sessions handler")
3✔
856

3✔
857
        for {
6✔
858
                select {
3✔
859
                case newBlock := <-blocksChan.Epochs:
3✔
860
                        if newBlock == nil {
6✔
861
                                return
3✔
862
                        }
3✔
863

864
                        height := uint32(newBlock.Height)
3✔
865
                        for {
6✔
866
                                select {
3✔
867
                                case <-m.quit:
×
868
                                        return
×
869
                                default:
3✔
870
                                }
871

872
                                // If there are no closable sessions that we
873
                                // need to handle, then we are done and can
874
                                // reevaluate when the next block comes.
875
                                item := m.closableSessionQueue.Top()
3✔
876
                                if item == nil {
6✔
877
                                        break
3✔
878
                                }
879

880
                                // If there is closable session but the delete
881
                                // height we have set for it is after the
882
                                // current block height, then our work is done.
883
                                if item.deleteHeight > height {
3✔
884
                                        break
×
885
                                }
886

887
                                // Otherwise, we pop this item from the heap
888
                                // and handle it.
889
                                m.closableSessionQueue.Pop()
3✔
890

3✔
891
                                // Fetch the session from the DB so that we can
3✔
892
                                // extract the Tower info.
3✔
893
                                sess, err := m.cfg.DB.GetClientSession(
3✔
894
                                        item.sessionID,
3✔
895
                                )
3✔
896
                                if err != nil {
3✔
897
                                        log.Errorf("error calling "+
×
898
                                                "GetClientSession for "+
×
899
                                                "session %s: %v",
×
900
                                                item.sessionID, err)
×
901

×
902
                                        continue
×
903
                                }
904

905
                                // get appropriate client.
906
                                m.clientsMu.Lock()
3✔
907
                                client, ok := m.clients[sess.Policy.BlobType]
3✔
908
                                if !ok {
3✔
909
                                        m.clientsMu.Unlock()
×
910
                                        log.Errorf("no client currently " +
×
911
                                                "active for the session type")
×
912

×
913
                                        continue
×
914
                                }
915
                                m.clientsMu.Unlock()
3✔
916

3✔
917
                                clientName, err := client.policy().BlobType.
3✔
918
                                        Identifier()
3✔
919
                                if err != nil {
3✔
920
                                        log.Errorf("could not get client "+
×
921
                                                "identifier: %v", err)
×
922

×
923
                                        continue
×
924
                                }
925

926
                                // Stop the session and remove it from the
927
                                // in-memory set.
928
                                err = client.stopAndRemoveSession(
3✔
929
                                        item.sessionID, true,
3✔
930
                                )
3✔
931
                                if err != nil {
3✔
932
                                        log.Errorf("could not remove "+
×
933
                                                "session(%s) from in-memory "+
×
934
                                                "set of the %s client: %v",
×
935
                                                item.sessionID, clientName, err)
×
936

×
937
                                        continue
×
938
                                }
939

940
                                err = client.deleteSessionFromTower(sess)
3✔
941
                                if err != nil {
3✔
942
                                        log.Errorf("error deleting "+
×
943
                                                "session %s from tower: %v",
×
944
                                                sess.ID, err)
×
945

×
946
                                        continue
×
947
                                }
948

949
                                err = m.cfg.DB.DeleteSession(item.sessionID)
3✔
950
                                if err != nil {
3✔
951
                                        log.Errorf("could not delete "+
×
952
                                                "session(%s) from DB: %w",
×
953
                                                sess.ID, err)
×
954

×
955
                                        continue
×
956
                                }
957
                        }
958

UNCOV
959
                case <-m.quit:
×
UNCOV
960
                        return
×
961
                }
962
        }
963
}
964

965
func (m *Manager) getSweepScript(id lnwire.ChannelID) ([]byte, bool) {
3✔
966
        m.backupMu.Lock()
3✔
967
        defer m.backupMu.Unlock()
3✔
968

3✔
969
        summary, ok := m.chanInfos[id]
3✔
970
        if !ok {
3✔
971
                return nil, false
×
972
        }
×
973

974
        return summary.SweepPkScript, true
3✔
975
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc