• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13236757158

10 Feb 2025 08:39AM UTC coverage: 57.649% (-1.2%) from 58.815%
13236757158

Pull #9493

github

ziggie1984
lncli: for some cmds we don't replace the data of the response.

For some cmds it is not very practical to replace the json output
because we might pipe it into other commands. For example when
creating the route we want to pipe it into sendtoRoute.
Pull Request #9493: For some lncli cmds we should not replace the content with other data

0 of 9 new or added lines in 2 files covered. (0.0%)

19535 existing lines in 252 files now uncovered.

103517 of 179563 relevant lines covered (57.65%)

24878.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.81
/chainio/dispatcher.go
1
package chainio
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/chainntnfs"
12
        "github.com/lightningnetwork/lnd/lnutils"
13
        "golang.org/x/sync/errgroup"
14
)
15

16
// DefaultProcessBlockTimeout is the timeout value used when waiting for one
17
// consumer to finish processing the new block epoch.
18
var DefaultProcessBlockTimeout = 60 * time.Second
19

20
// ErrProcessBlockTimeout is the error returned when a consumer takes too long
21
// to process the block.
22
var ErrProcessBlockTimeout = errors.New("process block timeout")
23

24
// BlockbeatDispatcher is a service that handles dispatching new blocks to
25
// `lnd`'s subsystems. During startup, subsystems that are block-driven should
26
// implement the `Consumer` interface and register themselves via
27
// `RegisterQueue`. When two subsystems are independent of each other, they
28
// should be registered in different queues so blocks are notified concurrently.
29
// Otherwise, when living in the same queue, the subsystems are notified of the
30
// new blocks sequentially, which means it's critical to understand the
31
// relationship of these systems to properly handle the order.
32
type BlockbeatDispatcher struct {
33
        wg sync.WaitGroup
34

35
        // notifier is used to receive new block epochs.
36
        notifier chainntnfs.ChainNotifier
37

38
        // beat is the latest blockbeat received.
39
        beat Blockbeat
40

41
        // consumerQueues is a map of consumers that will receive blocks. Its
42
        // key is a unique counter and its value is a queue of consumers. Each
43
        // queue is notified concurrently, and consumers in the same queue is
44
        // notified sequentially.
45
        consumerQueues map[uint32][]Consumer
46

47
        // counter is used to assign a unique id to each queue.
48
        counter atomic.Uint32
49

50
        // quit is used to signal the BlockbeatDispatcher to stop.
51
        quit chan struct{}
52
}
53

54
// NewBlockbeatDispatcher returns a new blockbeat dispatcher instance.
55
func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher {
5✔
56
        return &BlockbeatDispatcher{
5✔
57
                notifier:       n,
5✔
58
                quit:           make(chan struct{}),
5✔
59
                consumerQueues: make(map[uint32][]Consumer),
5✔
60
        }
5✔
61
}
5✔
62

63
// RegisterQueue takes a list of consumers and registers them in the same
64
// queue.
65
//
66
// NOTE: these consumers are notified sequentially.
67
func (b *BlockbeatDispatcher) RegisterQueue(consumers []Consumer) {
6✔
68
        qid := b.counter.Add(1)
6✔
69

6✔
70
        b.consumerQueues[qid] = append(b.consumerQueues[qid], consumers...)
6✔
71
        clog.Infof("Registered queue=%d with %d blockbeat consumers", qid,
6✔
72
                len(consumers))
6✔
73

6✔
74
        for _, c := range consumers {
13✔
75
                clog.Debugf("Consumer [%s] registered in queue %d", c.Name(),
7✔
76
                        qid)
7✔
77
        }
7✔
78
}
79

80
// Start starts the blockbeat dispatcher - it registers a block notification
81
// and monitors and dispatches new blocks in a goroutine. It will refuse to
82
// start if there are no registered consumers.
83
func (b *BlockbeatDispatcher) Start() error {
3✔
84
        // Make sure consumers are registered.
3✔
85
        if len(b.consumerQueues) == 0 {
4✔
86
                return fmt.Errorf("no consumers registered")
1✔
87
        }
1✔
88

89
        // Start listening to new block epochs. We should get a notification
90
        // with the current best block immediately.
91
        blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil)
2✔
92
        if err != nil {
3✔
93
                return fmt.Errorf("register block epoch ntfn: %w", err)
1✔
94
        }
1✔
95

96
        clog.Infof("BlockbeatDispatcher is starting with %d consumer queues",
1✔
97
                len(b.consumerQueues))
1✔
98
        defer clog.Debug("BlockbeatDispatcher started")
1✔
99

1✔
100
        b.wg.Add(1)
1✔
101
        go b.dispatchBlocks(blockEpochs)
1✔
102

1✔
103
        return nil
1✔
104
}
105

106
// Stop shuts down the blockbeat dispatcher.
107
func (b *BlockbeatDispatcher) Stop() {
1✔
108
        clog.Info("BlockbeatDispatcher is stopping")
1✔
109
        defer clog.Debug("BlockbeatDispatcher stopped")
1✔
110

1✔
111
        // Signal the dispatchBlocks goroutine to stop.
1✔
112
        close(b.quit)
1✔
113
        b.wg.Wait()
1✔
114
}
1✔
115

116
func (b *BlockbeatDispatcher) log() btclog.Logger {
9✔
117
        return b.beat.logger()
9✔
118
}
9✔
119

120
// dispatchBlocks listens to new block epoch and dispatches it to all the
121
// consumers. Each queue is notified concurrently, and the consumers in the
122
// same queue are notified sequentially.
123
//
124
// NOTE: Must be run as a goroutine.
125
func (b *BlockbeatDispatcher) dispatchBlocks(
126
        blockEpochs *chainntnfs.BlockEpochEvent) {
2✔
127

2✔
128
        defer b.wg.Done()
2✔
129
        defer blockEpochs.Cancel()
2✔
130

2✔
131
        for {
5✔
132
                select {
3✔
133
                case blockEpoch, ok := <-blockEpochs.Epochs:
1✔
134
                        if !ok {
1✔
135
                                clog.Debugf("Block epoch channel closed")
×
136

×
137
                                return
×
138
                        }
×
139

140
                        // Log a separator so it's easier to identify when a
141
                        // new block arrives for subsystems.
142
                        clog.Debugf("%v", lnutils.NewSeparatorClosure())
1✔
143

1✔
144
                        clog.Infof("Received new block %v at height %d, "+
1✔
145
                                "notifying consumers...", blockEpoch.Hash,
1✔
146
                                blockEpoch.Height)
1✔
147

1✔
148
                        // Record the time it takes the consumer to process
1✔
149
                        // this block.
1✔
150
                        start := time.Now()
1✔
151

1✔
152
                        // Update the current block epoch.
1✔
153
                        b.beat = NewBeat(*blockEpoch)
1✔
154

1✔
155
                        // Notify all consumers.
1✔
156
                        err := b.notifyQueues()
1✔
157
                        if err != nil {
1✔
158
                                b.log().Errorf("Notify block failed: %v", err)
×
159
                        }
×
160

161
                        b.log().Infof("Notified all consumers on new block "+
1✔
162
                                "in %v", time.Since(start))
1✔
163

164
                case <-b.quit:
1✔
165
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
1✔
166
                                "received")
1✔
167

1✔
168
                        return
1✔
169
                }
170
        }
171
}
172

173
// notifyQueues notifies each queue concurrently about the latest block epoch.
174
func (b *BlockbeatDispatcher) notifyQueues() error {
3✔
175
        // errChans is a map of channels that will be used to receive errors
3✔
176
        // returned from notifying the consumers.
3✔
177
        errChans := make(map[uint32]chan error, len(b.consumerQueues))
3✔
178

3✔
179
        // Notify each queue in goroutines.
3✔
180
        for qid, consumers := range b.consumerQueues {
7✔
181
                b.log().Debugf("Notifying queue=%d with %d consumers", qid,
4✔
182
                        len(consumers))
4✔
183

4✔
184
                // Create a signal chan.
4✔
185
                errChan := make(chan error, 1)
4✔
186
                errChans[qid] = errChan
4✔
187

4✔
188
                // Notify each queue concurrently.
4✔
189
                go func(qid uint32, c []Consumer, beat Blockbeat) {
8✔
190
                        // Notify each consumer in this queue sequentially.
4✔
191
                        errChan <- DispatchSequential(beat, c)
4✔
192
                }(qid, consumers, b.beat)
4✔
193
        }
194

195
        // Wait for all consumers in each queue to finish.
196
        for qid, errChan := range errChans {
7✔
197
                select {
4✔
198
                case err := <-errChan:
4✔
199
                        if err != nil {
5✔
200
                                return fmt.Errorf("queue=%d got err: %w", qid,
1✔
201
                                        err)
1✔
202
                        }
1✔
203

204
                        b.log().Debugf("Notified queue=%d", qid)
3✔
205

UNCOV
206
                case <-b.quit:
×
UNCOV
207
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
×
UNCOV
208
                                "received, exit notifyQueues")
×
UNCOV
209

×
UNCOV
210
                        return nil
×
211
                }
212
        }
213

214
        return nil
2✔
215
}
216

217
// DispatchSequential takes a list of consumers and notify them about the new
218
// epoch sequentially. It requires the consumer to finish processing the block
219
// within the specified time, otherwise a timeout error is returned.
220
func DispatchSequential(b Blockbeat, consumers []Consumer) error {
5✔
221
        for _, c := range consumers {
12✔
222
                // Send the beat to the consumer.
7✔
223
                err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
7✔
224
                if err != nil {
8✔
225
                        b.logger().Errorf("Failed to process block: %v", err)
1✔
226

1✔
227
                        return err
1✔
228
                }
1✔
229
        }
230

231
        return nil
4✔
232
}
233

234
// DispatchConcurrent notifies each consumer concurrently about the blockbeat.
235
// It requires the consumer to finish processing the block within the specified
236
// time, otherwise a timeout error is returned.
UNCOV
237
func DispatchConcurrent(b Blockbeat, consumers []Consumer) error {
×
UNCOV
238
        eg := &errgroup.Group{}
×
UNCOV
239

×
UNCOV
240
        // Notify each queue in goroutines.
×
UNCOV
241
        for _, c := range consumers {
×
UNCOV
242
                // Notify each consumer concurrently.
×
UNCOV
243
                eg.Go(func() error {
×
UNCOV
244
                        // Send the beat to the consumer.
×
UNCOV
245
                        err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
×
UNCOV
246

×
UNCOV
247
                        // Exit early if there's no error.
×
UNCOV
248
                        if err == nil {
×
UNCOV
249
                                return nil
×
UNCOV
250
                        }
×
251

252
                        b.logger().Errorf("Consumer=%v failed to process "+
×
253
                                "block: %v", c.Name(), err)
×
254

×
255
                        return err
×
256
                })
257
        }
258

259
        // Wait for all consumers in each queue to finish.
UNCOV
260
        if err := eg.Wait(); err != nil {
×
261
                return err
×
262
        }
×
263

UNCOV
264
        return nil
×
265
}
266

267
// notifyAndWait sends the blockbeat to the specified consumer. It requires the
268
// consumer to finish processing the block within the specified time, otherwise
269
// a timeout error is returned.
270
func notifyAndWait(b Blockbeat, c Consumer, timeout time.Duration) error {
10✔
271
        b.logger().Debugf("Waiting for consumer[%s] to process it", c.Name())
10✔
272

10✔
273
        // Record the time it takes the consumer to process this block.
10✔
274
        start := time.Now()
10✔
275

10✔
276
        errChan := make(chan error, 1)
10✔
277
        go func() {
20✔
278
                errChan <- c.ProcessBlock(b)
10✔
279
        }()
10✔
280

281
        // We expect the consumer to finish processing this block under 30s,
282
        // otherwise a timeout error is returned.
283
        select {
10✔
284
        case err := <-errChan:
9✔
285
                if err == nil {
16✔
286
                        break
7✔
287
                }
288

289
                return fmt.Errorf("%s got err in ProcessBlock: %w", c.Name(),
2✔
290
                        err)
2✔
291

292
        case <-time.After(timeout):
1✔
293
                return fmt.Errorf("consumer %s: %w", c.Name(),
1✔
294
                        ErrProcessBlockTimeout)
1✔
295
        }
296

297
        b.logger().Debugf("Consumer[%s] processed block in %v", c.Name(),
7✔
298
                time.Since(start))
7✔
299

7✔
300
        return nil
7✔
301
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc