• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12412789487

19 Dec 2024 12:34PM UTC coverage: 57.535% (-1.1%) from 58.653%
12412789487

Pull #9315

github

yyforyongyu
contractcourt: include custom records on replayed htlc

Add another case in addition to #9357.
Pull Request #9315: Implement `blockbeat`

1744 of 2518 new or added lines in 30 files covered. (69.26%)

19172 existing lines in 248 files now uncovered.

102529 of 178204 relevant lines covered (57.53%)

24755.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.59
/chainio/dispatcher.go
1
package chainio
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/chainntnfs"
12
        "golang.org/x/sync/errgroup"
13
)
14

15
var (
	// DefaultProcessBlockTimeout bounds how long the dispatcher waits for a
	// single consumer to finish processing a new block epoch.
	DefaultProcessBlockTimeout = 60 * time.Second

	// ErrProcessBlockTimeout is returned when a consumer does not finish
	// processing a block within the allowed time.
	ErrProcessBlockTimeout = errors.New("process block timeout")
)
22

23
// BlockbeatDispatcher is a service that handles dispatching new blocks to
24
// `lnd`'s subsystems. During startup, subsystems that are block-driven should
25
// implement the `Consumer` interface and register themselves via
26
// `RegisterQueue`. When two subsystems are independent of each other, they
27
// should be registered in different queues so blocks are notified concurrently.
28
// Otherwise, when living in the same queue, the subsystems are notified of the
29
// new blocks sequentially, which means it's critical to understand the
30
// relationship of these systems to properly handle the order.
31
type BlockbeatDispatcher struct {
32
        wg sync.WaitGroup
33

34
        // notifier is used to receive new block epochs.
35
        notifier chainntnfs.ChainNotifier
36

37
        // beat is the latest blockbeat received.
38
        beat Blockbeat
39

40
        // consumerQueues is a map of consumers that will receive blocks. Its
41
        // key is a unique counter and its value is a queue of consumers. Each
42
        // queue is notified concurrently, and consumers in the same queue is
43
        // notified sequentially.
44
        consumerQueues map[uint32][]Consumer
45

46
        // counter is used to assign a unique id to each queue.
47
        counter atomic.Uint32
48

49
        // quit is used to signal the BlockbeatDispatcher to stop.
50
        quit chan struct{}
51
}
52

53
// NewBlockbeatDispatcher returns a new blockbeat dispatcher instance.
54
func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher {
5✔
55
        return &BlockbeatDispatcher{
5✔
56
                notifier:       n,
5✔
57
                quit:           make(chan struct{}),
5✔
58
                consumerQueues: make(map[uint32][]Consumer),
5✔
59
        }
5✔
60
}
5✔
61

62
// RegisterQueue takes a list of consumers and registers them in the same
63
// queue.
64
//
65
// NOTE: these consumers are notified sequentially.
66
func (b *BlockbeatDispatcher) RegisterQueue(consumers []Consumer) {
6✔
67
        qid := b.counter.Add(1)
6✔
68

6✔
69
        b.consumerQueues[qid] = append(b.consumerQueues[qid], consumers...)
6✔
70
        clog.Infof("Registered queue=%d with %d blockbeat consumers", qid,
6✔
71
                len(consumers))
6✔
72

6✔
73
        for _, c := range consumers {
13✔
74
                clog.Debugf("Consumer [%s] registered in queue %d", c.Name(),
7✔
75
                        qid)
7✔
76
        }
7✔
77
}
78

79
// Start starts the blockbeat dispatcher - it registers a block notification
80
// and monitors and dispatches new blocks in a goroutine. It will refuse to
81
// start if there are no registered consumers.
82
func (b *BlockbeatDispatcher) Start() error {
3✔
83
        // Make sure consumers are registered.
3✔
84
        if len(b.consumerQueues) == 0 {
4✔
85
                return fmt.Errorf("no consumers registered")
1✔
86
        }
1✔
87

88
        // Start listening to new block epochs. We should get a notification
89
        // with the current best block immediately.
90
        blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil)
2✔
91
        if err != nil {
3✔
92
                return fmt.Errorf("register block epoch ntfn: %w", err)
1✔
93
        }
1✔
94

95
        clog.Infof("BlockbeatDispatcher is starting with %d consumer queues",
1✔
96
                len(b.consumerQueues))
1✔
97
        defer clog.Debug("BlockbeatDispatcher started")
1✔
98

1✔
99
        b.wg.Add(1)
1✔
100
        go b.dispatchBlocks(blockEpochs)
1✔
101

1✔
102
        return nil
1✔
103
}
104

105
// Stop shuts down the blockbeat dispatcher.
106
func (b *BlockbeatDispatcher) Stop() {
1✔
107
        clog.Info("BlockbeatDispatcher is stopping")
1✔
108
        defer clog.Debug("BlockbeatDispatcher stopped")
1✔
109

1✔
110
        // Signal the dispatchBlocks goroutine to stop.
1✔
111
        close(b.quit)
1✔
112
        b.wg.Wait()
1✔
113
}
1✔
114

115
func (b *BlockbeatDispatcher) log() btclog.Logger {
9✔
116
        return b.beat.logger()
9✔
117
}
9✔
118

119
// dispatchBlocks listens to new block epoch and dispatches it to all the
120
// consumers. Each queue is notified concurrently, and the consumers in the
121
// same queue are notified sequentially.
122
//
123
// NOTE: Must be run as a goroutine.
124
func (b *BlockbeatDispatcher) dispatchBlocks(
125
        blockEpochs *chainntnfs.BlockEpochEvent) {
2✔
126

2✔
127
        defer b.wg.Done()
2✔
128
        defer blockEpochs.Cancel()
2✔
129

2✔
130
        for {
5✔
131
                select {
3✔
132
                case blockEpoch, ok := <-blockEpochs.Epochs:
1✔
133
                        if !ok {
1✔
NEW
134
                                clog.Debugf("Block epoch channel closed")
×
NEW
135

×
NEW
136
                                return
×
NEW
137
                        }
×
138

139
                        clog.Infof("Received new block %v at height %d, "+
1✔
140
                                "notifying consumers...", blockEpoch.Hash,
1✔
141
                                blockEpoch.Height)
1✔
142

1✔
143
                        // Record the time it takes the consumer to process
1✔
144
                        // this block.
1✔
145
                        start := time.Now()
1✔
146

1✔
147
                        // Update the current block epoch.
1✔
148
                        b.beat = NewBeat(*blockEpoch)
1✔
149

1✔
150
                        // Notify all consumers.
1✔
151
                        err := b.notifyQueues()
1✔
152
                        if err != nil {
1✔
NEW
153
                                b.log().Errorf("Notify block failed: %v", err)
×
NEW
154
                        }
×
155

156
                        b.log().Infof("Notified all consumers on new block "+
1✔
157
                                "in %v", time.Since(start))
1✔
158

159
                case <-b.quit:
1✔
160
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
1✔
161
                                "received")
1✔
162

1✔
163
                        return
1✔
164
                }
165
        }
166
}
167

168
// notifyQueues notifies each queue concurrently about the latest block epoch.
169
func (b *BlockbeatDispatcher) notifyQueues() error {
3✔
170
        // errChans is a map of channels that will be used to receive errors
3✔
171
        // returned from notifying the consumers.
3✔
172
        errChans := make(map[uint32]chan error, len(b.consumerQueues))
3✔
173

3✔
174
        // Notify each queue in goroutines.
3✔
175
        for qid, consumers := range b.consumerQueues {
7✔
176
                b.log().Debugf("Notifying queue=%d with %d consumers", qid,
4✔
177
                        len(consumers))
4✔
178

4✔
179
                // Create a signal chan.
4✔
180
                errChan := make(chan error, 1)
4✔
181
                errChans[qid] = errChan
4✔
182

4✔
183
                // Notify each queue concurrently.
4✔
184
                go func(qid uint32, c []Consumer, beat Blockbeat) {
8✔
185
                        // Notify each consumer in this queue sequentially.
4✔
186
                        errChan <- DispatchSequential(beat, c)
4✔
187
                }(qid, consumers, b.beat)
4✔
188
        }
189

190
        // Wait for all consumers in each queue to finish.
191
        for qid, errChan := range errChans {
7✔
192
                select {
4✔
193
                case err := <-errChan:
4✔
194
                        if err != nil {
5✔
195
                                return fmt.Errorf("queue=%d got err: %w", qid,
1✔
196
                                        err)
1✔
197
                        }
1✔
198

199
                        b.log().Debugf("Notified queue=%d", qid)
3✔
200

NEW
201
                case <-b.quit:
×
NEW
202
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
×
NEW
203
                                "received, exit notifyQueues")
×
NEW
204

×
NEW
205
                        return nil
×
206
                }
207
        }
208

209
        return nil
2✔
210
}
211

212
// DispatchSequential takes a list of consumers and notify them about the new
213
// epoch sequentially. It requires the consumer to finish processing the block
214
// within the specified time, otherwise a timeout error is returned.
215
func DispatchSequential(b Blockbeat, consumers []Consumer) error {
5✔
216
        for _, c := range consumers {
12✔
217
                // Send the beat to the consumer.
7✔
218
                err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
7✔
219
                if err != nil {
8✔
220
                        b.logger().Errorf("Failed to process block: %v", err)
1✔
221

1✔
222
                        return err
1✔
223
                }
1✔
224
        }
225

226
        return nil
4✔
227
}
228

229
// DispatchConcurrent notifies each consumer concurrently about the blockbeat.
230
// It requires the consumer to finish processing the block within the specified
231
// time, otherwise a timeout error is returned.
NEW
232
func DispatchConcurrent(b Blockbeat, consumers []Consumer) error {
×
NEW
233
        eg := &errgroup.Group{}
×
NEW
234

×
NEW
235
        // Notify each queue in goroutines.
×
NEW
236
        for _, c := range consumers {
×
NEW
237
                // Notify each consumer concurrently.
×
NEW
238
                eg.Go(func() error {
×
NEW
239
                        // Send the beat to the consumer.
×
NEW
240
                        err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
×
NEW
241

×
NEW
242
                        // Exit early if there's no error.
×
NEW
243
                        if err == nil {
×
NEW
244
                                return nil
×
NEW
245
                        }
×
246

NEW
247
                        b.logger().Errorf("Consumer=%v failed to process "+
×
NEW
248
                                "block: %v", c.Name(), err)
×
NEW
249

×
NEW
250
                        return err
×
251
                })
252
        }
253

254
        // Wait for all consumers in each queue to finish.
NEW
255
        if err := eg.Wait(); err != nil {
×
NEW
256
                return err
×
NEW
257
        }
×
258

NEW
259
        return nil
×
260
}
261

262
// notifyAndWait sends the blockbeat to the specified consumer. It requires the
263
// consumer to finish processing the block within the specified time, otherwise
264
// a timeout error is returned.
265
func notifyAndWait(b Blockbeat, c Consumer, timeout time.Duration) error {
10✔
266
        b.logger().Debugf("Waiting for consumer[%s] to process it", c.Name())
10✔
267

10✔
268
        // Record the time it takes the consumer to process this block.
10✔
269
        start := time.Now()
10✔
270

10✔
271
        errChan := make(chan error, 1)
10✔
272
        go func() {
20✔
273
                errChan <- c.ProcessBlock(b)
10✔
274
        }()
10✔
275

276
        // We expect the consumer to finish processing this block under 30s,
277
        // otherwise a timeout error is returned.
278
        select {
10✔
279
        case err := <-errChan:
9✔
280
                if err == nil {
16✔
281
                        break
7✔
282
                }
283

284
                return fmt.Errorf("%s got err in ProcessBlock: %w", c.Name(),
2✔
285
                        err)
2✔
286

287
        case <-time.After(timeout):
1✔
288
                return fmt.Errorf("consumer %s: %w", c.Name(),
1✔
289
                        ErrProcessBlockTimeout)
1✔
290
        }
291

292
        b.logger().Debugf("Consumer[%s] processed block in %v", c.Name(),
7✔
293
                time.Since(start))
7✔
294

7✔
295
        return nil
7✔
296
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc