• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15951470896

29 Jun 2025 04:23AM UTC coverage: 67.594% (-0.01%) from 67.606%
15951470896

Pull #9751

github

web-flow
Merge 599d9b051 into 6290edf14
Pull Request #9751: multi: update Go to 1.23.10 and update some packages

135088 of 199851 relevant lines covered (67.59%)

21909.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.83
/chainio/consumer.go
1
package chainio
2

3
// BeatConsumer defines a supplementary component that should be used by
4
// subsystems which implement the `Consumer` interface. It partially implements
5
// the `Consumer` interface by providing the method `ProcessBlock` such that
6
// subsystems don't need to re-implement it.
7
//
8
// While inheritance is not commonly used in Go, subsystems embedding this
9
// struct cannot pass the interface check for `Consumer` because the `Name`
10
// method is not implemented, which gives us a "mortise and tenon" structure.
11
// In addition to reducing code duplication, this design allows `ProcessBlock`
12
// to work on the concrete type `Beat` to access its internal states.
13
type BeatConsumer struct {
14
        // BlockbeatChan is a channel to receive blocks from Blockbeat. The
15
        // received block contains the best known height and the txns confirmed
16
        // in this block.
17
        BlockbeatChan chan Blockbeat
18

19
        // name is the name of the consumer which embeds the BlockConsumer.
20
        name string
21

22
        // quit is a channel that closes when the BlockConsumer is shutting
23
        // down.
24
        //
25
        // NOTE: this quit channel should be mounted to the same quit channel
26
        // used by the subsystem.
27
        quit chan struct{}
28

29
        // errChan is a buffered chan that receives an error returned from
30
        // processing this block.
31
        errChan chan error
32
}
33

34
// NewBeatConsumer creates a new BlockConsumer.
35
func NewBeatConsumer(quit chan struct{}, name string) BeatConsumer {
129✔
36
        // Refuse to start `lnd` if the quit channel is not initialized. We
129✔
37
        // treat this case as if we are facing a nil pointer dereference, as
129✔
38
        // there's no point to return an error here, which will cause the node
129✔
39
        // to fail to be started anyway.
129✔
40
        if quit == nil {
129✔
41
                panic("quit channel is nil")
×
42
        }
43

44
        b := BeatConsumer{
129✔
45
                BlockbeatChan: make(chan Blockbeat),
129✔
46
                name:          name,
129✔
47
                errChan:       make(chan error, 1),
129✔
48
                quit:          quit,
129✔
49
        }
129✔
50

129✔
51
        return b
129✔
52
}
53

54
// ProcessBlock takes a blockbeat and sends it to the consumer's blockbeat
55
// channel. It will send it to the subsystem's BlockbeatChan, and block until
56
// the processed result is received from the subsystem. The subsystem must call
57
// `NotifyBlockProcessed` after it has finished processing the block.
58
//
59
// NOTE: part of the `chainio.Consumer` interface.
60
func (b *BeatConsumer) ProcessBlock(beat Blockbeat) error {
6✔
61
        // Update the current height.
6✔
62
        beat.logger().Tracef("set current height for [%s]", b.name)
6✔
63

6✔
64
        select {
6✔
65
        // Send the beat to the blockbeat channel. It's expected that the
66
        // consumer will read from this channel and process the block. Once
67
        // processed, it should return the error or nil to the beat.Err chan.
68
        case b.BlockbeatChan <- beat:
5✔
69
                beat.logger().Tracef("Sent blockbeat to [%s]", b.name)
5✔
70

71
        case <-b.quit:
4✔
72
                beat.logger().Debugf("[%s] received shutdown before sending "+
4✔
73
                        "beat", b.name)
4✔
74

4✔
75
                return nil
4✔
76
        }
77

78
        // Check the consumer's err chan. We expect the consumer to call
79
        // `beat.NotifyBlockProcessed` to send the error back here.
80
        select {
5✔
81
        case err := <-b.errChan:
4✔
82
                beat.logger().Tracef("[%s] processed beat: err=%v", b.name, err)
4✔
83

4✔
84
                return err
4✔
85

86
        case <-b.quit:
1✔
87
                beat.logger().Debugf("[%s] received shutdown", b.name)
1✔
88
        }
89

90
        return nil
1✔
91
}
92

93
// NotifyBlockProcessed signals that the block has been processed. It takes the
94
// blockbeat being processed and an error resulted from processing it. This
95
// error is then sent back to the consumer's err chan to unblock
96
// `ProcessBlock`.
97
//
98
// NOTE: This method must be called by the subsystem after it has finished
99
// processing the block.
100
func (b *BeatConsumer) NotifyBlockProcessed(beat Blockbeat, err error) {
26✔
101
        // Update the current height.
26✔
102
        beat.logger().Tracef("[%s]: notifying beat processed", b.name)
26✔
103

26✔
104
        select {
26✔
105
        case b.errChan <- err:
21✔
106
                beat.logger().Tracef("[%s]: notified beat processed, err=%v",
21✔
107
                        b.name, err)
21✔
108

109
        case <-b.quit:
5✔
110
                beat.logger().Debugf("[%s] received shutdown before notifying "+
5✔
111
                        "beat processed", b.name)
5✔
112
        }
113
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc