• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.78
/chanfitness/chaneventstore.go
1
// Package chanfitness monitors the behaviour of channels to provide insight
2
// into the health and performance of a channel. This is achieved by maintaining
3
// an event store which tracks events for each channel.
4
//
5
// Lifespan: the period that the channel has been known to the scoring system.
6
// Note that lifespan may not equal the channel's full lifetime because data is
7
// not currently persisted.
8
//
9
// Uptime: the total time within a given period that the channel's remote peer
10
// has been online.
11
package chanfitness
12

13
import (
14
        "errors"
15
        "fmt"
16
        "sync"
17
        "sync/atomic"
18
        "time"
19

20
        "github.com/btcsuite/btcd/wire"
21
        "github.com/lightningnetwork/lnd/channeldb"
22
        "github.com/lightningnetwork/lnd/channelnotifier"
23
        "github.com/lightningnetwork/lnd/clock"
24
        "github.com/lightningnetwork/lnd/peernotifier"
25
        "github.com/lightningnetwork/lnd/routing/route"
26
        "github.com/lightningnetwork/lnd/subscribe"
27
        "github.com/lightningnetwork/lnd/ticker"
28
)
29

30
const (
31
        // FlapCountFlushRate determines how often we write peer total flap
32
        // count to disk.
33
        FlapCountFlushRate = time.Hour
34
)
35

36
var (
	// errShuttingDown signals that a query cannot be served because the
	// event store has already received its shutdown signal.
	errShuttingDown = errors.New("channel event store shutting down")

	// ErrChannelNotFound indicates a query for a channel that the event
	// store is not tracking.
	ErrChannelNotFound = errors.New("channel not found in event store")

	// ErrPeerNotFound indicates a query involving a peer that the event
	// store is not currently tracking.
	ErrPeerNotFound = errors.New("peer not found in event store")
)
49

50
// ChannelEventStore maintains a set of event logs for the node's channels to
51
// provide insight into the performance and health of channels.
52
type ChannelEventStore struct {
53
        started atomic.Bool
54
        stopped atomic.Bool
55

56
        cfg *Config
57

58
        // peers tracks all of our currently monitored peers and their channels.
59
        peers map[route.Vertex]peerMonitor
60

61
        // chanInfoRequests serves requests for information about our channel.
62
        chanInfoRequests chan channelInfoRequest
63

64
        // peerRequests serves requests for information about a peer.
65
        peerRequests chan peerRequest
66

67
        quit chan struct{}
68

69
        wg sync.WaitGroup
70
}
71

72
// Config provides the event store with functions required to monitor channel
73
// activity. All elements of the config must be non-nil for the event store to
74
// operate.
75
type Config struct {
76
        // SubscribeChannelEvents provides a subscription client which provides
77
        // a stream of channel events.
78
        SubscribeChannelEvents func() (subscribe.Subscription, error)
79

80
        // SubscribePeerEvents provides a subscription client which provides a
81
        // stream of peer online/offline events.
82
        SubscribePeerEvents func() (subscribe.Subscription, error)
83

84
        // GetOpenChannels provides a list of existing open channels which is
85
        // used to populate the ChannelEventStore with a set of channels on
86
        // startup.
87
        GetOpenChannels func() ([]*channeldb.OpenChannel, error)
88

89
        // Clock is the time source that the subsystem uses, provided here
90
        // for ease of testing.
91
        Clock clock.Clock
92

93
        // WriteFlapCount records the flap count for a set of peers on disk.
94
        WriteFlapCount func(map[route.Vertex]*channeldb.FlapCount) error
95

96
        // ReadFlapCount gets the flap count for a peer on disk.
97
        ReadFlapCount func(route.Vertex) (*channeldb.FlapCount, error)
98

99
        // FlapCountTicker is a ticker which controls how often we flush our
100
        // peer's flap count to disk.
101
        FlapCountTicker ticker.Ticker
102
}
103

104
// peerFlapCountMap is the map used to map peers to flap counts, declared here
105
// to allow shorter function signatures.
106
type peerFlapCountMap map[route.Vertex]*channeldb.FlapCount
107

108
type channelInfoRequest struct {
109
        peer         route.Vertex
110
        channelPoint wire.OutPoint
111
        responseChan chan channelInfoResponse
112
}
113

114
type channelInfoResponse struct {
115
        info *ChannelInfo
116
        err  error
117
}
118

119
type peerRequest struct {
120
        peer         route.Vertex
121
        responseChan chan peerResponse
122
}
123

124
// peerResponse carries the result of a peerRequest: the peer's flap count,
// the timestamp of its last flap (nil when no flap is recorded), or an error.
type peerResponse struct {
	flapCount int
	ts        *time.Time
	err       error
}
129

130
// NewChannelEventStore initializes an event store with the config provided.
131
// Note that this function does not start the main event loop, Start() must be
132
// called.
133
func NewChannelEventStore(config *Config) *ChannelEventStore {
11✔
134
        store := &ChannelEventStore{
11✔
135
                cfg:              config,
11✔
136
                peers:            make(map[route.Vertex]peerMonitor),
11✔
137
                chanInfoRequests: make(chan channelInfoRequest),
11✔
138
                peerRequests:     make(chan peerRequest),
11✔
139
                quit:             make(chan struct{}),
11✔
140
        }
11✔
141

11✔
142
        return store
11✔
143
}
11✔
144

145
// Start adds all existing open channels to the event store and starts the main
146
// loop which records channel and peer events, and serves requests for
147
// information from the store. If this function fails, it cancels its existing
148
// subscriptions and returns an error.
149
func (c *ChannelEventStore) Start() error {
11✔
150
        log.Info("ChannelEventStore starting...")
11✔
151

11✔
152
        if c.started.Swap(true) {
11✔
153
                return fmt.Errorf("ChannelEventStore started more than once")
×
154
        }
×
155

156
        // Create a subscription to channel events.
157
        channelClient, err := c.cfg.SubscribeChannelEvents()
11✔
158
        if err != nil {
12✔
159
                return err
1✔
160
        }
1✔
161

162
        // Create a subscription to peer events. If an error occurs, cancel the
163
        // existing subscription to channel events and return.
164
        peerClient, err := c.cfg.SubscribePeerEvents()
10✔
165
        if err != nil {
11✔
166
                channelClient.Cancel()
1✔
167
                return err
1✔
168
        }
1✔
169

170
        // cancel should be called to cancel all subscriptions if an error
171
        // occurs.
172
        cancel := func() {
18✔
173
                channelClient.Cancel()
9✔
174
                peerClient.Cancel()
9✔
175
        }
9✔
176

177
        // Add the existing set of channels to the event store. This is required
178
        // because channel events will not be triggered for channels that exist
179
        // at startup time.
180
        channels, err := c.cfg.GetOpenChannels()
9✔
181
        if err != nil {
10✔
182
                cancel()
1✔
183
                return err
1✔
184
        }
1✔
185

186
        log.Infof("Adding %v channels to event store", len(channels))
8✔
187

8✔
188
        for _, ch := range channels {
8✔
UNCOV
189
                peerKey, err := route.NewVertexFromBytes(
×
UNCOV
190
                        ch.IdentityPub.SerializeCompressed(),
×
UNCOV
191
                )
×
UNCOV
192
                if err != nil {
×
193
                        cancel()
×
194
                        return err
×
195
                }
×
196

197
                // Add existing channels to the channel store with an initial
198
                // peer online or offline event.
UNCOV
199
                c.addChannel(ch.FundingOutpoint, peerKey)
×
200
        }
201

202
        // Start a goroutine that consumes events from all subscriptions.
203
        c.wg.Add(1)
8✔
204
        go c.consume(&subscriptions{
8✔
205
                channelUpdates: channelClient.Updates(),
8✔
206
                peerUpdates:    peerClient.Updates(),
8✔
207
                cancel:         cancel,
8✔
208
        })
8✔
209

8✔
210
        log.Debug("ChannelEventStore started")
8✔
211

8✔
212
        return nil
8✔
213
}
214

215
// Stop terminates all goroutines started by the event store.
216
func (c *ChannelEventStore) Stop() error {
8✔
217
        log.Info("ChannelEventStore shutting down...")
8✔
218

8✔
219
        if c.stopped.Swap(true) {
8✔
220
                return fmt.Errorf("ChannelEventStore stopped more than once")
×
221
        }
×
222

223
        // Stop the consume goroutine.
224
        close(c.quit)
8✔
225
        c.wg.Wait()
8✔
226

8✔
227
        // Stop the ticker after the goroutine reading from it has exited, to
8✔
228
        // avoid a race.
8✔
229
        var err error
8✔
230
        if c.cfg.FlapCountTicker == nil {
8✔
231
                err = fmt.Errorf("ChannelEventStore FlapCountTicker not " +
×
232
                        "initialized")
×
233
        } else {
8✔
234
                c.cfg.FlapCountTicker.Stop()
8✔
235
        }
8✔
236

237
        log.Debugf("ChannelEventStore shutdown complete")
8✔
238

8✔
239
        return err
8✔
240
}
241

242
// addChannel checks whether we are already tracking a channel's peer, creates a
243
// new peer log to track it if we are not yet monitoring it, and adds the
244
// channel.
245
func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
246
        peer route.Vertex) {
10✔
247

10✔
248
        peerMonitor, err := c.getPeerMonitor(peer)
10✔
249
        if err != nil {
10✔
250
                log.Error("could not create monitor: %v", err)
×
251
                return
×
252
        }
×
253

254
        if err := peerMonitor.addChannel(channelPoint); err != nil {
11✔
255
                log.Errorf("could not add channel: %v", err)
1✔
256
        }
1✔
257
}
258

259
// getPeerMonitor tries to get an existing peer monitor from our in memory list,
260
// and falls back to creating a new monitor if it is not currently known.
261
func (c *ChannelEventStore) getPeerMonitor(peer route.Vertex) (peerMonitor,
262
        error) {
25✔
263

25✔
264
        peerMonitor, ok := c.peers[peer]
25✔
265
        if ok {
42✔
266
                return peerMonitor, nil
17✔
267
        }
17✔
268

269
        var (
8✔
270
                flapCount int
8✔
271
                lastFlap  *time.Time
8✔
272
        )
8✔
273

8✔
274
        historicalFlap, err := c.cfg.ReadFlapCount(peer)
8✔
275
        switch err {
8✔
276
        // If we do not have any records for this peer we set a 0 flap count
277
        // and timestamp.
278
        case channeldb.ErrNoPeerBucket:
8✔
279

UNCOV
280
        case nil:
×
UNCOV
281
                flapCount = int(historicalFlap.Count)
×
UNCOV
282
                lastFlap = &historicalFlap.LastFlap
×
283

284
        // Return if we get an unexpected error.
285
        default:
×
286
                return nil, err
×
287
        }
288

289
        peerMonitor = newPeerLog(c.cfg.Clock, flapCount, lastFlap)
8✔
290
        c.peers[peer] = peerMonitor
8✔
291

8✔
292
        return peerMonitor, nil
8✔
293
}
294

295
// closeChannel records a closed time for a channel, and returns early is the
296
// channel is not known to the event store. We log warnings (rather than errors)
297
// when we cannot find a peer/channel because channels that we restore from a
298
// static channel backup do not have their open notified, so the event store
299
// never learns about them, but they are closed using the regular flow so we
300
// will try to remove them on close. At present, we cannot easily distinguish
301
// between these closes and others.
302
func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint,
303
        peer route.Vertex) {
1✔
304

1✔
305
        peerMonitor, ok := c.peers[peer]
1✔
306
        if !ok {
1✔
UNCOV
307
                log.Warnf("peer not known to store: %v", peer)
×
UNCOV
308
                return
×
UNCOV
309
        }
×
310

311
        if err := peerMonitor.removeChannel(channelPoint); err != nil {
1✔
UNCOV
312
                log.Warnf("could not remove channel: %v", err)
×
UNCOV
313
        }
×
314
}
315

316
// peerEvent creates a peer monitor for a peer if we do not currently have
317
// one, and adds an online event to it.
318
func (c *ChannelEventStore) peerEvent(peer route.Vertex, online bool) {
15✔
319
        peerMonitor, err := c.getPeerMonitor(peer)
15✔
320
        if err != nil {
15✔
321
                log.Error("could not create monitor: %v", err)
×
322
                return
×
323
        }
×
324

325
        peerMonitor.onlineEvent(online)
15✔
326
}
327

328
// subscriptions bundles the update streams and teardown function of our
// event subscriptions, abstracting away from concrete subscription clients
// so they can be mocked in tests.
type subscriptions struct {
	channelUpdates <-chan interface{}
	peerUpdates    <-chan interface{}
	cancel         func()
}
334

335
// consume is the event store's main loop. It consumes subscriptions to update
336
// the event store with channel and peer events, and serves requests for channel
337
// uptime and lifespan.
338
func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
8✔
339
        // Start our flap count ticker.
8✔
340
        c.cfg.FlapCountTicker.Resume()
8✔
341

8✔
342
        // On exit, we will cancel our subscriptions and write our most recent
8✔
343
        // flap counts to disk. This ensures that we have consistent data in
8✔
344
        // the case of a graceful shutdown. If we do not shutdown gracefully,
8✔
345
        // our worst case is data from our last flap count tick (1H).
8✔
346
        defer func() {
16✔
347
                subscriptions.cancel()
8✔
348

8✔
349
                if err := c.recordFlapCount(); err != nil {
8✔
350
                        log.Errorf("error recording flap on shutdown: %v", err)
×
351
                }
×
352

353
                c.wg.Done()
8✔
354
        }()
355

356
        // Consume events until the channel is closed.
357
        for {
52✔
358
                select {
44✔
359
                // Process channel opened and closed events.
360
                case e := <-subscriptions.channelUpdates:
11✔
361
                        switch event := e.(type) {
11✔
362
                        // A new channel has been opened, we must add the
363
                        // channel to the store and record a channel open event.
364
                        case channelnotifier.OpenChannelEvent:
10✔
365
                                compressed := event.Channel.IdentityPub.SerializeCompressed()
10✔
366
                                peerKey, err := route.NewVertexFromBytes(
10✔
367
                                        compressed,
10✔
368
                                )
10✔
369
                                if err != nil {
10✔
370
                                        log.Errorf("Could not get vertex "+
×
371
                                                "from: %v", compressed)
×
372
                                }
×
373

374
                                c.addChannel(
10✔
375
                                        event.Channel.FundingOutpoint, peerKey,
10✔
376
                                )
10✔
377

378
                        // A channel has been closed, we must remove the channel
379
                        // from the store and record a channel closed event.
380
                        case channelnotifier.ClosedChannelEvent:
1✔
381
                                compressed := event.CloseSummary.RemotePub.SerializeCompressed()
1✔
382
                                peerKey, err := route.NewVertexFromBytes(
1✔
383
                                        compressed,
1✔
384
                                )
1✔
385
                                if err != nil {
1✔
386
                                        log.Errorf("Could not get vertex "+
×
387
                                                "from: %v", compressed)
×
388
                                        continue
×
389
                                }
390

391
                                c.closeChannel(
1✔
392
                                        event.CloseSummary.ChanPoint, peerKey,
1✔
393
                                )
1✔
394
                        }
395

396
                // Process peer online and offline events.
397
                case e := <-subscriptions.peerUpdates:
15✔
398
                        switch event := e.(type) {
15✔
399
                        // We have reestablished a connection with our peer,
400
                        // and should record an online event for any channels
401
                        // with that peer.
402
                        case peernotifier.PeerOnlineEvent:
10✔
403
                                c.peerEvent(event.PubKey, true)
10✔
404

405
                        // We have lost a connection with our peer, and should
406
                        // record an offline event for any channels with that
407
                        // peer.
408
                        case peernotifier.PeerOfflineEvent:
5✔
409
                                c.peerEvent(event.PubKey, false)
5✔
410
                        }
411

412
                // Serve all requests for channel lifetime.
413
                case req := <-c.chanInfoRequests:
5✔
414
                        var resp channelInfoResponse
5✔
415

5✔
416
                        resp.info, resp.err = c.getChanInfo(req)
5✔
417
                        req.responseChan <- resp
5✔
418

419
                // Serve all requests for information about our peer.
420
                case req := <-c.peerRequests:
3✔
421
                        var resp peerResponse
3✔
422

3✔
423
                        resp.flapCount, resp.ts, resp.err = c.flapCount(
3✔
424
                                req.peer,
3✔
425
                        )
3✔
426
                        req.responseChan <- resp
3✔
427

428
                case <-c.cfg.FlapCountTicker.Ticks():
2✔
429
                        if err := c.recordFlapCount(); err != nil {
2✔
430
                                log.Errorf("could not record flap "+
×
431
                                        "count: %v", err)
×
432
                        }
×
433

434
                // Exit if the store receives the signal to shutdown.
435
                case <-c.quit:
8✔
436
                        return
8✔
437
                }
438
        }
439
}
440

441
// ChannelInfo is the set of data the event store has recorded for a single
// channel.
type ChannelInfo struct {
	// Lifetime is the total duration for which the channel has been
	// monitored by the store.
	Lifetime time.Duration

	// Uptime is the portion of the monitored lifetime during which the
	// channel's remote peer was observed online.
	Uptime time.Duration
}
452

453
// GetChanInfo gets all the information we have on a channel in the event store.
454
func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint,
455
        peer route.Vertex) (*ChannelInfo, error) {
5✔
456

5✔
457
        request := channelInfoRequest{
5✔
458
                peer:         peer,
5✔
459
                channelPoint: channelPoint,
5✔
460
                responseChan: make(chan channelInfoResponse),
5✔
461
        }
5✔
462

5✔
463
        // Send a request for the channel's information to the main event loop,
5✔
464
        // or return early with an error if the store has already received a
5✔
465
        // shutdown signal.
5✔
466
        select {
5✔
467
        case c.chanInfoRequests <- request:
5✔
468
        case <-c.quit:
×
469
                return nil, errShuttingDown
×
470
        }
471

472
        // Return the response we receive on the response channel or exit early
473
        // if the store is instructed to exit.
474
        select {
5✔
475
        case resp := <-request.responseChan:
5✔
476
                return resp.info, resp.err
5✔
477

478
        case <-c.quit:
×
479
                return nil, errShuttingDown
×
480
        }
481
}
482

483
// getChanInfo collects channel information for a channel. It gets uptime over
484
// the full lifetime of the channel.
485
func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
486
        error) {
5✔
487

5✔
488
        peerMonitor, ok := c.peers[req.peer]
5✔
489
        if !ok {
5✔
490
                return nil, ErrPeerNotFound
×
491
        }
×
492

493
        lifetime, uptime, err := peerMonitor.channelUptime(req.channelPoint)
5✔
494
        if err != nil {
6✔
495
                return nil, err
1✔
496
        }
1✔
497

498
        return &ChannelInfo{
4✔
499
                Lifetime: lifetime,
4✔
500
                Uptime:   uptime,
4✔
501
        }, nil
4✔
502
}
503

504
// FlapCount returns the flap count we have for a peer and the timestamp of its
505
// last flap. If we do not have any flaps recorded for the peer, the last flap
506
// timestamp will be nil.
507
func (c *ChannelEventStore) FlapCount(peer route.Vertex) (int, *time.Time,
508
        error) {
3✔
509

3✔
510
        request := peerRequest{
3✔
511
                peer:         peer,
3✔
512
                responseChan: make(chan peerResponse),
3✔
513
        }
3✔
514

3✔
515
        // Send a request for the peer's information to the main event loop,
3✔
516
        // or return early with an error if the store has already received a
3✔
517
        // shutdown signal.
3✔
518
        select {
3✔
519
        case c.peerRequests <- request:
3✔
520
        case <-c.quit:
×
521
                return 0, nil, errShuttingDown
×
522
        }
523

524
        // Return the response we receive on the response channel or exit early
525
        // if the store is instructed to exit.
526
        select {
3✔
527
        case resp := <-request.responseChan:
3✔
528
                return resp.flapCount, resp.ts, resp.err
3✔
529

530
        case <-c.quit:
×
531
                return 0, nil, errShuttingDown
×
532
        }
533
}
534

535
// flapCount gets our peer flap count and last flap timestamp from our in memory
536
// record of a peer, falling back to on disk if we are not currently tracking
537
// the peer. If we have no flap count recorded for the peer, a nil last flap
538
// time will be returned.
539
func (c *ChannelEventStore) flapCount(peer route.Vertex) (int, *time.Time,
540
        error) {
3✔
541

3✔
542
        // First check whether we are tracking this peer in memory, because this
3✔
543
        // record will have the most accurate flap count. We do not fail if we
3✔
544
        // can't find the peer in memory, because we may have previously
3✔
545
        // recorded its flap count on disk.
3✔
546
        peerMonitor, ok := c.peers[peer]
3✔
547
        if ok {
4✔
548
                count, ts := peerMonitor.getFlapCount()
1✔
549
                return count, ts, nil
1✔
550
        }
1✔
551

552
        // Try to get our flap count from the database. If this value is not
553
        // recorded, we return a nil last flap time to indicate that we have no
554
        // record of the peer's flap count.
555
        flapCount, err := c.cfg.ReadFlapCount(peer)
2✔
556
        switch err {
2✔
557
        case channeldb.ErrNoPeerBucket:
1✔
558
                return 0, nil, nil
1✔
559

560
        case nil:
1✔
561
                return int(flapCount.Count), &flapCount.LastFlap, nil
1✔
562

563
        default:
×
564
                return 0, nil, err
×
565
        }
566
}
567

568
// recordFlapCount will record our flap count for each peer that we are
569
// currently tracking, skipping peers that have a 0 flap count.
570
func (c *ChannelEventStore) recordFlapCount() error {
10✔
571
        updates := make(peerFlapCountMap)
10✔
572

10✔
573
        for peer, monitor := range c.peers {
20✔
574
                flapCount, lastFlap := monitor.getFlapCount()
10✔
575
                if lastFlap == nil {
10✔
576
                        continue
×
577
                }
578

579
                updates[peer] = &channeldb.FlapCount{
10✔
580
                        Count:    uint32(flapCount),
10✔
581
                        LastFlap: *lastFlap,
10✔
582
                }
10✔
583
        }
584

585
        log.Debugf("recording flap count for: %v peers", len(updates))
10✔
586

10✔
587
        return c.cfg.WriteFlapCount(updates)
10✔
588
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc