• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16911773184

12 Aug 2025 02:21PM UTC coverage: 57.471% (-9.4%) from 66.9%
16911773184

Pull #10103

github

web-flow
Merge d64a1234d into f3e1f2f35
Pull Request #10103: Rate limit outgoing gossip bandwidth by peer

57 of 77 new or added lines in 5 files covered. (74.03%)

28294 existing lines in 457 files now uncovered.

99110 of 172451 relevant lines covered (57.47%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.78
/chainio/consumer.go
1
package chainio
2

3
// BeatConsumer defines a supplementary component that should be used by
4
// subsystems which implement the `Consumer` interface. It partially implements
5
// the `Consumer` interface by providing the method `ProcessBlock` such that
6
// subsystems don't need to re-implement it.
7
//
8
// While inheritance is not commonly used in Go, subsystems embedding this
9
// struct cannot pass the interface check for `Consumer` because the `Name`
10
// method is not implemented, which gives us a "mortise and tenon" structure.
11
// In addition to reducing code duplication, this design allows `ProcessBlock`
12
// to work on the concrete type `Beat` to access its internal states.
13
type BeatConsumer struct {
14
        // BlockbeatChan is a channel to receive blocks from Blockbeat. The
15
        // received block contains the best known height and the txns confirmed
16
        // in this block.
17
        BlockbeatChan chan Blockbeat
18

19
        // name is the name of the consumer which embeds the BeatConsumer.
20
        name string
21

22
        // quit is a channel that closes when the BeatConsumer is shutting
23
        // down.
24
        //
25
        // NOTE: this quit channel should be mounted to the same quit channel
26
        // used by the subsystem.
27
        quit chan struct{}
28

29
        // errChan is a buffered chan that receives an error returned from
30
        // processing this block.
31
        errChan chan error
32
}
33

34
// NewBeatConsumer creates a new BeatConsumer.
35
func NewBeatConsumer(quit chan struct{}, name string) BeatConsumer {
3✔
36
        // Refuse to start `lnd` if the quit channel is not initialized. We
3✔
37
        // treat this case as if we are facing a nil pointer dereference, as
3✔
38
        // there's no point in returning an error, which would cause the node
3✔
39
        // to fail to be started anyway.
3✔
40
        if quit == nil {
3✔
41
                panic("quit channel is nil")
×
42
        }
43

44
        b := BeatConsumer{
3✔
45
                BlockbeatChan: make(chan Blockbeat),
3✔
46
                name:          name,
3✔
47
                errChan:       make(chan error, 1),
3✔
48
                quit:          quit,
3✔
49
        }
3✔
50

3✔
51
        return b
3✔
52
}
53

54
// ProcessBlock takes a blockbeat and sends it to the consumer's blockbeat
55
// channel. It will send it to the subsystem's BlockbeatChan, and block until
56
// the processed result is received from the subsystem. The subsystem must call
57
// `NotifyBlockProcessed` after it has finished processing the block.
58
//
59
// NOTE: part of the `chainio.Consumer` interface.
60
func (b *BeatConsumer) ProcessBlock(beat Blockbeat) error {
3✔
61
        // Log which consumer is about to receive this beat.
3✔
62
        beat.logger().Tracef("set current height for [%s]", b.name)
3✔
63

3✔
64
        select {
3✔
65
        // Send the beat to the blockbeat channel. It's expected that the
66
        // consumer will read from this channel and process the block. Once
67
        // processed, it should send the error or nil to b.errChan.
68
        case b.BlockbeatChan <- beat:
3✔
69
                beat.logger().Tracef("Sent blockbeat to [%s]", b.name)
3✔
70

71
        case <-b.quit:
3✔
72
                beat.logger().Debugf("[%s] received shutdown before sending "+
3✔
73
                        "beat", b.name)
3✔
74

3✔
75
                return nil
3✔
76
        }
77

78
        // Check the consumer's err chan. We expect the consumer to call
79
        // `NotifyBlockProcessed` to send the error back here.
80
        select {
3✔
81
        case err := <-b.errChan:
3✔
82
                beat.logger().Tracef("[%s] processed beat: err=%v", b.name, err)
3✔
83

3✔
84
                return err
3✔
85

UNCOV
86
        case <-b.quit:
×
UNCOV
87
                beat.logger().Debugf("[%s] received shutdown", b.name)
×
88
        }
89

UNCOV
90
        return nil
×
91
}
92

93
// NotifyBlockProcessed signals that the block has been processed. It takes the
94
// blockbeat being processed and an error resulting from processing it. This
95
// error is then sent back to the consumer's err chan to unblock
96
// `ProcessBlock`.
97
//
98
// NOTE: This method must be called by the subsystem after it has finished
99
// processing the block.
100
func (b *BeatConsumer) NotifyBlockProcessed(beat Blockbeat, err error) {
3✔
101
        // Log that this consumer is reporting its processing result.
3✔
102
        beat.logger().Tracef("[%s]: notifying beat processed", b.name)
3✔
103

3✔
104
        select {
3✔
105
        case b.errChan <- err:
3✔
106
                beat.logger().Tracef("[%s]: notified beat processed, err=%v",
3✔
107
                        b.name, err)
3✔
108

UNCOV
109
        case <-b.quit:
×
UNCOV
110
                beat.logger().Debugf("[%s] received shutdown before notifying "+
×
UNCOV
111
                        "beat processed", b.name)
×
112
        }
113
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc