• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13567496470

27 Feb 2025 01:26PM UTC coverage: 58.757% (-0.1%) from 58.858%
13567496470

Pull #9555

github

ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]

40 of 54 new or added lines in 4 files covered. (74.07%)

307 existing lines in 27 files now uncovered.

136396 of 232137 relevant lines covered (58.76%)

19208.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.05
/chainio/dispatcher.go
1
package chainio
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/chainntnfs"
12
        "github.com/lightningnetwork/lnd/lnutils"
13
        "golang.org/x/sync/errgroup"
14
)
15

16
var (
	// DefaultProcessBlockTimeout bounds how long the dispatcher waits for
	// a single consumer to finish handling a new block epoch.
	DefaultProcessBlockTimeout = 60 * time.Second

	// ErrProcessBlockTimeout is reported when a consumer does not finish
	// processing a block before the timeout elapses.
	ErrProcessBlockTimeout = errors.New("process block timeout")
)
23

24
// BlockbeatDispatcher is a service that handles dispatching new blocks to
25
// `lnd`'s subsystems. During startup, subsystems that are block-driven should
26
// implement the `Consumer` interface and register themselves via
27
// `RegisterQueue`. When two subsystems are independent of each other, they
28
// should be registered in different queues so blocks are notified concurrently.
29
// Otherwise, when living in the same queue, the subsystems are notified of the
30
// new blocks sequentially, which means it's critical to understand the
31
// relationship of these systems to properly handle the order.
32
type BlockbeatDispatcher struct {
33
        wg sync.WaitGroup
34

35
        // notifier is used to receive new block epochs.
36
        notifier chainntnfs.ChainNotifier
37

38
        // beat is the latest blockbeat received.
39
        beat Blockbeat
40

41
        // consumerQueues is a map of consumers that will receive blocks. Its
42
        // key is a unique counter and its value is a queue of consumers. Each
43
        // queue is notified concurrently, and consumers in the same queue is
44
        // notified sequentially.
45
        consumerQueues map[uint32][]Consumer
46

47
        // counter is used to assign a unique id to each queue.
48
        counter atomic.Uint32
49

50
        // quit is used to signal the BlockbeatDispatcher to stop.
51
        quit chan struct{}
52

53
        // queryHeightChan is used to receive queries on the current height of
54
        // the dispatcher.
55
        queryHeightChan chan *query
56
}
57

58
// query is a request for a piece of the dispatcher's internal state.
type query struct {
	// respChan delivers the current height back to the caller.
	//
	// NOTE: This channel must be buffered.
	respChan chan int32
}
65

66
// newQuery creates a query to be used to fetch the internal state of the
67
// dispatcher.
68
func newQuery() *query {
4✔
69
        return &query{
4✔
70
                respChan: make(chan int32, 1),
4✔
71
        }
4✔
72
}
4✔
73

74
// NewBlockbeatDispatcher returns a new blockbeat dispatcher instance.
75
func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher {
8✔
76
        return &BlockbeatDispatcher{
8✔
77
                notifier:        n,
8✔
78
                quit:            make(chan struct{}),
8✔
79
                consumerQueues:  make(map[uint32][]Consumer),
8✔
80
                queryHeightChan: make(chan *query, 1),
8✔
81
        }
8✔
82
}
8✔
83

84
// RegisterQueue takes a list of consumers and registers them in the same
85
// queue.
86
//
87
// NOTE: these consumers are notified sequentially.
88
func (b *BlockbeatDispatcher) RegisterQueue(consumers []Consumer) {
9✔
89
        qid := b.counter.Add(1)
9✔
90

9✔
91
        b.consumerQueues[qid] = append(b.consumerQueues[qid], consumers...)
9✔
92
        clog.Infof("Registered queue=%d with %d blockbeat consumers", qid,
9✔
93
                len(consumers))
9✔
94

9✔
95
        for _, c := range consumers {
19✔
96
                clog.Debugf("Consumer [%s] registered in queue %d", c.Name(),
10✔
97
                        qid)
10✔
98
        }
10✔
99
}
100

101
// Start starts the blockbeat dispatcher - it registers a block notification
102
// and monitors and dispatches new blocks in a goroutine. It will refuse to
103
// start if there are no registered consumers.
104
func (b *BlockbeatDispatcher) Start() error {
6✔
105
        // Make sure consumers are registered.
6✔
106
        if len(b.consumerQueues) == 0 {
7✔
107
                return fmt.Errorf("no consumers registered")
1✔
108
        }
1✔
109

110
        // Start listening to new block epochs. We should get a notification
111
        // with the current best block immediately.
112
        blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil)
5✔
113
        if err != nil {
6✔
114
                return fmt.Errorf("register block epoch ntfn: %w", err)
1✔
115
        }
1✔
116

117
        clog.Infof("BlockbeatDispatcher is starting with %d consumer queues",
4✔
118
                len(b.consumerQueues))
4✔
119
        defer clog.Debug("BlockbeatDispatcher started")
4✔
120

4✔
121
        b.wg.Add(1)
4✔
122
        go b.dispatchBlocks(blockEpochs)
4✔
123

4✔
124
        return nil
4✔
125
}
126

127
// Stop shuts down the blockbeat dispatcher.
128
func (b *BlockbeatDispatcher) Stop() {
4✔
129
        clog.Info("BlockbeatDispatcher is stopping")
4✔
130
        defer clog.Debug("BlockbeatDispatcher stopped")
4✔
131

4✔
132
        // Signal the dispatchBlocks goroutine to stop.
4✔
133
        close(b.quit)
4✔
134
        b.wg.Wait()
4✔
135
}
4✔
136

137
func (b *BlockbeatDispatcher) log() btclog.Logger {
12✔
138
        return b.beat.logger()
12✔
139
}
12✔
140

141
// dispatchBlocks listens to new block epoch and dispatches it to all the
142
// consumers. Each queue is notified concurrently, and the consumers in the
143
// same queue are notified sequentially.
144
//
145
// NOTE: Must be run as a goroutine.
146
func (b *BlockbeatDispatcher) dispatchBlocks(
147
        blockEpochs *chainntnfs.BlockEpochEvent) {
5✔
148

5✔
149
        defer b.wg.Done()
5✔
150
        defer blockEpochs.Cancel()
5✔
151

5✔
152
        for {
12✔
153
                select {
7✔
154
                case blockEpoch, ok := <-blockEpochs.Epochs:
3✔
155
                        if !ok {
3✔
156
                                clog.Debugf("Block epoch channel closed")
×
157

×
158
                                return
×
159
                        }
×
160

161
                        // Log a separator so it's easier to identify when a
162
                        // new block arrives for subsystems.
163
                        clog.Debugf("%v", lnutils.NewSeparatorClosure())
3✔
164

3✔
165
                        clog.Infof("Received new block %v at height %d, "+
3✔
166
                                "notifying consumers...", blockEpoch.Hash,
3✔
167
                                blockEpoch.Height)
3✔
168

3✔
169
                        // Record the time it takes the consumer to process
3✔
170
                        // this block.
3✔
171
                        start := time.Now()
3✔
172

3✔
173
                        // Update the current block epoch.
3✔
174
                        b.beat = NewBeat(*blockEpoch)
3✔
175

3✔
176
                        // Notify all consumers.
3✔
177
                        err := b.notifyQueues()
3✔
178
                        if err != nil {
3✔
179
                                b.log().Errorf("Notify block failed: %v", err)
×
180
                        }
×
181

182
                        b.log().Infof("Notified all consumers on new block "+
3✔
183
                                "in %v", time.Since(start))
3✔
184

185
                // A query has been made to fetch the current height, we now
186
                // send the height from its current beat.
187
                case query := <-b.queryHeightChan:
3✔
188
                        // The beat may not be set yet, e.g., during the startup
3✔
189
                        // the query is made before the block epoch being sent.
3✔
190
                        height := int32(0)
3✔
191
                        if b.beat != nil {
6✔
192
                                height = b.beat.Height()
3✔
193
                        }
3✔
194

195
                        query.respChan <- height
3✔
196

197
                case <-b.quit:
4✔
198
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
4✔
199
                                "received")
4✔
200

4✔
201
                        return
4✔
202
                }
203
        }
204
}
205

206
// CurrentHeight returns the current best height known to the dispatcher. 0 is
207
// returned if the dispatcher is shutting down.
208
func (b *BlockbeatDispatcher) CurrentHeight() int32 {
4✔
209
        query := newQuery()
4✔
210

4✔
211
        select {
4✔
212
        case b.queryHeightChan <- query:
3✔
213

214
        case <-b.quit:
1✔
215
                clog.Debugf("BlockbeatDispatcher quit before query")
1✔
216
                return 0
1✔
217
        }
218

219
        select {
3✔
220
        case height := <-query.respChan:
3✔
221
                clog.Debugf("Responded current height: %v", height)
3✔
222
                return height
3✔
223

UNCOV
224
        case <-b.quit:
×
UNCOV
225
                clog.Debugf("BlockbeatDispatcher quit before response")
×
UNCOV
226
                return 0
×
227
        }
228
}
229

230
// notifyQueues notifies each queue concurrently about the latest block epoch.
231
func (b *BlockbeatDispatcher) notifyQueues() error {
5✔
232
        // errChans is a map of channels that will be used to receive errors
5✔
233
        // returned from notifying the consumers.
5✔
234
        errChans := make(map[uint32]chan error, len(b.consumerQueues))
5✔
235

5✔
236
        // Notify each queue in goroutines.
5✔
237
        for qid, consumers := range b.consumerQueues {
11✔
238
                b.log().Debugf("Notifying queue=%d with %d consumers", qid,
6✔
239
                        len(consumers))
6✔
240

6✔
241
                // Create a signal chan.
6✔
242
                errChan := make(chan error, 1)
6✔
243
                errChans[qid] = errChan
6✔
244

6✔
245
                // Notify each queue concurrently.
6✔
246
                go func(qid uint32, c []Consumer, beat Blockbeat) {
12✔
247
                        // Notify each consumer in this queue sequentially.
6✔
248
                        errChan <- DispatchSequential(beat, c)
6✔
249
                }(qid, consumers, b.beat)
6✔
250
        }
251

252
        // Wait for all consumers in each queue to finish.
253
        for qid, errChan := range errChans {
11✔
254
                select {
6✔
255
                case err := <-errChan:
6✔
256
                        if err != nil {
7✔
257
                                return fmt.Errorf("queue=%d got err: %w", qid,
1✔
258
                                        err)
1✔
259
                        }
1✔
260

261
                        b.log().Debugf("Notified queue=%d", qid)
5✔
262

263
                case <-b.quit:
×
264
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
×
265
                                "received, exit notifyQueues")
×
266

×
267
                        return nil
×
268
                }
269
        }
270

271
        return nil
4✔
272
}
273

274
// DispatchSequential takes a list of consumers and notify them about the new
275
// epoch sequentially. It requires the consumer to finish processing the block
276
// within the specified time, otherwise a timeout error is returned.
277
func DispatchSequential(b Blockbeat, consumers []Consumer) error {
7✔
278
        for _, c := range consumers {
16✔
279
                // Send the beat to the consumer.
9✔
280
                err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
9✔
281
                if err != nil {
10✔
282
                        b.logger().Errorf("Failed to process block: %v", err)
1✔
283

1✔
284
                        return err
1✔
285
                }
1✔
286
        }
287

288
        return nil
6✔
289
}
290

291
// DispatchConcurrent notifies each consumer concurrently about the blockbeat.
292
// It requires the consumer to finish processing the block within the specified
293
// time, otherwise a timeout error is returned.
294
func DispatchConcurrent(b Blockbeat, consumers []Consumer) error {
2✔
295
        eg := &errgroup.Group{}
2✔
296

2✔
297
        // Notify each queue in goroutines.
2✔
298
        for _, c := range consumers {
4✔
299
                // Notify each consumer concurrently.
2✔
300
                eg.Go(func() error {
4✔
301
                        // Send the beat to the consumer.
2✔
302
                        err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
2✔
303

2✔
304
                        // Exit early if there's no error.
2✔
305
                        if err == nil {
4✔
306
                                return nil
2✔
307
                        }
2✔
308

309
                        b.logger().Errorf("Consumer=%v failed to process "+
×
310
                                "block: %v", c.Name(), err)
×
311

×
312
                        return err
×
313
                })
314
        }
315

316
        // Wait for all consumers in each queue to finish.
317
        if err := eg.Wait(); err != nil {
2✔
318
                return err
×
319
        }
×
320

321
        return nil
2✔
322
}
323

324
// notifyAndWait sends the blockbeat to the specified consumer. It requires the
325
// consumer to finish processing the block within the specified time, otherwise
326
// a timeout error is returned.
327
func notifyAndWait(b Blockbeat, c Consumer, timeout time.Duration) error {
12✔
328
        b.logger().Debugf("Waiting for consumer[%s] to process it", c.Name())
12✔
329

12✔
330
        // Record the time it takes the consumer to process this block.
12✔
331
        start := time.Now()
12✔
332

12✔
333
        errChan := make(chan error, 1)
12✔
334
        go func() {
24✔
335
                errChan <- c.ProcessBlock(b)
12✔
336
        }()
12✔
337

338
        // We expect the consumer to finish processing this block under 30s,
339
        // otherwise a timeout error is returned.
340
        select {
12✔
341
        case err := <-errChan:
11✔
342
                if err == nil {
20✔
343
                        break
9✔
344
                }
345

346
                return fmt.Errorf("%s got err in ProcessBlock: %w", c.Name(),
2✔
347
                        err)
2✔
348

349
        case <-time.After(timeout):
1✔
350
                return fmt.Errorf("consumer %s: %w", c.Name(),
1✔
351
                        ErrProcessBlockTimeout)
1✔
352
        }
353

354
        b.logger().Debugf("Consumer[%s] processed block in %v", c.Name(),
9✔
355
                time.Since(start))
9✔
356

9✔
357
        return nil
9✔
358
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc