• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 18016273007

25 Sep 2025 05:55PM UTC coverage: 54.653% (-12.0%) from 66.622%
18016273007

Pull #10248

github

web-flow
Merge 128443298 into b09b20c69
Pull Request #10248: Enforce TLV when creating a Route

25 of 30 new or added lines in 4 files covered. (83.33%)

23906 existing lines in 281 files now uncovered.

109536 of 200421 relevant lines covered (54.65%)

21816.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.64
/chainio/dispatcher.go
1
package chainio
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/chainntnfs"
12
        "github.com/lightningnetwork/lnd/lnutils"
13
        "golang.org/x/sync/errgroup"
14
)
15

16
// Timeout and error values shared by the blockbeat dispatch helpers.
var (
	// DefaultProcessBlockTimeout bounds how long the dispatcher waits for a
	// single consumer to finish processing a new block epoch.
	DefaultProcessBlockTimeout = 60 * time.Second

	// ErrProcessBlockTimeout is returned when a consumer does not finish
	// processing a block before the timeout expires.
	ErrProcessBlockTimeout = errors.New("process block timeout")
)
23

24
// BlockbeatDispatcher is a service that handles dispatching new blocks to
25
// `lnd`'s subsystems. During startup, subsystems that are block-driven should
26
// implement the `Consumer` interface and register themselves via
27
// `RegisterQueue`. When two subsystems are independent of each other, they
28
// should be registered in different queues so blocks are notified concurrently.
29
// Otherwise, when living in the same queue, the subsystems are notified of the
30
// new blocks sequentially, which means it's critical to understand the
31
// relationship of these systems to properly handle the order.
32
type BlockbeatDispatcher struct {
33
        wg sync.WaitGroup
34

35
        // notifier is used to receive new block epochs.
36
        notifier chainntnfs.ChainNotifier
37

38
        // beat is the latest blockbeat received.
39
        beat Blockbeat
40

41
        // consumerQueues is a map of consumers that will receive blocks. Its
42
        // key is a unique counter and its value is a queue of consumers. Each
43
        // queue is notified concurrently, and consumers in the same queue is
44
        // notified sequentially.
45
        consumerQueues map[uint32][]Consumer
46

47
        // counter is used to assign a unique id to each queue.
48
        counter atomic.Uint32
49

50
        // quit is used to signal the BlockbeatDispatcher to stop.
51
        quit chan struct{}
52

53
        // queryHeightChan is used to receive queries on the current height of
54
        // the dispatcher.
55
        queryHeightChan chan *query
56
}
57

58
// query is a request for the internal state of the dispatcher.
type query struct {
	// respChan carries the current height back to the caller.
	//
	// NOTE: This channel must be buffered.
	respChan chan int32
}
65

66
// newQuery creates a query to be used to fetch the internal state of the
67
// dispatcher.
68
func newQuery() *query {
2✔
69
        return &query{
2✔
70
                respChan: make(chan int32, 1),
2✔
71
        }
2✔
72
}
2✔
73

74
// NewBlockbeatDispatcher returns a new blockbeat dispatcher instance.
75
func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher {
6✔
76
        return &BlockbeatDispatcher{
6✔
77
                notifier:        n,
6✔
78
                quit:            make(chan struct{}),
6✔
79
                consumerQueues:  make(map[uint32][]Consumer),
6✔
80
                queryHeightChan: make(chan *query, 1),
6✔
81
        }
6✔
82
}
6✔
83

84
// RegisterQueue takes a list of consumers and registers them in the same
85
// queue.
86
//
87
// NOTE: these consumers are notified sequentially.
88
func (b *BlockbeatDispatcher) RegisterQueue(consumers []Consumer) {
7✔
89
        qid := b.counter.Add(1)
7✔
90

7✔
91
        b.consumerQueues[qid] = append(b.consumerQueues[qid], consumers...)
7✔
92
        clog.Infof("Registered queue=%d with %d blockbeat consumers", qid,
7✔
93
                len(consumers))
7✔
94

7✔
95
        for _, c := range consumers {
15✔
96
                clog.Debugf("Consumer [%s] registered in queue %d", c.Name(),
8✔
97
                        qid)
8✔
98
        }
8✔
99
}
100

101
// Start starts the blockbeat dispatcher - it registers a block notification
102
// and monitors and dispatches new blocks in a goroutine. It will refuse to
103
// start if there are no registered consumers.
104
func (b *BlockbeatDispatcher) Start() error {
4✔
105
        // Make sure consumers are registered.
4✔
106
        if len(b.consumerQueues) == 0 {
5✔
107
                return fmt.Errorf("no consumers registered")
1✔
108
        }
1✔
109

110
        // Start listening to new block epochs. We should get a notification
111
        // with the current best block immediately.
112
        blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil)
3✔
113
        if err != nil {
4✔
114
                return fmt.Errorf("register block epoch ntfn: %w", err)
1✔
115
        }
1✔
116

117
        clog.Infof("BlockbeatDispatcher is starting with %d consumer queues",
2✔
118
                len(b.consumerQueues))
2✔
119
        defer clog.Debug("BlockbeatDispatcher started")
2✔
120

2✔
121
        b.wg.Add(1)
2✔
122
        go b.dispatchBlocks(blockEpochs)
2✔
123

2✔
124
        return nil
2✔
125
}
126

127
// Stop shuts down the blockbeat dispatcher.
128
func (b *BlockbeatDispatcher) Stop() {
2✔
129
        clog.Info("BlockbeatDispatcher is stopping")
2✔
130
        defer clog.Debug("BlockbeatDispatcher stopped")
2✔
131

2✔
132
        // Signal the dispatchBlocks goroutine to stop.
2✔
133
        close(b.quit)
2✔
134
        b.wg.Wait()
2✔
135
}
2✔
136

137
func (b *BlockbeatDispatcher) log() btclog.Logger {
10✔
138
        // There's no guarantee that the `b.beat` is initialized when the
10✔
139
        // dispatcher shuts down, especially in the case where the node is
10✔
140
        // running as a remote signer, which doesn't have a chainbackend. In
10✔
141
        // that case we will use the package logger.
10✔
142
        if b.beat == nil {
10✔
143
                return clog
×
144
        }
×
145

146
        return b.beat.logger()
10✔
147
}
148

149
// dispatchBlocks listens to new block epoch and dispatches it to all the
150
// consumers. Each queue is notified concurrently, and the consumers in the
151
// same queue are notified sequentially.
152
//
153
// NOTE: Must be run as a goroutine.
154
func (b *BlockbeatDispatcher) dispatchBlocks(
155
        blockEpochs *chainntnfs.BlockEpochEvent) {
3✔
156

3✔
157
        defer b.wg.Done()
3✔
158
        defer blockEpochs.Cancel()
3✔
159

3✔
160
        for {
8✔
161
                select {
5✔
162
                case blockEpoch, ok := <-blockEpochs.Epochs:
1✔
163
                        if !ok {
1✔
164
                                clog.Debugf("Block epoch channel closed")
×
165

×
166
                                return
×
167
                        }
×
168

169
                        // Log a separator so it's easier to identify when a
170
                        // new block arrives for subsystems.
171
                        clog.Debugf("%v", lnutils.NewSeparatorClosure())
1✔
172

1✔
173
                        clog.Debugf("Received new block %v at height %d, "+
1✔
174
                                "notifying consumers...", blockEpoch.Hash,
1✔
175
                                blockEpoch.Height)
1✔
176

1✔
177
                        // Record the time it takes the consumer to process
1✔
178
                        // this block.
1✔
179
                        start := time.Now()
1✔
180

1✔
181
                        // Update the current block epoch.
1✔
182
                        b.beat = NewBeat(*blockEpoch)
1✔
183

1✔
184
                        // Notify all consumers.
1✔
185
                        err := b.notifyQueues()
1✔
186
                        if err != nil {
1✔
187
                                b.log().Errorf("Notify block failed: %v", err)
×
188
                        }
×
189

190
                        b.log().Debugf("Notified all consumers on new block "+
1✔
191
                                "in %v", time.Since(start))
1✔
192

193
                // A query has been made to fetch the current height, we now
194
                // send the height from its current beat.
195
                case query := <-b.queryHeightChan:
1✔
196
                        // The beat may not be set yet, e.g., during the startup
1✔
197
                        // the query is made before the block epoch being sent.
1✔
198
                        height := int32(0)
1✔
199
                        if b.beat != nil {
2✔
200
                                height = b.beat.Height()
1✔
201
                        }
1✔
202

203
                        query.respChan <- height
1✔
204

205
                case <-b.quit:
2✔
206
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
2✔
207
                                "received")
2✔
208

2✔
209
                        return
2✔
210
                }
211
        }
212
}
213

214
// CurrentHeight returns the current best height known to the dispatcher. 0 is
215
// returned if the dispatcher is shutting down.
216
func (b *BlockbeatDispatcher) CurrentHeight() int32 {
2✔
217
        query := newQuery()
2✔
218

2✔
219
        select {
2✔
220
        case b.queryHeightChan <- query:
1✔
221

222
        case <-b.quit:
1✔
223
                clog.Debugf("BlockbeatDispatcher quit before query")
1✔
224
                return 0
1✔
225
        }
226

227
        select {
1✔
228
        case height := <-query.respChan:
1✔
229
                clog.Debugf("Responded current height: %v", height)
1✔
230
                return height
1✔
231

UNCOV
232
        case <-b.quit:
×
UNCOV
233
                clog.Debugf("BlockbeatDispatcher quit before response")
×
UNCOV
234
                return 0
×
235
        }
236
}
237

238
// notifyQueues notifies each queue concurrently about the latest block epoch.
239
func (b *BlockbeatDispatcher) notifyQueues() error {
3✔
240
        // errChans is a map of channels that will be used to receive errors
3✔
241
        // returned from notifying the consumers.
3✔
242
        errChans := make(map[uint32]chan error, len(b.consumerQueues))
3✔
243

3✔
244
        // Notify each queue in goroutines.
3✔
245
        for qid, consumers := range b.consumerQueues {
7✔
246
                b.log().Debugf("Notifying queue=%d with %d consumers", qid,
4✔
247
                        len(consumers))
4✔
248

4✔
249
                // Create a signal chan.
4✔
250
                errChan := make(chan error, 1)
4✔
251
                errChans[qid] = errChan
4✔
252

4✔
253
                // Notify each queue concurrently.
4✔
254
                go func(qid uint32, c []Consumer, beat Blockbeat) {
8✔
255
                        // Notify each consumer in this queue sequentially.
4✔
256
                        errChan <- DispatchSequential(beat, c)
4✔
257
                }(qid, consumers, b.beat)
4✔
258
        }
259

260
        // Wait for all consumers in each queue to finish.
261
        for qid, errChan := range errChans {
7✔
262
                select {
4✔
263
                case err := <-errChan:
4✔
264
                        if err != nil {
5✔
265
                                return fmt.Errorf("queue=%d got err: %w", qid,
1✔
266
                                        err)
1✔
267
                        }
1✔
268

269
                        b.log().Debugf("Notified queue=%d", qid)
3✔
270

271
                case <-b.quit:
×
272
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
×
273
                                "received, exit notifyQueues")
×
274

×
275
                        return nil
×
276
                }
277
        }
278

279
        return nil
2✔
280
}
281

282
// DispatchSequential takes a list of consumers and notify them about the new
283
// epoch sequentially. It requires the consumer to finish processing the block
284
// within the specified time, otherwise a timeout error is returned.
285
func DispatchSequential(b Blockbeat, consumers []Consumer) error {
5✔
286
        for _, c := range consumers {
12✔
287
                // Send the beat to the consumer.
7✔
288
                err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
7✔
289
                if err != nil {
8✔
290
                        b.logger().Errorf("Failed to process block: %v", err)
1✔
291

1✔
292
                        return err
1✔
293
                }
1✔
294
        }
295

296
        return nil
4✔
297
}
298

299
// DispatchConcurrent notifies each consumer concurrently about the blockbeat.
300
// It requires the consumer to finish processing the block within the specified
301
// time, otherwise a timeout error is returned.
UNCOV
302
func DispatchConcurrent(b Blockbeat, consumers []Consumer) error {
×
UNCOV
303
        eg := &errgroup.Group{}
×
UNCOV
304

×
UNCOV
305
        // Notify each queue in goroutines.
×
UNCOV
306
        for _, c := range consumers {
×
UNCOV
307
                // Notify each consumer concurrently.
×
UNCOV
308
                eg.Go(func() error {
×
UNCOV
309
                        // Send the beat to the consumer.
×
UNCOV
310
                        err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
×
UNCOV
311

×
UNCOV
312
                        // Exit early if there's no error.
×
UNCOV
313
                        if err == nil {
×
UNCOV
314
                                return nil
×
UNCOV
315
                        }
×
316

317
                        b.logger().Errorf("Consumer=%v failed to process "+
×
318
                                "block: %v", c.Name(), err)
×
319

×
320
                        return err
×
321
                })
322
        }
323

324
        // Wait for all consumers in each queue to finish.
UNCOV
325
        if err := eg.Wait(); err != nil {
×
326
                return err
×
327
        }
×
328

UNCOV
329
        return nil
×
330
}
331

332
// notifyAndWait sends the blockbeat to the specified consumer. It requires the
333
// consumer to finish processing the block within the specified time, otherwise
334
// a timeout error is returned.
335
func notifyAndWait(b Blockbeat, c Consumer, timeout time.Duration) error {
10✔
336
        b.logger().Debugf("Waiting for consumer[%s] to process it", c.Name())
10✔
337

10✔
338
        // Record the time it takes the consumer to process this block.
10✔
339
        start := time.Now()
10✔
340

10✔
341
        errChan := make(chan error, 1)
10✔
342
        go func() {
20✔
343
                errChan <- c.ProcessBlock(b)
10✔
344
        }()
10✔
345

346
        // We expect the consumer to finish processing this block under 30s,
347
        // otherwise a timeout error is returned.
348
        select {
10✔
349
        case err := <-errChan:
9✔
350
                if err == nil {
16✔
351
                        break
7✔
352
                }
353

354
                return fmt.Errorf("%s got err in ProcessBlock: %w", c.Name(),
2✔
355
                        err)
2✔
356

357
        case <-time.After(timeout):
1✔
358
                return fmt.Errorf("consumer %s: %w", c.Name(),
1✔
359
                        ErrProcessBlockTimeout)
1✔
360
        }
361

362
        b.logger().Debugf("Consumer[%s] processed block in %v", c.Name(),
7✔
363
                time.Since(start))
7✔
364

7✔
365
        return nil
7✔
366
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc