lightningnetwork / lnd · job 19153182187
06 Nov 2025 11:36PM UTC · coverage: 54.793% (-11.9%) from 66.712%

Pull Request #10352: [WIP] chainrpc: return Unavailable while notifier starts
Merge d6c3e8fa9 into 096ab65b1 (github, web-flow)

110400 of 201486 relevant lines covered (54.79%) · 21823.7 hits per line

Source file: /chanfitness/chaneventstore.go · 82.35% covered

// Package chanfitness monitors the behaviour of channels to provide insight
// into the health and performance of a channel. This is achieved by
// maintaining an event store which tracks events for each channel.
//
// Lifespan: the period that the channel has been known to the scoring system.
// Note that lifespan may not equal the channel's full lifetime because data is
// not currently persisted.
//
// Uptime: the total time within a given period that the channel's remote peer
// has been online.
package chanfitness

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/wire"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/channelnotifier"
	"github.com/lightningnetwork/lnd/clock"
	"github.com/lightningnetwork/lnd/peernotifier"
	"github.com/lightningnetwork/lnd/routing/route"
	"github.com/lightningnetwork/lnd/subscribe"
	"github.com/lightningnetwork/lnd/ticker"
)

const (
	// FlapCountFlushRate determines how often we write peer total flap
	// count to disk.
	FlapCountFlushRate = time.Hour
)

var (
	// errShuttingDown is returned when the store cannot respond to a query
	// because it has received the shutdown signal.
	errShuttingDown = errors.New("channel event store shutting down")

	// ErrChannelNotFound is returned when a query is made for a channel
	// that the event store does not have knowledge of.
	ErrChannelNotFound = errors.New("channel not found in event store")

	// ErrPeerNotFound is returned when a query is made for a channel
	// whose peer the event store is not currently tracking.
	ErrPeerNotFound = errors.New("peer not found in event store")
)

// ChannelEventStore maintains a set of event logs for the node's channels to
// provide insight into the performance and health of channels.
type ChannelEventStore struct {
	started atomic.Bool
	stopped atomic.Bool

	cfg *Config

	// peers tracks all of our currently monitored peers and their
	// channels.
	peers map[route.Vertex]peerMonitor

	// chanInfoRequests serves requests for information about our
	// channels.
	chanInfoRequests chan channelInfoRequest

	// peerRequests serves requests for information about a peer.
	peerRequests chan peerRequest

	quit chan struct{}

	wg sync.WaitGroup
}

// Config provides the event store with functions required to monitor channel
// activity. All elements of the config must be non-nil for the event store to
// operate.
type Config struct {
	// SubscribeChannelEvents provides a subscription client which provides
	// a stream of channel events.
	SubscribeChannelEvents func() (subscribe.Subscription, error)

	// SubscribePeerEvents provides a subscription client which provides a
	// stream of peer online/offline events.
	SubscribePeerEvents func() (subscribe.Subscription, error)

	// GetOpenChannels provides a list of existing open channels which is
	// used to populate the ChannelEventStore with a set of channels on
	// startup.
	GetOpenChannels func() ([]*channeldb.OpenChannel, error)

	// Clock is the time source that the subsystem uses, provided here
	// for ease of testing.
	Clock clock.Clock

	// WriteFlapCount records the flap count for a set of peers on disk.
	WriteFlapCount func(map[route.Vertex]*channeldb.FlapCount) error

	// ReadFlapCount gets the flap count for a peer on disk.
	ReadFlapCount func(route.Vertex) (*channeldb.FlapCount, error)

	// FlapCountTicker is a ticker which controls how often we flush our
	// peer's flap count to disk.
	FlapCountTicker ticker.Ticker
}
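
// A minimal wiring sketch for this Config, loosely based on how lnd's server
// constructs it. The receivers used here (chanNotifier, peerNotifier,
// chanStateDB, miscDB) are illustrative placeholders, not identifiers from
// this package:
//
//	cfg := &Config{
//		SubscribeChannelEvents: func() (subscribe.Subscription, error) {
//			return chanNotifier.SubscribeChannelEvents()
//		},
//		SubscribePeerEvents: func() (subscribe.Subscription, error) {
//			return peerNotifier.SubscribePeerEvents()
//		},
//		GetOpenChannels: chanStateDB.FetchAllOpenChannels,
//		Clock:           clock.NewDefaultClock(),
//		WriteFlapCount:  miscDB.WriteFlapCounts,
//		ReadFlapCount:   miscDB.ReadFlapCount,
//		FlapCountTicker: ticker.New(FlapCountFlushRate),
//	}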

// peerFlapCountMap is the map used to map peers to flap counts, declared here
// to allow shorter function signatures.
type peerFlapCountMap map[route.Vertex]*channeldb.FlapCount

type channelInfoRequest struct {
	peer         route.Vertex
	channelPoint wire.OutPoint
	responseChan chan channelInfoResponse
}

type channelInfoResponse struct {
	info *ChannelInfo
	err  error
}

type peerRequest struct {
	peer         route.Vertex
	responseChan chan peerResponse
}

type peerResponse struct {
	flapCount int
	ts        *time.Time
	err       error
}
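
// These request/response pairs funnel external queries into the store's main
// event loop: the caller sends a request carrying a fresh response channel,
// then blocks on that channel for a reply from the consume goroutine. Since
// every access to the peers map happens either in Start, before the consume
// goroutine launches, or on the consume goroutine itself, no mutex is needed.
// GetChanInfo and FlapCount below show the caller side of this pattern.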

// NewChannelEventStore initializes an event store with the config provided.
// Note that this function does not start the main event loop; Start() must be
// called.
func NewChannelEventStore(config *Config) *ChannelEventStore {
	store := &ChannelEventStore{
		cfg:              config,
		peers:            make(map[route.Vertex]peerMonitor),
		chanInfoRequests: make(chan channelInfoRequest),
		peerRequests:     make(chan peerRequest),
		quit:             make(chan struct{}),
	}

	return store
}

// Start adds all existing open channels to the event store and starts the main
// loop which records channel and peer events, and serves requests for
// information from the store. If this function fails, it cancels its existing
// subscriptions and returns an error.
func (c *ChannelEventStore) Start() error {
	log.Info("ChannelEventStore starting...")

	if c.started.Swap(true) {
		return fmt.Errorf("ChannelEventStore started more than once")
	}

	// Create a subscription to channel events.
	channelClient, err := c.cfg.SubscribeChannelEvents()
	if err != nil {
		return err
	}

	// Create a subscription to peer events. If an error occurs, cancel the
	// existing subscription to channel events and return.
	peerClient, err := c.cfg.SubscribePeerEvents()
	if err != nil {
		channelClient.Cancel()
		return err
	}

	// cancel should be called to cancel all subscriptions if an error
	// occurs.
	cancel := func() {
		channelClient.Cancel()
		peerClient.Cancel()
	}

	// Add the existing set of channels to the event store. This is
	// required because channel events will not be triggered for channels
	// that exist at startup time.
	channels, err := c.cfg.GetOpenChannels()
	if err != nil {
		cancel()
		return err
	}

	log.Infof("Adding %v channels to event store", len(channels))

	for _, ch := range channels {
		peerKey, err := route.NewVertexFromBytes(
			ch.IdentityPub.SerializeCompressed(),
		)
		if err != nil {
			cancel()
			return err
		}

		// Add existing channels to the channel store with an initial
		// peer online or offline event.
		c.addChannel(ch.FundingOutpoint, peerKey)
	}

	// Start a goroutine that consumes events from all subscriptions.
	c.wg.Add(1)
	go c.consume(&subscriptions{
		channelUpdates: channelClient.Updates(),
		peerUpdates:    peerClient.Updates(),
		cancel:         cancel,
	})

	log.Debug("ChannelEventStore started")

	return nil
}

// Stop terminates all goroutines started by the event store.
func (c *ChannelEventStore) Stop() error {
	log.Info("ChannelEventStore shutting down...")

	if c.stopped.Swap(true) {
		return fmt.Errorf("ChannelEventStore stopped more than once")
	}

	// Stop the consume goroutine.
	close(c.quit)
	c.wg.Wait()

	// Stop the ticker after the goroutine reading from it has exited, to
	// avoid a race.
	var err error
	if c.cfg.FlapCountTicker == nil {
		err = fmt.Errorf("ChannelEventStore FlapCountTicker not " +
			"initialized")
	} else {
		c.cfg.FlapCountTicker.Stop()
	}

	log.Debugf("ChannelEventStore shutdown complete")

	return err
}
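
// A minimal lifecycle sketch (illustrative): construct the store, start it,
// and ensure Stop runs on shutdown.
//
//	store := NewChannelEventStore(cfg)
//	if err := store.Start(); err != nil {
//		// Start has already cancelled its subscriptions on failure.
//		return err
//	}
//	defer func() {
//		if err := store.Stop(); err != nil {
//			log.Errorf("could not stop event store: %v", err)
//		}
//	}()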

// addChannel checks whether we are already tracking a channel's peer, creates
// a new peer log to track it if we are not yet monitoring it, and adds the
// channel.
func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
	peer route.Vertex) {

	peerMonitor, err := c.getOrCreatePeerMonitor(peer)
	if err != nil {
		log.Errorf("could not create monitor: %v", err)
		return
	}

	if err := peerMonitor.addChannel(channelPoint); err != nil {
		log.Errorf("could not add channel: %v", err)
	}
}

// getOrCreatePeerMonitor tries to get an existing peer monitor from our
// in-memory list. If the peer is not yet known to us, it will create a new
// monitor and add it to the list. When a new monitor is created, we also send
// an initial online event for the peer.
func (c *ChannelEventStore) getOrCreatePeerMonitor(
	peer route.Vertex) (peerMonitor, error) {

	peerMonitor, ok := c.peers[peer]
	if ok {
		return peerMonitor, nil
	}

	var (
		flapCount int
		lastFlap  *time.Time
	)

	historicalFlap, err := c.cfg.ReadFlapCount(peer)
	switch err {
	// If we do not have any records for this peer we set a 0 flap count
	// and timestamp.
	case channeldb.ErrNoPeerBucket:

	case nil:
		flapCount = int(historicalFlap.Count)
		lastFlap = &historicalFlap.LastFlap

	// Return if we get an unexpected error.
	default:
		return nil, err
	}

	peerMonitor = newPeerLog(c.cfg.Clock, flapCount, lastFlap)
	c.peers[peer] = peerMonitor

	// Send an online event given it's the first time we see this peer.
	peerMonitor.onlineEvent(true)

	return peerMonitor, nil
}

// closeChannel records a closed time for a channel, and returns early if the
// channel is not known to the event store. We log warnings (rather than
// errors) when we cannot find a peer/channel because channels that we restore
// from a static channel backup do not have their open notified, so the event
// store never learns about them, but they are closed using the regular flow
// so we will try to remove them on close. At present, we cannot easily
// distinguish between these closes and others.
func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint,
	peer route.Vertex) {

	peerMonitor, ok := c.peers[peer]
	if !ok {
		log.Warnf("peer not known to store: %v", peer)
		return
	}

	if err := peerMonitor.removeChannel(channelPoint); err != nil {
		log.Warnf("could not remove channel: %v", err)
	}
}

// peerEvent adds an online event to a peer's monitor. If the peer is not
// yet known to the event store, the event is ignored. A peer is only known to
// the event store if we have an open channel with them.
func (c *ChannelEventStore) peerEvent(peer route.Vertex, online bool) {
	peerMonitor, ok := c.peers[peer]
	if !ok {
		log.Tracef("Ignore peer event (online=%v) from non-channel "+
			"peer: %v", online, peer)

		return
	}

	peerMonitor.onlineEvent(online)
}

// subscriptions abstracts away from subscription clients to allow for mocking.
type subscriptions struct {
	channelUpdates <-chan interface{}
	peerUpdates    <-chan interface{}
	cancel         func()
}

// consume is the event store's main loop. It consumes subscriptions to update
// the event store with channel and peer events, and serves requests for
// channel uptime and lifespan.
func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
	// Start our flap count ticker.
	c.cfg.FlapCountTicker.Resume()

	// On exit, we will cancel our subscriptions and write our most recent
	// flap counts to disk. This ensures that we have consistent data in
	// the case of a graceful shutdown. If we do not shut down gracefully,
	// our worst case is data from our last flap count tick (1H).
	defer func() {
		subscriptions.cancel()

		if err := c.recordFlapCount(); err != nil {
			log.Errorf("error recording flap on shutdown: %v", err)
		}

		c.wg.Done()
	}()

	// Consume events until the channel is closed.
	for {
		select {
		// Process channel opened and closed events.
		case e := <-subscriptions.channelUpdates:
			switch event := e.(type) {
			// A new channel has been opened, we must add the
			// channel to the store and record a channel open
			// event.
			case channelnotifier.OpenChannelEvent:
				compressed := event.Channel.IdentityPub.
					SerializeCompressed()
				peerKey, err := route.NewVertexFromBytes(
					compressed,
				)
				if err != nil {
					log.Errorf("Could not get vertex "+
						"from: %v", compressed)
				}

				op := event.Channel.FundingOutpoint

				log.Tracef("Received OpenChannelEvent(%v) "+
					"from %v", op, peerKey)

				c.addChannel(op, peerKey)

			// A channel has been closed, we must remove the
			// channel from the store and record a channel closed
			// event.
			case channelnotifier.ClosedChannelEvent:
				compressed := event.CloseSummary.RemotePub.
					SerializeCompressed()
				peerKey, err := route.NewVertexFromBytes(
					compressed,
				)
				if err != nil {
					log.Errorf("Could not get vertex "+
						"from: %v", compressed)
					continue
				}

				c.closeChannel(
					event.CloseSummary.ChanPoint, peerKey,
				)
			}

		// Process peer online and offline events.
		case e := <-subscriptions.peerUpdates:
			switch event := e.(type) {
			// We have reestablished a connection with our peer,
			// and should record an online event for any channels
			// with that peer.
			case peernotifier.PeerOnlineEvent:
				c.peerEvent(event.PubKey, true)

			// We have lost a connection with our peer, and should
			// record an offline event for any channels with that
			// peer.
			case peernotifier.PeerOfflineEvent:
				c.peerEvent(event.PubKey, false)
			}

		// Serve all requests for channel lifetime.
		case req := <-c.chanInfoRequests:
			var resp channelInfoResponse

			resp.info, resp.err = c.getChanInfo(req)
			req.responseChan <- resp

		// Serve all requests for information about our peer.
		case req := <-c.peerRequests:
			var resp peerResponse

			resp.flapCount, resp.ts, resp.err = c.flapCount(
				req.peer,
			)
			req.responseChan <- resp

		case <-c.cfg.FlapCountTicker.Ticks():
			if err := c.recordFlapCount(); err != nil {
				log.Errorf("could not record flap "+
					"count: %v", err)
			}

		// Exit if the store receives the signal to shutdown.
		case <-c.quit:
			return
		}
	}
}

// ChannelInfo provides the set of information that the event store has
// recorded for a channel.
type ChannelInfo struct {
	// Lifetime is the total amount of time we have monitored the channel
	// for.
	Lifetime time.Duration

	// Uptime is the total amount of time that the channel peer has been
	// observed as online during the monitored lifespan.
	Uptime time.Duration
}

// GetChanInfo gets all the information we have on a channel in the event
// store.
func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint,
	peer route.Vertex) (*ChannelInfo, error) {

	request := channelInfoRequest{
		peer:         peer,
		channelPoint: channelPoint,
		responseChan: make(chan channelInfoResponse),
	}

	// Send a request for the channel's information to the main event loop,
	// or return early with an error if the store has already received a
	// shutdown signal.
	select {
	case c.chanInfoRequests <- request:
	case <-c.quit:
		return nil, errShuttingDown
	}

	// Return the response we receive on the response channel or exit early
	// if the store is instructed to exit.
	select {
	case resp := <-request.responseChan:
		return resp.info, resp.err

	case <-c.quit:
		return nil, errShuttingDown
	}
}

// getChanInfo collects channel information for a channel. It gets uptime over
// the full lifetime of the channel.
func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
	error) {

	peerMonitor, ok := c.peers[req.peer]
	if !ok {
		return nil, fmt.Errorf("%w: %v", ErrPeerNotFound, req.peer)
	}

	lifetime, uptime, err := peerMonitor.channelUptime(req.channelPoint)
	if err != nil {
		return nil, err
	}

	return &ChannelInfo{
		Lifetime: lifetime,
		Uptime:   uptime,
	}, nil
}
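
// A caller-side sketch (illustrative): since getChanInfo wraps
// ErrPeerNotFound with %w, errors.Is can distinguish an untracked peer from
// other failures.
//
//	info, err := store.GetChanInfo(chanPoint, peerKey)
//	switch {
//	case errors.Is(err, ErrPeerNotFound):
//		// No monitor exists for this peer; treat as no data.
//
//	case err != nil:
//		return err
//
//	default:
//		log.Infof("channel online %v of %v", info.Uptime,
//			info.Lifetime)
//	}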

// FlapCount returns the flap count we have for a peer and the timestamp of its
// last flap. If we do not have any flaps recorded for the peer, the last flap
// timestamp will be nil.
func (c *ChannelEventStore) FlapCount(peer route.Vertex) (int, *time.Time,
	error) {

	request := peerRequest{
		peer:         peer,
		responseChan: make(chan peerResponse),
	}

	// Send a request for the peer's information to the main event loop,
	// or return early with an error if the store has already received a
	// shutdown signal.
	select {
	case c.peerRequests <- request:
	case <-c.quit:
		return 0, nil, errShuttingDown
	}

	// Return the response we receive on the response channel or exit early
	// if the store is instructed to exit.
	select {
	case resp := <-request.responseChan:
		return resp.flapCount, resp.ts, resp.err

	case <-c.quit:
		return 0, nil, errShuttingDown
	}
}

// flapCount gets our peer flap count and last flap timestamp from our
// in-memory record of a peer, falling back to disk if we are not currently
// tracking the peer. If we have no flap count recorded for the peer, a nil
// last flap time will be returned.
func (c *ChannelEventStore) flapCount(peer route.Vertex) (int, *time.Time,
	error) {

	// First check whether we are tracking this peer in memory, because
	// this record will have the most accurate flap count. We do not fail
	// if we can't find the peer in memory, because we may have previously
	// recorded its flap count on disk.
	peerMonitor, ok := c.peers[peer]
	if ok {
		count, ts := peerMonitor.getFlapCount()
		return count, ts, nil
	}

	// Try to get our flap count from the database. If this value is not
	// recorded, we return a nil last flap time to indicate that we have no
	// record of the peer's flap count.
	flapCount, err := c.cfg.ReadFlapCount(peer)
	switch err {
	case channeldb.ErrNoPeerBucket:
		return 0, nil, nil

	case nil:
		return int(flapCount.Count), &flapCount.LastFlap, nil

	default:
		return 0, nil, err
	}
}
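
// A caller-side sketch (illustrative): a nil timestamp means we have no flap
// record for the peer, in memory or on disk, so check it before
// dereferencing.
//
//	count, lastFlap, err := store.FlapCount(peerKey)
//	if err != nil {
//		return err
//	}
//	if lastFlap == nil {
//		log.Infof("no flaps recorded for %v", peerKey)
//	} else {
//		log.Infof("peer %v flapped %v times, last at %v", peerKey,
//			count, lastFlap)
//	}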

// recordFlapCount will record our flap count for each peer that we are
// currently tracking, skipping peers that have a 0 flap count.
func (c *ChannelEventStore) recordFlapCount() error {
	updates := make(peerFlapCountMap)

	for peer, monitor := range c.peers {
		flapCount, lastFlap := monitor.getFlapCount()
		if lastFlap == nil {
			continue
		}

		updates[peer] = &channeldb.FlapCount{
			Count:    uint32(flapCount),
			LastFlap: *lastFlap,
		}
	}

	log.Debugf("recording flap count for: %v peers", len(updates))

	return c.cfg.WriteFlapCount(updates)
}