• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9915780197

13 Jul 2024 12:30AM UTC coverage: 49.268% (-9.1%) from 58.413%
9915780197

push

github

web-flow
Merge pull request #8653 from ProofOfKeags/fn-prim

DynComms [0/n]: `fn` package additions

92837 of 188433 relevant lines covered (49.27%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.22
/chanfitness/chaneventstore.go
1
// Package chanfitness monitors the behaviour of channels to provide insight
2
// into the health and performance of a channel. This is achieved by maintaining
3
// an event store which tracks events for each channel.
4
//
5
// Lifespan: the period that the channel has been known to the scoring system.
6
// Note that lifespan may not equal the channel's full lifetime because data is
7
// not currently persisted.
8
//
9
// Uptime: the total time within a given period that the channel's remote peer
10
// has been online.
11
package chanfitness
12

13
import (
14
        "errors"
15
        "sync"
16
        "time"
17

18
        "github.com/btcsuite/btcd/wire"
19
        "github.com/lightningnetwork/lnd/channeldb"
20
        "github.com/lightningnetwork/lnd/channelnotifier"
21
        "github.com/lightningnetwork/lnd/clock"
22
        "github.com/lightningnetwork/lnd/peernotifier"
23
        "github.com/lightningnetwork/lnd/routing/route"
24
        "github.com/lightningnetwork/lnd/subscribe"
25
        "github.com/lightningnetwork/lnd/ticker"
26
)
27

28
const (
	// FlapCountFlushRate determines how often we write peer total flap
	// count to disk. Flushing periodically bounds how much flap data can
	// be lost on an unclean shutdown to at most one interval's worth.
	FlapCountFlushRate = time.Hour
)
33

34
var (
	// errShuttingDown is returned when the store cannot respond to a query
	// because it has received the shutdown signal (its quit channel was
	// closed).
	errShuttingDown = errors.New("channel event store shutting down")

	// ErrChannelNotFound is returned when a query is made for a channel
	// that the event store does not have knowledge of.
	ErrChannelNotFound = errors.New("channel not found in event store")

	// ErrPeerNotFound is returned when a query is made for a channel
	// that has a peer that the event store is not currently tracking.
	ErrPeerNotFound = errors.New("peer not found in event store")
)
47

48
// ChannelEventStore maintains a set of event logs for the node's channels to
// provide insight into the performance and health of channels.
type ChannelEventStore struct {
	cfg *Config

	// peers tracks all of our currently monitored peers and their
	// channels. It is only accessed from the consume goroutine, so it
	// needs no locking.
	peers map[route.Vertex]peerMonitor

	// chanInfoRequests serves requests for information about our channel.
	chanInfoRequests chan channelInfoRequest

	// peerRequests serves requests for information about a peer.
	peerRequests chan peerRequest

	// quit is closed by Stop to signal all goroutines and pending
	// queries to exit.
	quit chan struct{}

	// wg tracks the consume goroutine so that Stop can wait for it to
	// finish before stopping the flap count ticker.
	wg sync.WaitGroup
}
66

67
// Config provides the event store with functions required to monitor channel
// activity. All elements of the config must be non-nil for the event store to
// operate.
type Config struct {
	// SubscribeChannelEvents provides a subscription client which provides
	// a stream of channel events.
	SubscribeChannelEvents func() (subscribe.Subscription, error)

	// SubscribePeerEvents provides a subscription client which provides a
	// stream of peer online/offline events.
	SubscribePeerEvents func() (subscribe.Subscription, error)

	// GetOpenChannels provides a list of existing open channels which is
	// used to populate the ChannelEventStore with a set of channels on
	// startup.
	GetOpenChannels func() ([]*channeldb.OpenChannel, error)

	// Clock is the time source that the subsystem uses, provided here
	// for ease of testing.
	Clock clock.Clock

	// WriteFlapCount records the flap count for a set of peers on disk.
	WriteFlapCount func(map[route.Vertex]*channeldb.FlapCount) error

	// ReadFlapCount gets the flap count for a peer on disk.
	ReadFlapCount func(route.Vertex) (*channeldb.FlapCount, error)

	// FlapCountTicker is a ticker which controls how often we flush our
	// peer's flap count to disk.
	FlapCountTicker ticker.Ticker
}
98

99
// peerFlapCountMap is the map used to map peers to flap counts, declared here
// to allow shorter function signatures.
type peerFlapCountMap map[route.Vertex]*channeldb.FlapCount
102

103
// channelInfoRequest describes a query for a single channel's information,
// keyed by peer and channel point. The answer is delivered on responseChan.
type channelInfoRequest struct {
	peer         route.Vertex
	channelPoint wire.OutPoint
	responseChan chan channelInfoResponse
}
108

109
// channelInfoResponse carries the result of a channelInfoRequest: the channel
// information on success, or a non-nil err on failure.
type channelInfoResponse struct {
	info *ChannelInfo
	err  error
}
113

114
// peerRequest describes a query for a peer's flap count. The answer is
// delivered on responseChan.
type peerRequest struct {
	peer         route.Vertex
	responseChan chan peerResponse
}
118

119
// peerResponse carries the result of a peerRequest: the peer's flap count and
// last-flap timestamp (nil if no flap has been recorded), or a non-nil err.
type peerResponse struct {
	flapCount int
	ts        *time.Time
	err       error
}
124

125
// NewChannelEventStore initializes an event store with the config provided.
126
// Note that this function does not start the main event loop, Start() must be
127
// called.
128
func NewChannelEventStore(config *Config) *ChannelEventStore {
3✔
129
        store := &ChannelEventStore{
3✔
130
                cfg:              config,
3✔
131
                peers:            make(map[route.Vertex]peerMonitor),
3✔
132
                chanInfoRequests: make(chan channelInfoRequest),
3✔
133
                peerRequests:     make(chan peerRequest),
3✔
134
                quit:             make(chan struct{}),
3✔
135
        }
3✔
136

3✔
137
        return store
3✔
138
}
3✔
139

140
// Start adds all existing open channels to the event store and starts the main
141
// loop which records channel and peer events, and serves requests for
142
// information from the store. If this function fails, it cancels its existing
143
// subscriptions and returns an error.
144
func (c *ChannelEventStore) Start() error {
3✔
145
        log.Info("ChannelEventStore starting")
3✔
146

3✔
147
        // Create a subscription to channel events.
3✔
148
        channelClient, err := c.cfg.SubscribeChannelEvents()
3✔
149
        if err != nil {
3✔
150
                return err
×
151
        }
×
152

153
        // Create a subscription to peer events. If an error occurs, cancel the
154
        // existing subscription to channel events and return.
155
        peerClient, err := c.cfg.SubscribePeerEvents()
3✔
156
        if err != nil {
3✔
157
                channelClient.Cancel()
×
158
                return err
×
159
        }
×
160

161
        // cancel should be called to cancel all subscriptions if an error
162
        // occurs.
163
        cancel := func() {
6✔
164
                channelClient.Cancel()
3✔
165
                peerClient.Cancel()
3✔
166
        }
3✔
167

168
        // Add the existing set of channels to the event store. This is required
169
        // because channel events will not be triggered for channels that exist
170
        // at startup time.
171
        channels, err := c.cfg.GetOpenChannels()
3✔
172
        if err != nil {
3✔
173
                cancel()
×
174
                return err
×
175
        }
×
176

177
        log.Infof("Adding %v channels to event store", len(channels))
3✔
178

3✔
179
        for _, ch := range channels {
6✔
180
                peerKey, err := route.NewVertexFromBytes(
3✔
181
                        ch.IdentityPub.SerializeCompressed(),
3✔
182
                )
3✔
183
                if err != nil {
3✔
184
                        cancel()
×
185
                        return err
×
186
                }
×
187

188
                // Add existing channels to the channel store with an initial
189
                // peer online or offline event.
190
                c.addChannel(ch.FundingOutpoint, peerKey)
3✔
191
        }
192

193
        // Start a goroutine that consumes events from all subscriptions.
194
        c.wg.Add(1)
3✔
195
        go c.consume(&subscriptions{
3✔
196
                channelUpdates: channelClient.Updates(),
3✔
197
                peerUpdates:    peerClient.Updates(),
3✔
198
                cancel:         cancel,
3✔
199
        })
3✔
200

3✔
201
        return nil
3✔
202
}
203

204
// Stop terminates all goroutines started by the event store.
func (c *ChannelEventStore) Stop() {
	log.Info("ChannelEventStore shutting down...")
	defer log.Debug("ChannelEventStore shutdown complete")

	// Stop the consume goroutine by closing quit, then wait for it to
	// exit via the wait group.
	close(c.quit)
	c.wg.Wait()

	// Stop the ticker after the goroutine reading from it has exited, to
	// avoid a race.
	c.cfg.FlapCountTicker.Stop()
}
3✔
217

218
// addChannel checks whether we are already tracking a channel's peer, creates a
219
// new peer log to track it if we are not yet monitoring it, and adds the
220
// channel.
221
func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
222
        peer route.Vertex) {
3✔
223

3✔
224
        peerMonitor, err := c.getPeerMonitor(peer)
3✔
225
        if err != nil {
3✔
226
                log.Error("could not create monitor: %v", err)
×
227
                return
×
228
        }
×
229

230
        if err := peerMonitor.addChannel(channelPoint); err != nil {
3✔
231
                log.Errorf("could not add channel: %v", err)
×
232
        }
×
233
}
234

235
// getPeerMonitor tries to get an existing peer monitor from our in memory list,
236
// and falls back to creating a new monitor if it is not currently known.
237
func (c *ChannelEventStore) getPeerMonitor(peer route.Vertex) (peerMonitor,
238
        error) {
3✔
239

3✔
240
        peerMonitor, ok := c.peers[peer]
3✔
241
        if ok {
6✔
242
                return peerMonitor, nil
3✔
243
        }
3✔
244

245
        var (
3✔
246
                flapCount int
3✔
247
                lastFlap  *time.Time
3✔
248
        )
3✔
249

3✔
250
        historicalFlap, err := c.cfg.ReadFlapCount(peer)
3✔
251
        switch err {
3✔
252
        // If we do not have any records for this peer we set a 0 flap count
253
        // and timestamp.
254
        case channeldb.ErrNoPeerBucket:
3✔
255

256
        case nil:
3✔
257
                flapCount = int(historicalFlap.Count)
3✔
258
                lastFlap = &historicalFlap.LastFlap
3✔
259

260
        // Return if we get an unexpected error.
261
        default:
×
262
                return nil, err
×
263
        }
264

265
        peerMonitor = newPeerLog(c.cfg.Clock, flapCount, lastFlap)
3✔
266
        c.peers[peer] = peerMonitor
3✔
267

3✔
268
        return peerMonitor, nil
3✔
269
}
270

271
// closeChannel records a closed time for a channel, and returns early is the
272
// channel is not known to the event store. We log warnings (rather than errors)
273
// when we cannot find a peer/channel because channels that we restore from a
274
// static channel backup do not have their open notified, so the event store
275
// never learns about them, but they are closed using the regular flow so we
276
// will try to remove them on close. At present, we cannot easily distinguish
277
// between these closes and others.
278
func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint,
279
        peer route.Vertex) {
3✔
280

3✔
281
        peerMonitor, ok := c.peers[peer]
3✔
282
        if !ok {
6✔
283
                log.Warnf("peer not known to store: %v", peer)
3✔
284
                return
3✔
285
        }
3✔
286

287
        if err := peerMonitor.removeChannel(channelPoint); err != nil {
6✔
288
                log.Warnf("could not remove channel: %v", err)
3✔
289
        }
3✔
290
}
291

292
// peerEvent creates a peer monitor for a peer if we do not currently have
293
// one, and adds an online event to it.
294
func (c *ChannelEventStore) peerEvent(peer route.Vertex, online bool) {
3✔
295
        peerMonitor, err := c.getPeerMonitor(peer)
3✔
296
        if err != nil {
3✔
297
                log.Error("could not create monitor: %v", err)
×
298
                return
×
299
        }
×
300

301
        peerMonitor.onlineEvent(online)
3✔
302
}
303

304
// subscriptions abstracts away from subscription clients to allow for mocking.
type subscriptions struct {
	// channelUpdates streams channel open/close events.
	channelUpdates <-chan interface{}

	// peerUpdates streams peer online/offline events.
	peerUpdates <-chan interface{}

	// cancel tears down the underlying subscription clients.
	cancel func()
}
310

311
// consume is the event store's main loop. It consumes subscriptions to update
312
// the event store with channel and peer events, and serves requests for channel
313
// uptime and lifespan.
314
func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
3✔
315
        // Start our flap count ticker.
3✔
316
        c.cfg.FlapCountTicker.Resume()
3✔
317

3✔
318
        // On exit, we will cancel our subscriptions and write our most recent
3✔
319
        // flap counts to disk. This ensures that we have consistent data in
3✔
320
        // the case of a graceful shutdown. If we do not shutdown gracefully,
3✔
321
        // our worst case is data from our last flap count tick (1H).
3✔
322
        defer func() {
6✔
323
                subscriptions.cancel()
3✔
324

3✔
325
                if err := c.recordFlapCount(); err != nil {
3✔
326
                        log.Errorf("error recording flap on shutdown: %v", err)
×
327
                }
×
328

329
                c.wg.Done()
3✔
330
        }()
331

332
        // Consume events until the channel is closed.
333
        for {
6✔
334
                select {
3✔
335
                // Process channel opened and closed events.
336
                case e := <-subscriptions.channelUpdates:
3✔
337
                        switch event := e.(type) {
3✔
338
                        // A new channel has been opened, we must add the
339
                        // channel to the store and record a channel open event.
340
                        case channelnotifier.OpenChannelEvent:
3✔
341
                                compressed := event.Channel.IdentityPub.SerializeCompressed()
3✔
342
                                peerKey, err := route.NewVertexFromBytes(
3✔
343
                                        compressed,
3✔
344
                                )
3✔
345
                                if err != nil {
3✔
346
                                        log.Errorf("Could not get vertex "+
×
347
                                                "from: %v", compressed)
×
348
                                }
×
349

350
                                c.addChannel(
3✔
351
                                        event.Channel.FundingOutpoint, peerKey,
3✔
352
                                )
3✔
353

354
                        // A channel has been closed, we must remove the channel
355
                        // from the store and record a channel closed event.
356
                        case channelnotifier.ClosedChannelEvent:
3✔
357
                                compressed := event.CloseSummary.RemotePub.SerializeCompressed()
3✔
358
                                peerKey, err := route.NewVertexFromBytes(
3✔
359
                                        compressed,
3✔
360
                                )
3✔
361
                                if err != nil {
3✔
362
                                        log.Errorf("Could not get vertex "+
×
363
                                                "from: %v", compressed)
×
364
                                        continue
×
365
                                }
366

367
                                c.closeChannel(
3✔
368
                                        event.CloseSummary.ChanPoint, peerKey,
3✔
369
                                )
3✔
370
                        }
371

372
                // Process peer online and offline events.
373
                case e := <-subscriptions.peerUpdates:
3✔
374
                        switch event := e.(type) {
3✔
375
                        // We have reestablished a connection with our peer,
376
                        // and should record an online event for any channels
377
                        // with that peer.
378
                        case peernotifier.PeerOnlineEvent:
3✔
379
                                c.peerEvent(event.PubKey, true)
3✔
380

381
                        // We have lost a connection with our peer, and should
382
                        // record an offline event for any channels with that
383
                        // peer.
384
                        case peernotifier.PeerOfflineEvent:
3✔
385
                                c.peerEvent(event.PubKey, false)
3✔
386
                        }
387

388
                // Serve all requests for channel lifetime.
389
                case req := <-c.chanInfoRequests:
3✔
390
                        var resp channelInfoResponse
3✔
391

3✔
392
                        resp.info, resp.err = c.getChanInfo(req)
3✔
393
                        req.responseChan <- resp
3✔
394

395
                // Serve all requests for information about our peer.
396
                case req := <-c.peerRequests:
3✔
397
                        var resp peerResponse
3✔
398

3✔
399
                        resp.flapCount, resp.ts, resp.err = c.flapCount(
3✔
400
                                req.peer,
3✔
401
                        )
3✔
402
                        req.responseChan <- resp
3✔
403

404
                case <-c.cfg.FlapCountTicker.Ticks():
×
405
                        if err := c.recordFlapCount(); err != nil {
×
406
                                log.Errorf("could not record flap "+
×
407
                                        "count: %v", err)
×
408
                        }
×
409

410
                // Exit if the store receives the signal to shutdown.
411
                case <-c.quit:
3✔
412
                        return
3✔
413
                }
414
        }
415
}
416

417
// ChannelInfo provides the set of information that the event store has recorded
// for a channel.
type ChannelInfo struct {
	// Lifetime is the total amount of time we have monitored the channel
	// for. Note that this may be shorter than the channel's full lifetime,
	// since event data is not persisted across restarts.
	Lifetime time.Duration

	// Uptime is the total amount of time that the channel peer has been
	// observed as online during the monitored lifespan.
	Uptime time.Duration
}
428

429
// GetChanInfo gets all the information we have on a channel in the event store.
430
func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint,
431
        peer route.Vertex) (*ChannelInfo, error) {
3✔
432

3✔
433
        request := channelInfoRequest{
3✔
434
                peer:         peer,
3✔
435
                channelPoint: channelPoint,
3✔
436
                responseChan: make(chan channelInfoResponse),
3✔
437
        }
3✔
438

3✔
439
        // Send a request for the channel's information to the main event loop,
3✔
440
        // or return early with an error if the store has already received a
3✔
441
        // shutdown signal.
3✔
442
        select {
3✔
443
        case c.chanInfoRequests <- request:
3✔
444
        case <-c.quit:
×
445
                return nil, errShuttingDown
×
446
        }
447

448
        // Return the response we receive on the response channel or exit early
449
        // if the store is instructed to exit.
450
        select {
3✔
451
        case resp := <-request.responseChan:
3✔
452
                return resp.info, resp.err
3✔
453

454
        case <-c.quit:
×
455
                return nil, errShuttingDown
×
456
        }
457
}
458

459
// getChanInfo collects channel information for a channel. It gets uptime over
460
// the full lifetime of the channel.
461
func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
462
        error) {
3✔
463

3✔
464
        peerMonitor, ok := c.peers[req.peer]
3✔
465
        if !ok {
3✔
466
                return nil, ErrPeerNotFound
×
467
        }
×
468

469
        lifetime, uptime, err := peerMonitor.channelUptime(req.channelPoint)
3✔
470
        if err != nil {
6✔
471
                return nil, err
3✔
472
        }
3✔
473

474
        return &ChannelInfo{
3✔
475
                Lifetime: lifetime,
3✔
476
                Uptime:   uptime,
3✔
477
        }, nil
3✔
478
}
479

480
// FlapCount returns the flap count we have for a peer and the timestamp of its
481
// last flap. If we do not have any flaps recorded for the peer, the last flap
482
// timestamp will be nil.
483
func (c *ChannelEventStore) FlapCount(peer route.Vertex) (int, *time.Time,
484
        error) {
3✔
485

3✔
486
        request := peerRequest{
3✔
487
                peer:         peer,
3✔
488
                responseChan: make(chan peerResponse),
3✔
489
        }
3✔
490

3✔
491
        // Send a request for the peer's information to the main event loop,
3✔
492
        // or return early with an error if the store has already received a
3✔
493
        // shutdown signal.
3✔
494
        select {
3✔
495
        case c.peerRequests <- request:
3✔
496
        case <-c.quit:
×
497
                return 0, nil, errShuttingDown
×
498
        }
499

500
        // Return the response we receive on the response channel or exit early
501
        // if the store is instructed to exit.
502
        select {
3✔
503
        case resp := <-request.responseChan:
3✔
504
                return resp.flapCount, resp.ts, resp.err
3✔
505

506
        case <-c.quit:
×
507
                return 0, nil, errShuttingDown
×
508
        }
509
}
510

511
// flapCount gets our peer flap count and last flap timestamp from our in memory
512
// record of a peer, falling back to on disk if we are not currently tracking
513
// the peer. If we have no flap count recorded for the peer, a nil last flap
514
// time will be returned.
515
func (c *ChannelEventStore) flapCount(peer route.Vertex) (int, *time.Time,
516
        error) {
3✔
517

3✔
518
        // First check whether we are tracking this peer in memory, because this
3✔
519
        // record will have the most accurate flap count. We do not fail if we
3✔
520
        // can't find the peer in memory, because we may have previously
3✔
521
        // recorded its flap count on disk.
3✔
522
        peerMonitor, ok := c.peers[peer]
3✔
523
        if ok {
6✔
524
                count, ts := peerMonitor.getFlapCount()
3✔
525
                return count, ts, nil
3✔
526
        }
3✔
527

528
        // Try to get our flap count from the database. If this value is not
529
        // recorded, we return a nil last flap time to indicate that we have no
530
        // record of the peer's flap count.
531
        flapCount, err := c.cfg.ReadFlapCount(peer)
×
532
        switch err {
×
533
        case channeldb.ErrNoPeerBucket:
×
534
                return 0, nil, nil
×
535

536
        case nil:
×
537
                return int(flapCount.Count), &flapCount.LastFlap, nil
×
538

539
        default:
×
540
                return 0, nil, err
×
541
        }
542
}
543

544
// recordFlapCount will record our flap count for each peer that we are
545
// currently tracking, skipping peers that have a 0 flap count.
546
func (c *ChannelEventStore) recordFlapCount() error {
3✔
547
        updates := make(peerFlapCountMap)
3✔
548

3✔
549
        for peer, monitor := range c.peers {
6✔
550
                flapCount, lastFlap := monitor.getFlapCount()
3✔
551
                if lastFlap == nil {
3✔
552
                        continue
×
553
                }
554

555
                updates[peer] = &channeldb.FlapCount{
3✔
556
                        Count:    uint32(flapCount),
3✔
557
                        LastFlap: *lastFlap,
3✔
558
                }
3✔
559
        }
560

561
        log.Debugf("recording flap count for: %v peers", len(updates))
3✔
562

3✔
563
        return c.cfg.WriteFlapCount(updates)
3✔
564
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc