• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11170835610

03 Oct 2024 10:41PM UTC coverage: 49.188% (-9.6%) from 58.738%
11170835610

push

github

web-flow
Merge pull request #9154 from ziggie1984/master

multi: bump btcd version.

3 of 6 new or added lines in 6 files covered. (50.0%)

26110 existing lines in 428 files now uncovered.

97359 of 197934 relevant lines covered (49.19%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.0
/chanfitness/chaneventstore.go
1
// Package chanfitness monitors the behaviour of channels to provide insight
2
// into the health and performance of a channel. This is achieved by maintaining
3
// an event store which tracks events for each channel.
4
//
5
// Lifespan: the period that the channel has been known to the scoring system.
6
// Note that lifespan may not equal the channel's full lifetime because data is
7
// not currently persisted.
8
//
9
// Uptime: the total time within a given period that the channel's remote peer
10
// has been online.
11
package chanfitness
12

13
import (
14
        "errors"
15
        "fmt"
16
        "sync"
17
        "sync/atomic"
18
        "time"
19

20
        "github.com/btcsuite/btcd/wire"
21
        "github.com/lightningnetwork/lnd/channeldb"
22
        "github.com/lightningnetwork/lnd/channelnotifier"
23
        "github.com/lightningnetwork/lnd/clock"
24
        "github.com/lightningnetwork/lnd/peernotifier"
25
        "github.com/lightningnetwork/lnd/routing/route"
26
        "github.com/lightningnetwork/lnd/subscribe"
27
        "github.com/lightningnetwork/lnd/ticker"
28
)
29

30
const (
	// FlapCountFlushRate determines how often we write peer total flap
	// count to disk.
	FlapCountFlushRate = time.Hour
)
35

36
var (
	// errShuttingDown is returned when the store cannot respond to a query
	// because it has received the shutdown signal.
	errShuttingDown = errors.New("channel event store shutting down")

	// ErrChannelNotFound is returned when a query is made for a channel
	// that the event store does not have knowledge of.
	ErrChannelNotFound = errors.New("channel not found in event store")

	// ErrPeerNotFound is returned when a query is made for a channel
	// that has a peer that the event store is not currently tracking.
	ErrPeerNotFound = errors.New("peer not found in event store")
)
49

50
// ChannelEventStore maintains a set of event logs for the node's channels to
// provide insight into the performance and health of channels.
type ChannelEventStore struct {
	// started guards against Start being executed more than once.
	started atomic.Bool

	// stopped guards against Stop being executed more than once.
	stopped atomic.Bool

	// cfg provides the store's external dependencies; all fields must be
	// non-nil (see Config).
	cfg *Config

	// peers tracks all of our currently monitored peers and their channels.
	peers map[route.Vertex]peerMonitor

	// chanInfoRequests serves requests for information about our channel.
	chanInfoRequests chan channelInfoRequest

	// peerRequests serves requests for information about a peer.
	peerRequests chan peerRequest

	// quit is closed in Stop to signal the consume goroutine to exit.
	quit chan struct{}

	// wg tracks the consume goroutine so that Stop can wait for it.
	wg sync.WaitGroup
}
71

72
// Config provides the event store with functions required to monitor channel
// activity. All elements of the config must be non-nil for the event store to
// operate.
type Config struct {
	// SubscribeChannelEvents provides a subscription client which provides
	// a stream of channel events.
	SubscribeChannelEvents func() (subscribe.Subscription, error)

	// SubscribePeerEvents provides a subscription client which provides a
	// stream of peer online/offline events.
	SubscribePeerEvents func() (subscribe.Subscription, error)

	// GetOpenChannels provides a list of existing open channels which is
	// used to populate the ChannelEventStore with a set of channels on
	// startup.
	GetOpenChannels func() ([]*channeldb.OpenChannel, error)

	// Clock is the time source that the subsystem uses, provided here
	// for ease of testing.
	Clock clock.Clock

	// WriteFlapCount records the flap count for a set of peers on disk.
	WriteFlapCount func(map[route.Vertex]*channeldb.FlapCount) error

	// ReadFlapCount gets the flap count for a peer on disk.
	ReadFlapCount func(route.Vertex) (*channeldb.FlapCount, error)

	// FlapCountTicker is a ticker which controls how often we flush our
	// peer's flap count to disk.
	FlapCountTicker ticker.Ticker
}
103

104
// peerFlapCountMap is the map used to map peers to flap counts, declared here
// to allow shorter function signatures.
type peerFlapCountMap map[route.Vertex]*channeldb.FlapCount
107

108
// channelInfoRequest is sent to the main event loop to query information about
// a single channel.
type channelInfoRequest struct {
	// peer is the remote peer that the channel is opened with.
	peer route.Vertex

	// channelPoint identifies the channel being queried.
	channelPoint wire.OutPoint

	// responseChan receives exactly one channelInfoResponse for this
	// request.
	responseChan chan channelInfoResponse
}
113

114
// channelInfoResponse is the event loop's answer to a channelInfoRequest;
// info is nil when err is non-nil.
type channelInfoResponse struct {
	info *ChannelInfo
	err  error
}
118

119
// peerRequest is sent to the main event loop to query a peer's flap count.
type peerRequest struct {
	// peer is the peer being queried.
	peer route.Vertex

	// responseChan receives exactly one peerResponse for this request.
	responseChan chan peerResponse
}
123

124
// peerResponse is the event loop's answer to a peerRequest; ts is nil when
// we have no flap recorded for the peer.
type peerResponse struct {
	flapCount int
	ts        *time.Time
	err       error
}
129

130
// NewChannelEventStore initializes an event store with the config provided.
131
// Note that this function does not start the main event loop, Start() must be
132
// called.
133
func NewChannelEventStore(config *Config) *ChannelEventStore {
2✔
134
        store := &ChannelEventStore{
2✔
135
                cfg:              config,
2✔
136
                peers:            make(map[route.Vertex]peerMonitor),
2✔
137
                chanInfoRequests: make(chan channelInfoRequest),
2✔
138
                peerRequests:     make(chan peerRequest),
2✔
139
                quit:             make(chan struct{}),
2✔
140
        }
2✔
141

2✔
142
        return store
2✔
143
}
2✔
144

145
// Start adds all existing open channels to the event store and starts the main
// loop which records channel and peer events, and serves requests for
// information from the store. If this function fails, it cancels its existing
// subscriptions and returns an error.
func (c *ChannelEventStore) Start() error {
	log.Info("ChannelEventStore starting...")

	// Swap returns the previous value, so a true result means Start has
	// already run once.
	if c.started.Swap(true) {
		return fmt.Errorf("ChannelEventStore started more than once")
	}

	// Create a subscription to channel events.
	channelClient, err := c.cfg.SubscribeChannelEvents()
	if err != nil {
		return err
	}

	// Create a subscription to peer events. If an error occurs, cancel the
	// existing subscription to channel events and return.
	peerClient, err := c.cfg.SubscribePeerEvents()
	if err != nil {
		channelClient.Cancel()
		return err
	}

	// cancel should be called to cancel all subscriptions if an error
	// occurs.
	cancel := func() {
		channelClient.Cancel()
		peerClient.Cancel()
	}

	// Add the existing set of channels to the event store. This is required
	// because channel events will not be triggered for channels that exist
	// at startup time.
	channels, err := c.cfg.GetOpenChannels()
	if err != nil {
		cancel()
		return err
	}

	log.Infof("Adding %v channels to event store", len(channels))

	for _, ch := range channels {
		peerKey, err := route.NewVertexFromBytes(
			ch.IdentityPub.SerializeCompressed(),
		)
		if err != nil {
			cancel()
			return err
		}

		// Add existing channels to the channel store with an initial
		// peer online or offline event.
		c.addChannel(ch.FundingOutpoint, peerKey)
	}

	// Start a goroutine that consumes events from all subscriptions. The
	// goroutine takes ownership of cancel, calling it on exit.
	c.wg.Add(1)
	go c.consume(&subscriptions{
		channelUpdates: channelClient.Updates(),
		peerUpdates:    peerClient.Updates(),
		cancel:         cancel,
	})

	log.Debug("ChannelEventStore started")

	return nil
}
214

215
// Stop terminates all goroutines started by the event store.
func (c *ChannelEventStore) Stop() error {
	log.Info("ChannelEventStore shutting down...")

	// Swap returns the previous value, so a true result means Stop has
	// already run once.
	if c.stopped.Swap(true) {
		return fmt.Errorf("ChannelEventStore stopped more than once")
	}

	// Stop the consume goroutine.
	close(c.quit)
	c.wg.Wait()

	// Stop the ticker after the goroutine reading from it has exited, to
	// avoid a race.
	var err error
	if c.cfg.FlapCountTicker == nil {
		err = fmt.Errorf("ChannelEventStore FlapCountTicker not " +
			"initialized")
	} else {
		c.cfg.FlapCountTicker.Stop()
	}

	log.Debugf("ChannelEventStore shutdown complete")

	return err
}
241

242
// addChannel checks whether we are already tracking a channel's peer, creates a
243
// new peer log to track it if we are not yet monitoring it, and adds the
244
// channel.
245
func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
246
        peer route.Vertex) {
2✔
247

2✔
248
        peerMonitor, err := c.getPeerMonitor(peer)
2✔
249
        if err != nil {
2✔
250
                log.Error("could not create monitor: %v", err)
×
251
                return
×
252
        }
×
253

254
        if err := peerMonitor.addChannel(channelPoint); err != nil {
2✔
UNCOV
255
                log.Errorf("could not add channel: %v", err)
×
UNCOV
256
        }
×
257
}
258

259
// getPeerMonitor tries to get an existing peer monitor from our in memory list,
// and falls back to creating a new monitor if it is not currently known.
func (c *ChannelEventStore) getPeerMonitor(peer route.Vertex) (peerMonitor,
	error) {

	peerMonitor, ok := c.peers[peer]
	if ok {
		return peerMonitor, nil
	}

	var (
		flapCount int
		lastFlap  *time.Time
	)

	// NOTE(review): the switch compares err by value, so a wrapped
	// ErrNoPeerBucket would fall through to the default case — confirm
	// ReadFlapCount returns the sentinel unwrapped.
	historicalFlap, err := c.cfg.ReadFlapCount(peer)
	switch err {
	// If we do not have any records for this peer we set a 0 flap count
	// and timestamp.
	case channeldb.ErrNoPeerBucket:

	case nil:
		flapCount = int(historicalFlap.Count)
		lastFlap = &historicalFlap.LastFlap

	// Return if we get an unexpected error.
	default:
		return nil, err
	}

	// Create a fresh in-memory log seeded with the on-disk history (zero
	// values if no history was found) and start tracking the peer.
	peerMonitor = newPeerLog(c.cfg.Clock, flapCount, lastFlap)
	c.peers[peer] = peerMonitor

	return peerMonitor, nil
}
294

295
// closeChannel records a closed time for a channel, and returns early is the
296
// channel is not known to the event store. We log warnings (rather than errors)
297
// when we cannot find a peer/channel because channels that we restore from a
298
// static channel backup do not have their open notified, so the event store
299
// never learns about them, but they are closed using the regular flow so we
300
// will try to remove them on close. At present, we cannot easily distinguish
301
// between these closes and others.
302
func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint,
303
        peer route.Vertex) {
2✔
304

2✔
305
        peerMonitor, ok := c.peers[peer]
2✔
306
        if !ok {
4✔
307
                log.Warnf("peer not known to store: %v", peer)
2✔
308
                return
2✔
309
        }
2✔
310

311
        if err := peerMonitor.removeChannel(channelPoint); err != nil {
4✔
312
                log.Warnf("could not remove channel: %v", err)
2✔
313
        }
2✔
314
}
315

316
// peerEvent creates a peer monitor for a peer if we do not currently have
317
// one, and adds an online event to it.
318
func (c *ChannelEventStore) peerEvent(peer route.Vertex, online bool) {
2✔
319
        peerMonitor, err := c.getPeerMonitor(peer)
2✔
320
        if err != nil {
2✔
321
                log.Error("could not create monitor: %v", err)
×
322
                return
×
323
        }
×
324

325
        peerMonitor.onlineEvent(online)
2✔
326
}
327

328
// subscriptions abstracts away from subscription clients to allow for mocking.
type subscriptions struct {
	// channelUpdates streams channel open/close notifications.
	channelUpdates <-chan interface{}

	// peerUpdates streams peer online/offline notifications.
	peerUpdates <-chan interface{}

	// cancel tears down both underlying subscriptions; called by the
	// consumer on exit.
	cancel func()
}
334

335
// consume is the event store's main loop. It consumes subscriptions to update
336
// the event store with channel and peer events, and serves requests for channel
337
// uptime and lifespan.
338
func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
2✔
339
        // Start our flap count ticker.
2✔
340
        c.cfg.FlapCountTicker.Resume()
2✔
341

2✔
342
        // On exit, we will cancel our subscriptions and write our most recent
2✔
343
        // flap counts to disk. This ensures that we have consistent data in
2✔
344
        // the case of a graceful shutdown. If we do not shutdown gracefully,
2✔
345
        // our worst case is data from our last flap count tick (1H).
2✔
346
        defer func() {
4✔
347
                subscriptions.cancel()
2✔
348

2✔
349
                if err := c.recordFlapCount(); err != nil {
2✔
350
                        log.Errorf("error recording flap on shutdown: %v", err)
×
351
                }
×
352

353
                c.wg.Done()
2✔
354
        }()
355

356
        // Consume events until the channel is closed.
357
        for {
4✔
358
                select {
2✔
359
                // Process channel opened and closed events.
360
                case e := <-subscriptions.channelUpdates:
2✔
361
                        switch event := e.(type) {
2✔
362
                        // A new channel has been opened, we must add the
363
                        // channel to the store and record a channel open event.
364
                        case channelnotifier.OpenChannelEvent:
2✔
365
                                compressed := event.Channel.IdentityPub.SerializeCompressed()
2✔
366
                                peerKey, err := route.NewVertexFromBytes(
2✔
367
                                        compressed,
2✔
368
                                )
2✔
369
                                if err != nil {
2✔
370
                                        log.Errorf("Could not get vertex "+
×
371
                                                "from: %v", compressed)
×
372
                                }
×
373

374
                                c.addChannel(
2✔
375
                                        event.Channel.FundingOutpoint, peerKey,
2✔
376
                                )
2✔
377

378
                        // A channel has been closed, we must remove the channel
379
                        // from the store and record a channel closed event.
380
                        case channelnotifier.ClosedChannelEvent:
2✔
381
                                compressed := event.CloseSummary.RemotePub.SerializeCompressed()
2✔
382
                                peerKey, err := route.NewVertexFromBytes(
2✔
383
                                        compressed,
2✔
384
                                )
2✔
385
                                if err != nil {
2✔
386
                                        log.Errorf("Could not get vertex "+
×
387
                                                "from: %v", compressed)
×
388
                                        continue
×
389
                                }
390

391
                                c.closeChannel(
2✔
392
                                        event.CloseSummary.ChanPoint, peerKey,
2✔
393
                                )
2✔
394
                        }
395

396
                // Process peer online and offline events.
397
                case e := <-subscriptions.peerUpdates:
2✔
398
                        switch event := e.(type) {
2✔
399
                        // We have reestablished a connection with our peer,
400
                        // and should record an online event for any channels
401
                        // with that peer.
402
                        case peernotifier.PeerOnlineEvent:
2✔
403
                                c.peerEvent(event.PubKey, true)
2✔
404

405
                        // We have lost a connection with our peer, and should
406
                        // record an offline event for any channels with that
407
                        // peer.
408
                        case peernotifier.PeerOfflineEvent:
2✔
409
                                c.peerEvent(event.PubKey, false)
2✔
410
                        }
411

412
                // Serve all requests for channel lifetime.
413
                case req := <-c.chanInfoRequests:
2✔
414
                        var resp channelInfoResponse
2✔
415

2✔
416
                        resp.info, resp.err = c.getChanInfo(req)
2✔
417
                        req.responseChan <- resp
2✔
418

419
                // Serve all requests for information about our peer.
420
                case req := <-c.peerRequests:
2✔
421
                        var resp peerResponse
2✔
422

2✔
423
                        resp.flapCount, resp.ts, resp.err = c.flapCount(
2✔
424
                                req.peer,
2✔
425
                        )
2✔
426
                        req.responseChan <- resp
2✔
427

UNCOV
428
                case <-c.cfg.FlapCountTicker.Ticks():
×
UNCOV
429
                        if err := c.recordFlapCount(); err != nil {
×
430
                                log.Errorf("could not record flap "+
×
431
                                        "count: %v", err)
×
432
                        }
×
433

434
                // Exit if the store receives the signal to shutdown.
435
                case <-c.quit:
2✔
436
                        return
2✔
437
                }
438
        }
439
}
440

441
// ChannelInfo provides the set of information that the event store has recorded
// for a channel.
type ChannelInfo struct {
	// Lifetime is the total amount of time we have monitored the channel
	// for.
	Lifetime time.Duration

	// Uptime is the total amount of time that the channel peer has been
	// observed as online during the monitored lifespan.
	Uptime time.Duration
}
452

453
// GetChanInfo gets all the information we have on a channel in the event store.
454
func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint,
455
        peer route.Vertex) (*ChannelInfo, error) {
2✔
456

2✔
457
        request := channelInfoRequest{
2✔
458
                peer:         peer,
2✔
459
                channelPoint: channelPoint,
2✔
460
                responseChan: make(chan channelInfoResponse),
2✔
461
        }
2✔
462

2✔
463
        // Send a request for the channel's information to the main event loop,
2✔
464
        // or return early with an error if the store has already received a
2✔
465
        // shutdown signal.
2✔
466
        select {
2✔
467
        case c.chanInfoRequests <- request:
2✔
468
        case <-c.quit:
×
469
                return nil, errShuttingDown
×
470
        }
471

472
        // Return the response we receive on the response channel or exit early
473
        // if the store is instructed to exit.
474
        select {
2✔
475
        case resp := <-request.responseChan:
2✔
476
                return resp.info, resp.err
2✔
477

478
        case <-c.quit:
×
479
                return nil, errShuttingDown
×
480
        }
481
}
482

483
// getChanInfo collects channel information for a channel. It gets uptime over
484
// the full lifetime of the channel.
485
func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
486
        error) {
2✔
487

2✔
488
        peerMonitor, ok := c.peers[req.peer]
2✔
489
        if !ok {
2✔
490
                return nil, ErrPeerNotFound
×
491
        }
×
492

493
        lifetime, uptime, err := peerMonitor.channelUptime(req.channelPoint)
2✔
494
        if err != nil {
4✔
495
                return nil, err
2✔
496
        }
2✔
497

498
        return &ChannelInfo{
2✔
499
                Lifetime: lifetime,
2✔
500
                Uptime:   uptime,
2✔
501
        }, nil
2✔
502
}
503

504
// FlapCount returns the flap count we have for a peer and the timestamp of its
505
// last flap. If we do not have any flaps recorded for the peer, the last flap
506
// timestamp will be nil.
507
func (c *ChannelEventStore) FlapCount(peer route.Vertex) (int, *time.Time,
508
        error) {
2✔
509

2✔
510
        request := peerRequest{
2✔
511
                peer:         peer,
2✔
512
                responseChan: make(chan peerResponse),
2✔
513
        }
2✔
514

2✔
515
        // Send a request for the peer's information to the main event loop,
2✔
516
        // or return early with an error if the store has already received a
2✔
517
        // shutdown signal.
2✔
518
        select {
2✔
519
        case c.peerRequests <- request:
2✔
520
        case <-c.quit:
×
521
                return 0, nil, errShuttingDown
×
522
        }
523

524
        // Return the response we receive on the response channel or exit early
525
        // if the store is instructed to exit.
526
        select {
2✔
527
        case resp := <-request.responseChan:
2✔
528
                return resp.flapCount, resp.ts, resp.err
2✔
529

530
        case <-c.quit:
×
531
                return 0, nil, errShuttingDown
×
532
        }
533
}
534

535
// flapCount gets our peer flap count and last flap timestamp from our in memory
536
// record of a peer, falling back to on disk if we are not currently tracking
537
// the peer. If we have no flap count recorded for the peer, a nil last flap
538
// time will be returned.
539
func (c *ChannelEventStore) flapCount(peer route.Vertex) (int, *time.Time,
540
        error) {
2✔
541

2✔
542
        // First check whether we are tracking this peer in memory, because this
2✔
543
        // record will have the most accurate flap count. We do not fail if we
2✔
544
        // can't find the peer in memory, because we may have previously
2✔
545
        // recorded its flap count on disk.
2✔
546
        peerMonitor, ok := c.peers[peer]
2✔
547
        if ok {
4✔
548
                count, ts := peerMonitor.getFlapCount()
2✔
549
                return count, ts, nil
2✔
550
        }
2✔
551

552
        // Try to get our flap count from the database. If this value is not
553
        // recorded, we return a nil last flap time to indicate that we have no
554
        // record of the peer's flap count.
UNCOV
555
        flapCount, err := c.cfg.ReadFlapCount(peer)
×
UNCOV
556
        switch err {
×
UNCOV
557
        case channeldb.ErrNoPeerBucket:
×
UNCOV
558
                return 0, nil, nil
×
559

UNCOV
560
        case nil:
×
UNCOV
561
                return int(flapCount.Count), &flapCount.LastFlap, nil
×
562

563
        default:
×
564
                return 0, nil, err
×
565
        }
566
}
567

568
// recordFlapCount will record our flap count for each peer that we are
569
// currently tracking, skipping peers that have a 0 flap count.
570
func (c *ChannelEventStore) recordFlapCount() error {
2✔
571
        updates := make(peerFlapCountMap)
2✔
572

2✔
573
        for peer, monitor := range c.peers {
4✔
574
                flapCount, lastFlap := monitor.getFlapCount()
2✔
575
                if lastFlap == nil {
2✔
576
                        continue
×
577
                }
578

579
                updates[peer] = &channeldb.FlapCount{
2✔
580
                        Count:    uint32(flapCount),
2✔
581
                        LastFlap: *lastFlap,
2✔
582
                }
2✔
583
        }
584

585
        log.Debugf("recording flap count for: %v peers", len(updates))
2✔
586

2✔
587
        return c.cfg.WriteFlapCount(updates)
2✔
588
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc