• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13980275562

20 Mar 2025 10:06PM UTC coverage: 58.6% (-10.2%) from 68.789%
13980275562

Pull #9623

github

web-flow
Merge b9b960345 into 09b674508
Pull Request #9623: Size msg test msg

0 of 1518 new or added lines in 42 files covered. (0.0%)

26603 existing lines in 443 files now uncovered.

96807 of 165200 relevant lines covered (58.6%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.0
/chanfitness/chaneventstore.go
1
// Package chanfitness monitors the behaviour of channels to provide insight
2
// into the health and performance of a channel. This is achieved by maintaining
3
// an event store which tracks events for each channel.
4
//
5
// Lifespan: the period that the channel has been known to the scoring system.
6
// Note that lifespan may not equal the channel's full lifetime because data is
7
// not currently persisted.
8
//
9
// Uptime: the total time within a given period that the channel's remote peer
10
// has been online.
11
package chanfitness
12

13
import (
14
        "errors"
15
        "fmt"
16
        "sync"
17
        "sync/atomic"
18
        "time"
19

20
        "github.com/btcsuite/btcd/wire"
21
        "github.com/lightningnetwork/lnd/channeldb"
22
        "github.com/lightningnetwork/lnd/channelnotifier"
23
        "github.com/lightningnetwork/lnd/clock"
24
        "github.com/lightningnetwork/lnd/peernotifier"
25
        "github.com/lightningnetwork/lnd/routing/route"
26
        "github.com/lightningnetwork/lnd/subscribe"
27
        "github.com/lightningnetwork/lnd/ticker"
28
)
29

30
const (
31
        // FlapCountFlushRate determines how often we write peer total flap
32
        // count to disk.
33
        FlapCountFlushRate = time.Hour
34
)
35

36
var (
37
        // errShuttingDown is returned when the store cannot respond to a query
38
        // because it has received the shutdown signal.
39
        errShuttingDown = errors.New("channel event store shutting down")
40

41
        // ErrChannelNotFound is returned when a query is made for a channel
42
        // that the event store does not have knowledge of.
43
        ErrChannelNotFound = errors.New("channel not found in event store")
44

45
        // ErrPeerNotFound is returned when a query is made for a channel
46
        // that has a peer that the event store is not currently tracking.
47
        ErrPeerNotFound = errors.New("peer not found in event store")
48
)
49

50
// ChannelEventStore maintains a set of event logs for the node's channels to
// provide insight into the performance and health of channels.
type ChannelEventStore struct {
	// started and stopped guard against Start and Stop being called more
	// than once.
	started atomic.Bool
	stopped atomic.Bool

	// cfg holds the dependencies the store needs to operate; all fields
	// must be non-nil (see Config).
	cfg *Config

	// peers tracks all of our currently monitored peers and their channels.
	peers map[route.Vertex]peerMonitor

	// chanInfoRequests serves requests for information about our channel.
	chanInfoRequests chan channelInfoRequest

	// peerRequests serves requests for information about a peer.
	peerRequests chan peerRequest

	// quit is closed in Stop to signal the consume goroutine to exit.
	quit chan struct{}

	// wg tracks the consume goroutine so that Stop can wait for it.
	wg sync.WaitGroup
}
71

72
// Config provides the event store with functions required to monitor channel
// activity. All elements of the config must be non-nil for the event store to
// operate.
type Config struct {
	// SubscribeChannelEvents provides a subscription client which provides
	// a stream of channel events.
	SubscribeChannelEvents func() (subscribe.Subscription, error)

	// SubscribePeerEvents provides a subscription client which provides a
	// stream of peer online/offline events.
	SubscribePeerEvents func() (subscribe.Subscription, error)

	// GetOpenChannels provides a list of existing open channels which is
	// used to populate the ChannelEventStore with a set of channels on
	// startup.
	GetOpenChannels func() ([]*channeldb.OpenChannel, error)

	// Clock is the time source that the subsystem uses, provided here
	// for ease of testing.
	Clock clock.Clock

	// WriteFlapCount records the flap count for a set of peers on disk.
	WriteFlapCount func(map[route.Vertex]*channeldb.FlapCount) error

	// ReadFlapCount gets the flap count for a peer on disk.
	ReadFlapCount func(route.Vertex) (*channeldb.FlapCount, error)

	// FlapCountTicker is a ticker which controls how often we flush our
	// peer's flap count to disk (see FlapCountFlushRate).
	FlapCountTicker ticker.Ticker
}
103

104
// peerFlapCountMap is the map used to map peers to flap counts, declared here
// to allow shorter function signatures.
type peerFlapCountMap map[route.Vertex]*channeldb.FlapCount

// channelInfoRequest is a request for information about a single channel,
// answered by the main event loop over responseChan.
type channelInfoRequest struct {
	peer         route.Vertex
	channelPoint wire.OutPoint
	responseChan chan channelInfoResponse
}

// channelInfoResponse is the event loop's answer to a channelInfoRequest.
type channelInfoResponse struct {
	info *ChannelInfo
	err  error
}

// peerRequest is a request for a peer's flap count, answered by the main
// event loop over responseChan.
type peerRequest struct {
	peer         route.Vertex
	responseChan chan peerResponse
}

// peerResponse is the event loop's answer to a peerRequest. ts is nil when
// we have no flap recorded for the peer.
type peerResponse struct {
	flapCount int
	ts        *time.Time
	err       error
}
129

130
// NewChannelEventStore initializes an event store with the config provided.
131
// Note that this function does not start the main event loop, Start() must be
132
// called.
133
func NewChannelEventStore(config *Config) *ChannelEventStore {
3✔
134
        store := &ChannelEventStore{
3✔
135
                cfg:              config,
3✔
136
                peers:            make(map[route.Vertex]peerMonitor),
3✔
137
                chanInfoRequests: make(chan channelInfoRequest),
3✔
138
                peerRequests:     make(chan peerRequest),
3✔
139
                quit:             make(chan struct{}),
3✔
140
        }
3✔
141

3✔
142
        return store
3✔
143
}
3✔
144

145
// Start adds all existing open channels to the event store and starts the main
// loop which records channel and peer events, and serves requests for
// information from the store. If this function fails, it cancels its existing
// subscriptions and returns an error.
func (c *ChannelEventStore) Start() error {
	log.Info("ChannelEventStore starting...")

	// Swap returns the previous value, so a true result means Start has
	// already been called.
	if c.started.Swap(true) {
		return fmt.Errorf("ChannelEventStore started more than once")
	}

	// Create a subscription to channel events.
	channelClient, err := c.cfg.SubscribeChannelEvents()
	if err != nil {
		return err
	}

	// Create a subscription to peer events. If an error occurs, cancel the
	// existing subscription to channel events and return.
	peerClient, err := c.cfg.SubscribePeerEvents()
	if err != nil {
		channelClient.Cancel()
		return err
	}

	// cancel should be called to cancel all subscriptions if an error
	// occurs.
	cancel := func() {
		channelClient.Cancel()
		peerClient.Cancel()
	}

	// Add the existing set of channels to the event store. This is required
	// because channel events will not be triggered for channels that exist
	// at startup time.
	channels, err := c.cfg.GetOpenChannels()
	if err != nil {
		cancel()
		return err
	}

	log.Infof("Adding %v channels to event store", len(channels))

	for _, ch := range channels {
		peerKey, err := route.NewVertexFromBytes(
			ch.IdentityPub.SerializeCompressed(),
		)
		if err != nil {
			cancel()
			return err
		}

		// Add existing channels to the channel store with an initial
		// peer online or offline event.
		c.addChannel(ch.FundingOutpoint, peerKey)
	}

	// Start a goroutine that consumes events from all subscriptions.
	// consume takes ownership of the subscriptions and calls cancel on
	// exit, so from here on we no longer cancel them ourselves.
	c.wg.Add(1)
	go c.consume(&subscriptions{
		channelUpdates: channelClient.Updates(),
		peerUpdates:    peerClient.Updates(),
		cancel:         cancel,
	})

	log.Debug("ChannelEventStore started")

	return nil
}
214

215
// Stop terminates all goroutines started by the event store.
func (c *ChannelEventStore) Stop() error {
	log.Info("ChannelEventStore shutting down...")

	// Swap returns the previous value, so a true result means Stop has
	// already been called.
	if c.stopped.Swap(true) {
		return fmt.Errorf("ChannelEventStore stopped more than once")
	}

	// Stop the consume goroutine.
	close(c.quit)
	c.wg.Wait()

	// Stop the ticker after the goroutine reading from it has exited, to
	// avoid a race. The nil check is defensive: it reports a misconfigured
	// store rather than panicking on shutdown.
	var err error
	if c.cfg.FlapCountTicker == nil {
		err = fmt.Errorf("ChannelEventStore FlapCountTicker not " +
			"initialized")
	} else {
		c.cfg.FlapCountTicker.Stop()
	}

	log.Debugf("ChannelEventStore shutdown complete")

	return err
}
241

242
// addChannel checks whether we are already tracking a channel's peer, creates a
243
// new peer log to track it if we are not yet monitoring it, and adds the
244
// channel.
245
func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
246
        peer route.Vertex) {
3✔
247

3✔
248
        peerMonitor, err := c.getPeerMonitor(peer)
3✔
249
        if err != nil {
3✔
250
                log.Error("could not create monitor: %v", err)
×
251
                return
×
252
        }
×
253

254
        if err := peerMonitor.addChannel(channelPoint); err != nil {
3✔
UNCOV
255
                log.Errorf("could not add channel: %v", err)
×
UNCOV
256
        }
×
257
}
258

259
// getPeerMonitor tries to get an existing peer monitor from our in memory list,
260
// and falls back to creating a new monitor if it is not currently known.
261
func (c *ChannelEventStore) getPeerMonitor(peer route.Vertex) (peerMonitor,
262
        error) {
3✔
263

3✔
264
        peerMonitor, ok := c.peers[peer]
3✔
265
        if ok {
6✔
266
                return peerMonitor, nil
3✔
267
        }
3✔
268

269
        var (
3✔
270
                flapCount int
3✔
271
                lastFlap  *time.Time
3✔
272
        )
3✔
273

3✔
274
        historicalFlap, err := c.cfg.ReadFlapCount(peer)
3✔
275
        switch err {
3✔
276
        // If we do not have any records for this peer we set a 0 flap count
277
        // and timestamp.
278
        case channeldb.ErrNoPeerBucket:
3✔
279

280
        case nil:
3✔
281
                flapCount = int(historicalFlap.Count)
3✔
282
                lastFlap = &historicalFlap.LastFlap
3✔
283

284
        // Return if we get an unexpected error.
285
        default:
×
286
                return nil, err
×
287
        }
288

289
        peerMonitor = newPeerLog(c.cfg.Clock, flapCount, lastFlap)
3✔
290
        c.peers[peer] = peerMonitor
3✔
291

3✔
292
        return peerMonitor, nil
3✔
293
}
294

295
// closeChannel records a closed time for a channel, and returns early if the
// channel is not known to the event store. We log warnings (rather than errors)
// when we cannot find a peer/channel because channels that we restore from a
// static channel backup do not have their open notified, so the event store
// never learns about them, but they are closed using the regular flow so we
// will try to remove them on close. At present, we cannot easily distinguish
// between these closes and others.
func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint,
	peer route.Vertex) {

	peerMonitor, ok := c.peers[peer]
	if !ok {
		log.Warnf("peer not known to store: %v", peer)
		return
	}

	if err := peerMonitor.removeChannel(channelPoint); err != nil {
		log.Warnf("could not remove channel: %v", err)
	}
}
315

316
// peerEvent creates a peer monitor for a peer if we do not currently have
317
// one, and adds an online event to it.
318
func (c *ChannelEventStore) peerEvent(peer route.Vertex, online bool) {
3✔
319
        peerMonitor, err := c.getPeerMonitor(peer)
3✔
320
        if err != nil {
3✔
321
                log.Error("could not create monitor: %v", err)
×
322
                return
×
323
        }
×
324

325
        peerMonitor.onlineEvent(online)
3✔
326
}
327

328
// subscriptions abstracts away from subscription clients to allow for mocking.
type subscriptions struct {
	// channelUpdates streams channel open/close events.
	channelUpdates <-chan interface{}

	// peerUpdates streams peer online/offline events.
	peerUpdates <-chan interface{}

	// cancel tears down the underlying subscription clients; it is called
	// by the consumer when it exits.
	cancel func()
}
334

335
// consume is the event store's main loop. It consumes subscriptions to update
336
// the event store with channel and peer events, and serves requests for channel
337
// uptime and lifespan.
338
func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
3✔
339
        // Start our flap count ticker.
3✔
340
        c.cfg.FlapCountTicker.Resume()
3✔
341

3✔
342
        // On exit, we will cancel our subscriptions and write our most recent
3✔
343
        // flap counts to disk. This ensures that we have consistent data in
3✔
344
        // the case of a graceful shutdown. If we do not shutdown gracefully,
3✔
345
        // our worst case is data from our last flap count tick (1H).
3✔
346
        defer func() {
6✔
347
                subscriptions.cancel()
3✔
348

3✔
349
                if err := c.recordFlapCount(); err != nil {
3✔
350
                        log.Errorf("error recording flap on shutdown: %v", err)
×
351
                }
×
352

353
                c.wg.Done()
3✔
354
        }()
355

356
        // Consume events until the channel is closed.
357
        for {
6✔
358
                select {
3✔
359
                // Process channel opened and closed events.
360
                case e := <-subscriptions.channelUpdates:
3✔
361
                        switch event := e.(type) {
3✔
362
                        // A new channel has been opened, we must add the
363
                        // channel to the store and record a channel open event.
364
                        case channelnotifier.OpenChannelEvent:
3✔
365
                                compressed := event.Channel.IdentityPub.SerializeCompressed()
3✔
366
                                peerKey, err := route.NewVertexFromBytes(
3✔
367
                                        compressed,
3✔
368
                                )
3✔
369
                                if err != nil {
3✔
370
                                        log.Errorf("Could not get vertex "+
×
371
                                                "from: %v", compressed)
×
372
                                }
×
373

374
                                c.addChannel(
3✔
375
                                        event.Channel.FundingOutpoint, peerKey,
3✔
376
                                )
3✔
377

378
                        // A channel has been closed, we must remove the channel
379
                        // from the store and record a channel closed event.
380
                        case channelnotifier.ClosedChannelEvent:
3✔
381
                                compressed := event.CloseSummary.RemotePub.SerializeCompressed()
3✔
382
                                peerKey, err := route.NewVertexFromBytes(
3✔
383
                                        compressed,
3✔
384
                                )
3✔
385
                                if err != nil {
3✔
386
                                        log.Errorf("Could not get vertex "+
×
387
                                                "from: %v", compressed)
×
388
                                        continue
×
389
                                }
390

391
                                c.closeChannel(
3✔
392
                                        event.CloseSummary.ChanPoint, peerKey,
3✔
393
                                )
3✔
394
                        }
395

396
                // Process peer online and offline events.
397
                case e := <-subscriptions.peerUpdates:
3✔
398
                        switch event := e.(type) {
3✔
399
                        // We have reestablished a connection with our peer,
400
                        // and should record an online event for any channels
401
                        // with that peer.
402
                        case peernotifier.PeerOnlineEvent:
3✔
403
                                c.peerEvent(event.PubKey, true)
3✔
404

405
                        // We have lost a connection with our peer, and should
406
                        // record an offline event for any channels with that
407
                        // peer.
408
                        case peernotifier.PeerOfflineEvent:
3✔
409
                                c.peerEvent(event.PubKey, false)
3✔
410
                        }
411

412
                // Serve all requests for channel lifetime.
413
                case req := <-c.chanInfoRequests:
3✔
414
                        var resp channelInfoResponse
3✔
415

3✔
416
                        resp.info, resp.err = c.getChanInfo(req)
3✔
417
                        req.responseChan <- resp
3✔
418

419
                // Serve all requests for information about our peer.
420
                case req := <-c.peerRequests:
3✔
421
                        var resp peerResponse
3✔
422

3✔
423
                        resp.flapCount, resp.ts, resp.err = c.flapCount(
3✔
424
                                req.peer,
3✔
425
                        )
3✔
426
                        req.responseChan <- resp
3✔
427

UNCOV
428
                case <-c.cfg.FlapCountTicker.Ticks():
×
UNCOV
429
                        if err := c.recordFlapCount(); err != nil {
×
430
                                log.Errorf("could not record flap "+
×
431
                                        "count: %v", err)
×
432
                        }
×
433

434
                // Exit if the store receives the signal to shutdown.
435
                case <-c.quit:
3✔
436
                        return
3✔
437
                }
438
        }
439
}
440

441
// ChannelInfo provides the set of information that the event store has recorded
// for a channel.
type ChannelInfo struct {
	// Lifetime is the total amount of time we have monitored the channel
	// for.
	Lifetime time.Duration

	// Uptime is the total amount of time that the channel peer has been
	// observed as online during the monitored lifespan.
	Uptime time.Duration
}
452

453
// GetChanInfo gets all the information we have on a channel in the event store.
454
func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint,
455
        peer route.Vertex) (*ChannelInfo, error) {
3✔
456

3✔
457
        request := channelInfoRequest{
3✔
458
                peer:         peer,
3✔
459
                channelPoint: channelPoint,
3✔
460
                responseChan: make(chan channelInfoResponse),
3✔
461
        }
3✔
462

3✔
463
        // Send a request for the channel's information to the main event loop,
3✔
464
        // or return early with an error if the store has already received a
3✔
465
        // shutdown signal.
3✔
466
        select {
3✔
467
        case c.chanInfoRequests <- request:
3✔
468
        case <-c.quit:
×
469
                return nil, errShuttingDown
×
470
        }
471

472
        // Return the response we receive on the response channel or exit early
473
        // if the store is instructed to exit.
474
        select {
3✔
475
        case resp := <-request.responseChan:
3✔
476
                return resp.info, resp.err
3✔
477

478
        case <-c.quit:
×
479
                return nil, errShuttingDown
×
480
        }
481
}
482

483
// getChanInfo collects channel information for a channel. It gets uptime over
484
// the full lifetime of the channel.
485
func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
486
        error) {
3✔
487

3✔
488
        peerMonitor, ok := c.peers[req.peer]
3✔
489
        if !ok {
3✔
490
                return nil, ErrPeerNotFound
×
491
        }
×
492

493
        lifetime, uptime, err := peerMonitor.channelUptime(req.channelPoint)
3✔
494
        if err != nil {
6✔
495
                return nil, err
3✔
496
        }
3✔
497

498
        return &ChannelInfo{
3✔
499
                Lifetime: lifetime,
3✔
500
                Uptime:   uptime,
3✔
501
        }, nil
3✔
502
}
503

504
// FlapCount returns the flap count we have for a peer and the timestamp of its
505
// last flap. If we do not have any flaps recorded for the peer, the last flap
506
// timestamp will be nil.
507
func (c *ChannelEventStore) FlapCount(peer route.Vertex) (int, *time.Time,
508
        error) {
3✔
509

3✔
510
        request := peerRequest{
3✔
511
                peer:         peer,
3✔
512
                responseChan: make(chan peerResponse),
3✔
513
        }
3✔
514

3✔
515
        // Send a request for the peer's information to the main event loop,
3✔
516
        // or return early with an error if the store has already received a
3✔
517
        // shutdown signal.
3✔
518
        select {
3✔
519
        case c.peerRequests <- request:
3✔
520
        case <-c.quit:
×
521
                return 0, nil, errShuttingDown
×
522
        }
523

524
        // Return the response we receive on the response channel or exit early
525
        // if the store is instructed to exit.
526
        select {
3✔
527
        case resp := <-request.responseChan:
3✔
528
                return resp.flapCount, resp.ts, resp.err
3✔
529

530
        case <-c.quit:
×
531
                return 0, nil, errShuttingDown
×
532
        }
533
}
534

535
// flapCount gets our peer flap count and last flap timestamp from our in memory
536
// record of a peer, falling back to on disk if we are not currently tracking
537
// the peer. If we have no flap count recorded for the peer, a nil last flap
538
// time will be returned.
539
func (c *ChannelEventStore) flapCount(peer route.Vertex) (int, *time.Time,
540
        error) {
3✔
541

3✔
542
        // First check whether we are tracking this peer in memory, because this
3✔
543
        // record will have the most accurate flap count. We do not fail if we
3✔
544
        // can't find the peer in memory, because we may have previously
3✔
545
        // recorded its flap count on disk.
3✔
546
        peerMonitor, ok := c.peers[peer]
3✔
547
        if ok {
6✔
548
                count, ts := peerMonitor.getFlapCount()
3✔
549
                return count, ts, nil
3✔
550
        }
3✔
551

552
        // Try to get our flap count from the database. If this value is not
553
        // recorded, we return a nil last flap time to indicate that we have no
554
        // record of the peer's flap count.
UNCOV
555
        flapCount, err := c.cfg.ReadFlapCount(peer)
×
UNCOV
556
        switch err {
×
UNCOV
557
        case channeldb.ErrNoPeerBucket:
×
UNCOV
558
                return 0, nil, nil
×
559

UNCOV
560
        case nil:
×
UNCOV
561
                return int(flapCount.Count), &flapCount.LastFlap, nil
×
562

563
        default:
×
564
                return 0, nil, err
×
565
        }
566
}
567

568
// recordFlapCount will record our flap count for each peer that we are
569
// currently tracking, skipping peers that have a 0 flap count.
570
func (c *ChannelEventStore) recordFlapCount() error {
3✔
571
        updates := make(peerFlapCountMap)
3✔
572

3✔
573
        for peer, monitor := range c.peers {
6✔
574
                flapCount, lastFlap := monitor.getFlapCount()
3✔
575
                if lastFlap == nil {
3✔
576
                        continue
×
577
                }
578

579
                updates[peer] = &channeldb.FlapCount{
3✔
580
                        Count:    uint32(flapCount),
3✔
581
                        LastFlap: *lastFlap,
3✔
582
                }
3✔
583
        }
584

585
        log.Debugf("recording flap count for: %v peers", len(updates))
3✔
586

3✔
587
        return c.cfg.WriteFlapCount(updates)
3✔
588
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc