• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13035292482

29 Jan 2025 03:59PM UTC coverage: 49.3% (-9.5%) from 58.777%
13035292482

Pull #9456

github

mohamedawnallah
docs: update release-notes-0.19.0.md

In this commit, we warn users about the removal
of RPCs `SendToRoute`, `SendToRouteSync`, `SendPayment`,
and `SendPaymentSync` in the next release 0.20.
Pull Request #9456: lnrpc+docs: deprecate warning `SendToRoute`, `SendToRouteSync`, `SendPayment`, and `SendPaymentSync` in Release 0.19

100634 of 204126 relevant lines covered (49.3%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.72
/chainio/dispatcher.go
1
package chainio
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/chainntnfs"
12
        "github.com/lightningnetwork/lnd/lnutils"
13
        "golang.org/x/sync/errgroup"
14
)
15

16
// DefaultProcessBlockTimeout is the timeout value used when waiting for one
17
// consumer to finish processing the new block epoch.
18
var DefaultProcessBlockTimeout = 60 * time.Second
19

20
// ErrProcessBlockTimeout is the error returned when a consumer takes too long
21
// to process the block.
22
var ErrProcessBlockTimeout = errors.New("process block timeout")
23

24
// BlockbeatDispatcher is a service that handles dispatching new blocks to
25
// `lnd`'s subsystems. During startup, subsystems that are block-driven should
26
// implement the `Consumer` interface and register themselves via
27
// `RegisterQueue`. When two subsystems are independent of each other, they
28
// should be registered in different queues so blocks are notified concurrently.
29
// Otherwise, when living in the same queue, the subsystems are notified of the
30
// new blocks sequentially, which means it's critical to understand the
31
// relationship of these systems to properly handle the order.
32
type BlockbeatDispatcher struct {
33
        wg sync.WaitGroup
34

35
        // notifier is used to receive new block epochs.
36
        notifier chainntnfs.ChainNotifier
37

38
        // beat is the latest blockbeat received.
39
        beat Blockbeat
40

41
        // consumerQueues is a map of consumers that will receive blocks. Its
42
        // key is a unique counter and its value is a queue of consumers. Each
43
        // queue is notified concurrently, and consumers in the same queue is
44
        // notified sequentially.
45
        consumerQueues map[uint32][]Consumer
46

47
        // counter is used to assign a unique id to each queue.
48
        counter atomic.Uint32
49

50
        // quit is used to signal the BlockbeatDispatcher to stop.
51
        quit chan struct{}
52
}
53

54
// NewBlockbeatDispatcher returns a new blockbeat dispatcher instance.
55
func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher {
3✔
56
        return &BlockbeatDispatcher{
3✔
57
                notifier:       n,
3✔
58
                quit:           make(chan struct{}),
3✔
59
                consumerQueues: make(map[uint32][]Consumer),
3✔
60
        }
3✔
61
}
3✔
62

63
// RegisterQueue takes a list of consumers and registers them in the same
64
// queue.
65
//
66
// NOTE: these consumers are notified sequentially.
67
func (b *BlockbeatDispatcher) RegisterQueue(consumers []Consumer) {
3✔
68
        qid := b.counter.Add(1)
3✔
69

3✔
70
        b.consumerQueues[qid] = append(b.consumerQueues[qid], consumers...)
3✔
71
        clog.Infof("Registered queue=%d with %d blockbeat consumers", qid,
3✔
72
                len(consumers))
3✔
73

3✔
74
        for _, c := range consumers {
6✔
75
                clog.Debugf("Consumer [%s] registered in queue %d", c.Name(),
3✔
76
                        qid)
3✔
77
        }
3✔
78
}
79

80
// Start starts the blockbeat dispatcher - it registers a block notification
81
// and monitors and dispatches new blocks in a goroutine. It will refuse to
82
// start if there are no registered consumers.
83
func (b *BlockbeatDispatcher) Start() error {
3✔
84
        // Make sure consumers are registered.
3✔
85
        if len(b.consumerQueues) == 0 {
3✔
86
                return fmt.Errorf("no consumers registered")
×
87
        }
×
88

89
        // Start listening to new block epochs. We should get a notification
90
        // with the current best block immediately.
91
        blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil)
3✔
92
        if err != nil {
3✔
93
                return fmt.Errorf("register block epoch ntfn: %w", err)
×
94
        }
×
95

96
        clog.Infof("BlockbeatDispatcher is starting with %d consumer queues",
3✔
97
                len(b.consumerQueues))
3✔
98
        defer clog.Debug("BlockbeatDispatcher started")
3✔
99

3✔
100
        b.wg.Add(1)
3✔
101
        go b.dispatchBlocks(blockEpochs)
3✔
102

3✔
103
        return nil
3✔
104
}
105

106
// Stop shuts down the blockbeat dispatcher.
107
func (b *BlockbeatDispatcher) Stop() {
3✔
108
        clog.Info("BlockbeatDispatcher is stopping")
3✔
109
        defer clog.Debug("BlockbeatDispatcher stopped")
3✔
110

3✔
111
        // Signal the dispatchBlocks goroutine to stop.
3✔
112
        close(b.quit)
3✔
113
        b.wg.Wait()
3✔
114
}
3✔
115

116
func (b *BlockbeatDispatcher) log() btclog.Logger {
3✔
117
        return b.beat.logger()
3✔
118
}
3✔
119

120
// dispatchBlocks listens to new block epoch and dispatches it to all the
121
// consumers. Each queue is notified concurrently, and the consumers in the
122
// same queue are notified sequentially.
123
//
124
// NOTE: Must be run as a goroutine.
125
func (b *BlockbeatDispatcher) dispatchBlocks(
126
        blockEpochs *chainntnfs.BlockEpochEvent) {
3✔
127

3✔
128
        defer b.wg.Done()
3✔
129
        defer blockEpochs.Cancel()
3✔
130

3✔
131
        for {
6✔
132
                select {
3✔
133
                case blockEpoch, ok := <-blockEpochs.Epochs:
3✔
134
                        if !ok {
3✔
135
                                clog.Debugf("Block epoch channel closed")
×
136

×
137
                                return
×
138
                        }
×
139

140
                        // Log a separator so it's easier to identify when a
141
                        // new block arrives for subsystems.
142
                        clog.Debugf("%v", lnutils.NewSeparatorClosure())
3✔
143

3✔
144
                        clog.Infof("Received new block %v at height %d, "+
3✔
145
                                "notifying consumers...", blockEpoch.Hash,
3✔
146
                                blockEpoch.Height)
3✔
147

3✔
148
                        // Record the time it takes the consumer to process
3✔
149
                        // this block.
3✔
150
                        start := time.Now()
3✔
151

3✔
152
                        // Update the current block epoch.
3✔
153
                        b.beat = NewBeat(*blockEpoch)
3✔
154

3✔
155
                        // Notify all consumers.
3✔
156
                        err := b.notifyQueues()
3✔
157
                        if err != nil {
3✔
158
                                b.log().Errorf("Notify block failed: %v", err)
×
159
                        }
×
160

161
                        b.log().Infof("Notified all consumers on new block "+
3✔
162
                                "in %v", time.Since(start))
3✔
163

164
                case <-b.quit:
3✔
165
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
3✔
166
                                "received")
3✔
167

3✔
168
                        return
3✔
169
                }
170
        }
171
}
172

173
// notifyQueues notifies each queue concurrently about the latest block epoch.
174
func (b *BlockbeatDispatcher) notifyQueues() error {
3✔
175
        // errChans is a map of channels that will be used to receive errors
3✔
176
        // returned from notifying the consumers.
3✔
177
        errChans := make(map[uint32]chan error, len(b.consumerQueues))
3✔
178

3✔
179
        // Notify each queue in goroutines.
3✔
180
        for qid, consumers := range b.consumerQueues {
6✔
181
                b.log().Debugf("Notifying queue=%d with %d consumers", qid,
3✔
182
                        len(consumers))
3✔
183

3✔
184
                // Create a signal chan.
3✔
185
                errChan := make(chan error, 1)
3✔
186
                errChans[qid] = errChan
3✔
187

3✔
188
                // Notify each queue concurrently.
3✔
189
                go func(qid uint32, c []Consumer, beat Blockbeat) {
6✔
190
                        // Notify each consumer in this queue sequentially.
3✔
191
                        errChan <- DispatchSequential(beat, c)
3✔
192
                }(qid, consumers, b.beat)
3✔
193
        }
194

195
        // Wait for all consumers in each queue to finish.
196
        for qid, errChan := range errChans {
6✔
197
                select {
3✔
198
                case err := <-errChan:
3✔
199
                        if err != nil {
3✔
200
                                return fmt.Errorf("queue=%d got err: %w", qid,
×
201
                                        err)
×
202
                        }
×
203

204
                        b.log().Debugf("Notified queue=%d", qid)
3✔
205

206
                case <-b.quit:
2✔
207
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
2✔
208
                                "received, exit notifyQueues")
2✔
209

2✔
210
                        return nil
2✔
211
                }
212
        }
213

214
        return nil
3✔
215
}
216

217
// DispatchSequential takes a list of consumers and notify them about the new
218
// epoch sequentially. It requires the consumer to finish processing the block
219
// within the specified time, otherwise a timeout error is returned.
220
func DispatchSequential(b Blockbeat, consumers []Consumer) error {
3✔
221
        for _, c := range consumers {
6✔
222
                // Send the beat to the consumer.
3✔
223
                err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
3✔
224
                if err != nil {
3✔
225
                        b.logger().Errorf("Failed to process block: %v", err)
×
226

×
227
                        return err
×
228
                }
×
229
        }
230

231
        return nil
3✔
232
}
233

234
// DispatchConcurrent notifies each consumer concurrently about the blockbeat.
235
// It requires the consumer to finish processing the block within the specified
236
// time, otherwise a timeout error is returned.
237
func DispatchConcurrent(b Blockbeat, consumers []Consumer) error {
3✔
238
        eg := &errgroup.Group{}
3✔
239

3✔
240
        // Notify each queue in goroutines.
3✔
241
        for _, c := range consumers {
6✔
242
                // Notify each consumer concurrently.
3✔
243
                eg.Go(func() error {
6✔
244
                        // Send the beat to the consumer.
3✔
245
                        err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
3✔
246

3✔
247
                        // Exit early if there's no error.
3✔
248
                        if err == nil {
6✔
249
                                return nil
3✔
250
                        }
3✔
251

252
                        b.logger().Errorf("Consumer=%v failed to process "+
×
253
                                "block: %v", c.Name(), err)
×
254

×
255
                        return err
×
256
                })
257
        }
258

259
        // Wait for all consumers in each queue to finish.
260
        if err := eg.Wait(); err != nil {
3✔
261
                return err
×
262
        }
×
263

264
        return nil
3✔
265
}
266

267
// notifyAndWait sends the blockbeat to the specified consumer. It requires the
268
// consumer to finish processing the block within the specified time, otherwise
269
// a timeout error is returned.
270
func notifyAndWait(b Blockbeat, c Consumer, timeout time.Duration) error {
3✔
271
        b.logger().Debugf("Waiting for consumer[%s] to process it", c.Name())
3✔
272

3✔
273
        // Record the time it takes the consumer to process this block.
3✔
274
        start := time.Now()
3✔
275

3✔
276
        errChan := make(chan error, 1)
3✔
277
        go func() {
6✔
278
                errChan <- c.ProcessBlock(b)
3✔
279
        }()
3✔
280

281
        // We expect the consumer to finish processing this block under 30s,
282
        // otherwise a timeout error is returned.
283
        select {
3✔
284
        case err := <-errChan:
3✔
285
                if err == nil {
6✔
286
                        break
3✔
287
                }
288

289
                return fmt.Errorf("%s got err in ProcessBlock: %w", c.Name(),
×
290
                        err)
×
291

292
        case <-time.After(timeout):
×
293
                return fmt.Errorf("consumer %s: %w", c.Name(),
×
294
                        ErrProcessBlockTimeout)
×
295
        }
296

297
        b.logger().Debugf("Consumer[%s] processed block in %v", c.Name(),
3✔
298
                time.Since(start))
3✔
299

3✔
300
        return nil
3✔
301
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc