lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%

Pull #9489 (github) by calvinrzachman

itest: verify switchrpc server enforces send then track

We prevent the RPC server from allowing onion dispatches for
attempt IDs that have already been tracked by RPC clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: this is not the only way to solve the issue! It
could also be addressed with careful client-side programming
that accounts for the uncertainty and asynchronous nature of
dispatching onions to a remote process via RPC, though that
would require changes to how the lnd ChannelRouter intends to
use these RPCs. A minimal sketch of this enforcement follows
the coverage summary below.

Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line
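
The "send then track" enforcement described in the commit message can be
pictured with a short sketch. Everything here is illustrative: the
onionServer type, its fields, and the error text are hypothetical stand-ins,
not the actual lnd switchrpc implementation.

package main

import (
        "fmt"
        "sync"
)

// onionServer is a hypothetical stand-in for the switchrpc server. It
// remembers which attempt IDs a client has already tracked.
type onionServer struct {
        mu      sync.Mutex
        tracked map[uint64]struct{}
}

// TrackOnion registers a client's interest in the result of an attempt.
// Once an ID is tracked, the server refuses to dispatch an onion for it.
func (s *onionServer) TrackOnion(attemptID uint64) {
        s.mu.Lock()
        defer s.mu.Unlock()
        s.tracked[attemptID] = struct{}{}
        // ... deliver the attempt's result to the caller ...
}

// SendOnion dispatches the onion for an attempt ID, enforcing send-then-track
// ordering: an ID that has already been tracked is rejected, so the client
// cannot leak a duplicate onion attempt.
func (s *onionServer) SendOnion(attemptID uint64) error {
        s.mu.Lock()
        defer s.mu.Unlock()
        if _, ok := s.tracked[attemptID]; ok {
                return fmt.Errorf("attempt %d already tracked: refusing "+
                        "duplicate dispatch", attemptID)
        }
        // ... hand the onion to the switch for dispatch ...
        return nil
}

func main() {
        s := &onionServer{tracked: make(map[uint64]struct{})}

        _ = s.SendOnion(1) // first dispatch of attempt 1: allowed
        s.TrackOnion(1)    // client now tracks attempt 1

        // A second dispatch for the same attempt ID is refused.
        if err := s.SendOnion(1); err != nil {
                fmt.Println(err)
        }
}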

Source File: /chanfitness/chaneventstore.go (79.0% covered)

// Package chanfitness monitors the behaviour of channels to provide insight
// into the health and performance of a channel. This is achieved by maintaining
// an event store which tracks events for each channel.
//
// Lifespan: the period that the channel has been known to the scoring system.
// Note that lifespan may not equal the channel's full lifetime because data is
// not currently persisted.
//
// Uptime: the total time within a given period that the channel's remote peer
// has been online.
package chanfitness

import (
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/wire"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/channelnotifier"
        "github.com/lightningnetwork/lnd/clock"
        "github.com/lightningnetwork/lnd/peernotifier"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/subscribe"
        "github.com/lightningnetwork/lnd/ticker"
)

const (
        // FlapCountFlushRate determines how often we write peer total flap
        // count to disk.
        FlapCountFlushRate = time.Hour
)

var (
        // errShuttingDown is returned when the store cannot respond to a query
        // because it has received the shutdown signal.
        errShuttingDown = errors.New("channel event store shutting down")

        // ErrChannelNotFound is returned when a query is made for a channel
        // that the event store does not have knowledge of.
        ErrChannelNotFound = errors.New("channel not found in event store")

        // ErrPeerNotFound is returned when a query is made for a channel
        // that has a peer that the event store is not currently tracking.
        ErrPeerNotFound = errors.New("peer not found in event store")
)

// ChannelEventStore maintains a set of event logs for the node's channels to
// provide insight into the performance and health of channels.
type ChannelEventStore struct {
        started atomic.Bool
        stopped atomic.Bool

        cfg *Config

        // peers tracks all of our currently monitored peers and their channels.
        peers map[route.Vertex]peerMonitor

        // chanInfoRequests serves requests for information about our channel.
        chanInfoRequests chan channelInfoRequest

        // peerRequests serves requests for information about a peer.
        peerRequests chan peerRequest

        quit chan struct{}

        wg sync.WaitGroup
}

// Config provides the event store with functions required to monitor channel
// activity. All elements of the config must be non-nil for the event store to
// operate.
type Config struct {
        // SubscribeChannelEvents provides a subscription client which provides
        // a stream of channel events.
        SubscribeChannelEvents func() (subscribe.Subscription, error)

        // SubscribePeerEvents provides a subscription client which provides a
        // stream of peer online/offline events.
        SubscribePeerEvents func() (subscribe.Subscription, error)

        // GetOpenChannels provides a list of existing open channels which is
        // used to populate the ChannelEventStore with a set of channels on
        // startup.
        GetOpenChannels func() ([]*channeldb.OpenChannel, error)

        // Clock is the time source that the subsystem uses, provided here
        // for ease of testing.
        Clock clock.Clock

        // WriteFlapCount records the flap count for a set of peers on disk.
        WriteFlapCount func(map[route.Vertex]*channeldb.FlapCount) error

        // ReadFlapCount gets the flap count for a peer on disk.
        ReadFlapCount func(route.Vertex) (*channeldb.FlapCount, error)

        // FlapCountTicker is a ticker which controls how often we flush our
        // peer's flap count to disk.
        FlapCountTicker ticker.Ticker
}

// peerFlapCountMap is the map used to map peers to flap counts, declared here
// to allow shorter function signatures.
type peerFlapCountMap map[route.Vertex]*channeldb.FlapCount

type channelInfoRequest struct {
        peer         route.Vertex
        channelPoint wire.OutPoint
        responseChan chan channelInfoResponse
}

type channelInfoResponse struct {
        info *ChannelInfo
        err  error
}

type peerRequest struct {
        peer         route.Vertex
        responseChan chan peerResponse
}

type peerResponse struct {
        flapCount int
        ts        *time.Time
        err       error
}

// NewChannelEventStore initializes an event store with the config provided.
// Note that this function does not start the main event loop; Start() must be
// called.
func NewChannelEventStore(config *Config) *ChannelEventStore {
        store := &ChannelEventStore{
                cfg:              config,
                peers:            make(map[route.Vertex]peerMonitor),
                chanInfoRequests: make(chan channelInfoRequest),
                peerRequests:     make(chan peerRequest),
                quit:             make(chan struct{}),
        }

        return store
}

// Start adds all existing open channels to the event store and starts the main
// loop which records channel and peer events, and serves requests for
// information from the store. If this function fails, it cancels its existing
// subscriptions and returns an error.
func (c *ChannelEventStore) Start() error {
        log.Info("ChannelEventStore starting...")

        if c.started.Swap(true) {
                return fmt.Errorf("ChannelEventStore started more than once")
        }

        // Create a subscription to channel events.
        channelClient, err := c.cfg.SubscribeChannelEvents()
        if err != nil {
                return err
        }

        // Create a subscription to peer events. If an error occurs, cancel the
        // existing subscription to channel events and return.
        peerClient, err := c.cfg.SubscribePeerEvents()
        if err != nil {
                channelClient.Cancel()
                return err
        }

        // cancel should be called to cancel all subscriptions if an error
        // occurs.
        cancel := func() {
                channelClient.Cancel()
                peerClient.Cancel()
        }

        // Add the existing set of channels to the event store. This is required
        // because channel events will not be triggered for channels that exist
        // at startup time.
        channels, err := c.cfg.GetOpenChannels()
        if err != nil {
                cancel()
                return err
        }

        log.Infof("Adding %v channels to event store", len(channels))

        for _, ch := range channels {
                peerKey, err := route.NewVertexFromBytes(
                        ch.IdentityPub.SerializeCompressed(),
                )
                if err != nil {
                        cancel()
                        return err
                }

                // Add existing channels to the channel store with an initial
                // peer online or offline event.
                c.addChannel(ch.FundingOutpoint, peerKey)
        }

        // Start a goroutine that consumes events from all subscriptions.
        c.wg.Add(1)
        go c.consume(&subscriptions{
                channelUpdates: channelClient.Updates(),
                peerUpdates:    peerClient.Updates(),
                cancel:         cancel,
        })

        log.Debug("ChannelEventStore started")

        return nil
}

// Stop terminates all goroutines started by the event store.
func (c *ChannelEventStore) Stop() error {
        log.Info("ChannelEventStore shutting down...")

        if c.stopped.Swap(true) {
                return fmt.Errorf("ChannelEventStore stopped more than once")
        }

        // Stop the consume goroutine.
        close(c.quit)
        c.wg.Wait()

        // Stop the ticker after the goroutine reading from it has exited, to
        // avoid a race.
        var err error
        if c.cfg.FlapCountTicker == nil {
                err = fmt.Errorf("ChannelEventStore FlapCountTicker not " +
                        "initialized")
        } else {
                c.cfg.FlapCountTicker.Stop()
        }

        log.Debugf("ChannelEventStore shutdown complete")

        return err
}

// addChannel checks whether we are already tracking a channel's peer, creates a
// new peer log to track it if we are not yet monitoring it, and adds the
// channel.
func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
        peer route.Vertex) {

        peerMonitor, err := c.getPeerMonitor(peer)
        if err != nil {
                log.Errorf("could not create monitor: %v", err)
                return
        }

        if err := peerMonitor.addChannel(channelPoint); err != nil {
                log.Errorf("could not add channel: %v", err)
        }
}

// getPeerMonitor tries to get an existing peer monitor from our in memory list,
// and falls back to creating a new monitor if it is not currently known.
func (c *ChannelEventStore) getPeerMonitor(peer route.Vertex) (peerMonitor,
        error) {

        peerMonitor, ok := c.peers[peer]
        if ok {
                return peerMonitor, nil
        }

        var (
                flapCount int
                lastFlap  *time.Time
        )

        historicalFlap, err := c.cfg.ReadFlapCount(peer)
        switch err {
        // If we do not have any records for this peer we set a 0 flap count
        // and timestamp.
        case channeldb.ErrNoPeerBucket:

        case nil:
                flapCount = int(historicalFlap.Count)
                lastFlap = &historicalFlap.LastFlap

        // Return if we get an unexpected error.
        default:
                return nil, err
        }

        peerMonitor = newPeerLog(c.cfg.Clock, flapCount, lastFlap)
        c.peers[peer] = peerMonitor

        return peerMonitor, nil
}

// closeChannel records a closed time for a channel, and returns early if the
// channel is not known to the event store. We log warnings (rather than errors)
// when we cannot find a peer/channel because channels that we restore from a
// static channel backup do not have their open event notified, so the event
// store never learns about them, but they are closed using the regular flow so
// we will try to remove them on close. At present, we cannot easily distinguish
// between these closes and others.
func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint,
        peer route.Vertex) {

        peerMonitor, ok := c.peers[peer]
        if !ok {
                log.Warnf("peer not known to store: %v", peer)
                return
        }

        if err := peerMonitor.removeChannel(channelPoint); err != nil {
                log.Warnf("could not remove channel: %v", err)
        }
}

// peerEvent creates a peer monitor for a peer if we do not currently have
// one, and adds an online event to it.
func (c *ChannelEventStore) peerEvent(peer route.Vertex, online bool) {
        peerMonitor, err := c.getPeerMonitor(peer)
        if err != nil {
                log.Errorf("could not create monitor: %v", err)
                return
        }

        peerMonitor.onlineEvent(online)
}

// subscriptions abstracts away from subscription clients to allow for mocking.
type subscriptions struct {
        channelUpdates <-chan interface{}
        peerUpdates    <-chan interface{}
        cancel         func()
}

// consume is the event store's main loop. It consumes subscriptions to update
// the event store with channel and peer events, and serves requests for channel
// uptime and lifespan.
func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
        // Start our flap count ticker.
        c.cfg.FlapCountTicker.Resume()

        // On exit, we will cancel our subscriptions and write our most recent
        // flap counts to disk. This ensures that we have consistent data in
        // the case of a graceful shutdown. If we do not shut down gracefully,
        // our worst case is data from our last flap count tick (1H).
        defer func() {
                subscriptions.cancel()

                if err := c.recordFlapCount(); err != nil {
                        log.Errorf("error recording flap on shutdown: %v", err)
                }

                c.wg.Done()
        }()

        // Consume events until the channel is closed.
        for {
                select {
                // Process channel opened and closed events.
                case e := <-subscriptions.channelUpdates:
                        switch event := e.(type) {
                        // A new channel has been opened, we must add the
                        // channel to the store and record a channel open event.
                        case channelnotifier.OpenChannelEvent:
                                compressed := event.Channel.IdentityPub.SerializeCompressed()
                                peerKey, err := route.NewVertexFromBytes(
                                        compressed,
                                )
                                if err != nil {
                                        log.Errorf("Could not get vertex "+
                                                "from: %v", compressed)
                                        continue
                                }

                                c.addChannel(
                                        event.Channel.FundingOutpoint, peerKey,
                                )

                        // A channel has been closed, we must remove the channel
                        // from the store and record a channel closed event.
                        case channelnotifier.ClosedChannelEvent:
                                compressed := event.CloseSummary.RemotePub.SerializeCompressed()
                                peerKey, err := route.NewVertexFromBytes(
                                        compressed,
                                )
                                if err != nil {
                                        log.Errorf("Could not get vertex "+
                                                "from: %v", compressed)
                                        continue
                                }

                                c.closeChannel(
                                        event.CloseSummary.ChanPoint, peerKey,
                                )
                        }

                // Process peer online and offline events.
                case e := <-subscriptions.peerUpdates:
                        switch event := e.(type) {
                        // We have reestablished a connection with our peer,
                        // and should record an online event for any channels
                        // with that peer.
                        case peernotifier.PeerOnlineEvent:
                                c.peerEvent(event.PubKey, true)

                        // We have lost a connection with our peer, and should
                        // record an offline event for any channels with that
                        // peer.
                        case peernotifier.PeerOfflineEvent:
                                c.peerEvent(event.PubKey, false)
                        }

                // Serve all requests for channel lifetime.
                case req := <-c.chanInfoRequests:
                        var resp channelInfoResponse

                        resp.info, resp.err = c.getChanInfo(req)
                        req.responseChan <- resp

                // Serve all requests for information about our peer.
                case req := <-c.peerRequests:
                        var resp peerResponse

                        resp.flapCount, resp.ts, resp.err = c.flapCount(
                                req.peer,
                        )
                        req.responseChan <- resp

                case <-c.cfg.FlapCountTicker.Ticks():
                        if err := c.recordFlapCount(); err != nil {
                                log.Errorf("could not record flap "+
                                        "count: %v", err)
                        }

                // Exit if the store receives the signal to shut down.
                case <-c.quit:
                        return
                }
        }
}

// ChannelInfo provides the set of information that the event store has recorded
// for a channel.
type ChannelInfo struct {
        // Lifetime is the total amount of time we have monitored the channel
        // for.
        Lifetime time.Duration

        // Uptime is the total amount of time that the channel peer has been
        // observed as online during the monitored lifespan.
        Uptime time.Duration
}

// GetChanInfo gets all the information we have on a channel in the event store.
func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint,
        peer route.Vertex) (*ChannelInfo, error) {

        request := channelInfoRequest{
                peer:         peer,
                channelPoint: channelPoint,
                responseChan: make(chan channelInfoResponse),
        }

        // Send a request for the channel's information to the main event loop,
        // or return early with an error if the store has already received a
        // shutdown signal.
        select {
        case c.chanInfoRequests <- request:
        case <-c.quit:
                return nil, errShuttingDown
        }

        // Return the response we receive on the response channel or exit early
        // if the store is instructed to exit.
        select {
        case resp := <-request.responseChan:
                return resp.info, resp.err

        case <-c.quit:
                return nil, errShuttingDown
        }
}

// getChanInfo collects channel information for a channel. It gets uptime over
// the full lifetime of the channel.
func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
        error) {

        peerMonitor, ok := c.peers[req.peer]
        if !ok {
                return nil, ErrPeerNotFound
        }

        lifetime, uptime, err := peerMonitor.channelUptime(req.channelPoint)
        if err != nil {
                return nil, err
        }

        return &ChannelInfo{
                Lifetime: lifetime,
                Uptime:   uptime,
        }, nil
}

// FlapCount returns the flap count we have for a peer and the timestamp of its
// last flap. If we do not have any flaps recorded for the peer, the last flap
// timestamp will be nil.
func (c *ChannelEventStore) FlapCount(peer route.Vertex) (int, *time.Time,
        error) {

        request := peerRequest{
                peer:         peer,
                responseChan: make(chan peerResponse),
        }

        // Send a request for the peer's information to the main event loop,
        // or return early with an error if the store has already received a
        // shutdown signal.
        select {
        case c.peerRequests <- request:
        case <-c.quit:
                return 0, nil, errShuttingDown
        }

        // Return the response we receive on the response channel or exit early
        // if the store is instructed to exit.
        select {
        case resp := <-request.responseChan:
                return resp.flapCount, resp.ts, resp.err

        case <-c.quit:
                return 0, nil, errShuttingDown
        }
}

// flapCount gets our peer flap count and last flap timestamp from our in memory
// record of a peer, falling back to on disk if we are not currently tracking
// the peer. If we have no flap count recorded for the peer, a nil last flap
// time will be returned.
func (c *ChannelEventStore) flapCount(peer route.Vertex) (int, *time.Time,
        error) {

        // First check whether we are tracking this peer in memory, because this
        // record will have the most accurate flap count. We do not fail if we
        // can't find the peer in memory, because we may have previously
        // recorded its flap count on disk.
        peerMonitor, ok := c.peers[peer]
        if ok {
                count, ts := peerMonitor.getFlapCount()
                return count, ts, nil
        }

        // Try to get our flap count from the database. If this value is not
        // recorded, we return a nil last flap time to indicate that we have no
        // record of the peer's flap count.
        flapCount, err := c.cfg.ReadFlapCount(peer)
        switch err {
        case channeldb.ErrNoPeerBucket:
                return 0, nil, nil

        case nil:
                return int(flapCount.Count), &flapCount.LastFlap, nil

        default:
                return 0, nil, err
        }
}

// recordFlapCount will record our flap count for each peer that we are
// currently tracking, skipping peers that have a 0 flap count.
func (c *ChannelEventStore) recordFlapCount() error {
        updates := make(peerFlapCountMap)

        for peer, monitor := range c.peers {
                flapCount, lastFlap := monitor.getFlapCount()
                if lastFlap == nil {
                        continue
                }

                updates[peer] = &channeldb.FlapCount{
                        Count:    uint32(flapCount),
                        LastFlap: *lastFlap,
                }
        }

        log.Debugf("recording flap count for: %v peers", len(updates))

        return c.cfg.WriteFlapCount(updates)
}
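
For orientation, here is a minimal wiring sketch for the store's public API
(NewChannelEventStore, Start, FlapCount, GetChanInfo, Stop). It is an
assumption-laden example rather than lnd's actual wiring: a single
subscribe.Server stands in for the channelnotifier and peernotifier
subscriptions, and the channeldb callbacks are no-op stubs.

package main

import (
        "fmt"

        "github.com/btcsuite/btcd/wire"
        "github.com/lightningnetwork/lnd/chanfitness"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/clock"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/subscribe"
        "github.com/lightningnetwork/lnd/ticker"
)

func main() {
        // One subscription server stands in for lnd's channel and peer
        // notifiers in this sketch; in lnd proper these subscriptions come
        // from channelnotifier and peernotifier.
        ntfnServer := subscribe.NewServer()
        if err := ntfnServer.Start(); err != nil {
                panic(err)
        }

        store := chanfitness.NewChannelEventStore(&chanfitness.Config{
                SubscribeChannelEvents: func() (subscribe.Subscription, error) {
                        return ntfnServer.Subscribe()
                },
                SubscribePeerEvents: func() (subscribe.Subscription, error) {
                        return ntfnServer.Subscribe()
                },
                // Stub callbacks; real callers read and write channeldb.
                GetOpenChannels: func() ([]*channeldb.OpenChannel, error) {
                        return nil, nil
                },
                WriteFlapCount: func(map[route.Vertex]*channeldb.FlapCount) error {
                        return nil
                },
                ReadFlapCount: func(route.Vertex) (*channeldb.FlapCount, error) {
                        return nil, channeldb.ErrNoPeerBucket
                },
                Clock:           clock.NewDefaultClock(),
                FlapCountTicker: ticker.New(chanfitness.FlapCountFlushRate),
        })

        if err := store.Start(); err != nil {
                panic(err)
        }
        defer store.Stop()

        // Queries are served by the main event loop. With no recorded events,
        // an unknown peer yields a zero flap count and nil last-flap time.
        var peer route.Vertex
        count, lastFlap, err := store.FlapCount(peer)
        fmt.Println(count, lastFlap, err)

        // Uptime queries for an untracked peer return ErrPeerNotFound.
        _, err = store.GetChanInfo(wire.OutPoint{}, peer)
        fmt.Println(err)
}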