• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.72
/chainio/dispatcher.go
1
package chainio
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/chainntnfs"
12
        "github.com/lightningnetwork/lnd/lnutils"
13
        "golang.org/x/sync/errgroup"
14
)
15

16
// DefaultProcessBlockTimeout is the timeout value used when waiting for one
17
// consumer to finish processing the new block epoch.
18
var DefaultProcessBlockTimeout = 60 * time.Second
19

20
// ErrProcessBlockTimeout is the error returned when a consumer takes too long
21
// to process the block.
22
var ErrProcessBlockTimeout = errors.New("process block timeout")
23

24
// BlockbeatDispatcher is a service that handles dispatching new blocks to
25
// `lnd`'s subsystems. During startup, subsystems that are block-driven should
26
// implement the `Consumer` interface and register themselves via
27
// `RegisterQueue`. When two subsystems are independent of each other, they
28
// should be registered in different queues so blocks are notified concurrently.
29
// Otherwise, when living in the same queue, the subsystems are notified of the
30
// new blocks sequentially, which means it's critical to understand the
31
// relationship of these systems to properly handle the order.
32
type BlockbeatDispatcher struct {
33
        wg sync.WaitGroup
34

35
        // notifier is used to receive new block epochs.
36
        notifier chainntnfs.ChainNotifier
37

38
        // beat is the latest blockbeat received.
39
        beat Blockbeat
40

41
        // consumerQueues is a map of consumers that will receive blocks. Its
42
        // key is a unique counter and its value is a queue of consumers. Each
43
        // queue is notified concurrently, and consumers in the same queue are
44
        // notified sequentially.
45
        consumerQueues map[uint32][]Consumer
46

47
        // counter is used to assign a unique id to each queue.
48
        counter atomic.Uint32
49

50
        // quit is used to signal the BlockbeatDispatcher to stop.
51
        quit chan struct{}
52
}
53

54
// NewBlockbeatDispatcher returns a new blockbeat dispatcher instance.
55
func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher {
3✔
56
        return &BlockbeatDispatcher{
3✔
57
                notifier:       n,
3✔
58
                quit:           make(chan struct{}),
3✔
59
                consumerQueues: make(map[uint32][]Consumer),
3✔
60
        }
3✔
61
}
3✔
62

63
// RegisterQueue takes a list of consumers and registers them in the same
64
// queue.
65
//
66
// NOTE: these consumers are notified sequentially.
67
func (b *BlockbeatDispatcher) RegisterQueue(consumers []Consumer) {
3✔
68
        qid := b.counter.Add(1)
3✔
69

3✔
70
        b.consumerQueues[qid] = append(b.consumerQueues[qid], consumers...)
3✔
71
        clog.Infof("Registered queue=%d with %d blockbeat consumers", qid,
3✔
72
                len(consumers))
3✔
73

3✔
74
        for _, c := range consumers {
6✔
75
                clog.Debugf("Consumer [%s] registered in queue %d", c.Name(),
3✔
76
                        qid)
3✔
77
        }
3✔
78
}
79

80
// Start starts the blockbeat dispatcher - it registers a block notification
81
// and monitors and dispatches new blocks in a goroutine. It will refuse to
82
// start if there are no registered consumers.
83
func (b *BlockbeatDispatcher) Start() error {
3✔
84
        // Make sure consumers are registered.
3✔
85
        if len(b.consumerQueues) == 0 {
3✔
UNCOV
86
                return fmt.Errorf("no consumers registered")
×
UNCOV
87
        }
×
88

89
        // Start listening to new block epochs. We should get a notification
90
        // with the current best block immediately.
91
        blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil)
3✔
92
        if err != nil {
3✔
UNCOV
93
                return fmt.Errorf("register block epoch ntfn: %w", err)
×
UNCOV
94
        }
×
95

96
        clog.Infof("BlockbeatDispatcher is starting with %d consumer queues",
3✔
97
                len(b.consumerQueues))
3✔
98
        defer clog.Debug("BlockbeatDispatcher started")
3✔
99

3✔
100
        b.wg.Add(1)
3✔
101
        go b.dispatchBlocks(blockEpochs)
3✔
102

3✔
103
        return nil
3✔
104
}
105

106
// Stop shuts down the blockbeat dispatcher.
107
func (b *BlockbeatDispatcher) Stop() {
3✔
108
        clog.Info("BlockbeatDispatcher is stopping")
3✔
109
        defer clog.Debug("BlockbeatDispatcher stopped")
3✔
110

3✔
111
        // Signal the dispatchBlocks goroutine to stop.
3✔
112
        close(b.quit)
3✔
113
        b.wg.Wait()
3✔
114
}
3✔
115

116
func (b *BlockbeatDispatcher) log() btclog.Logger {
3✔
117
        return b.beat.logger()
3✔
118
}
3✔
119

120
// dispatchBlocks listens for new block epochs and dispatches each one to all the
121
// consumers. Each queue is notified concurrently, and the consumers in the
122
// same queue are notified sequentially.
123
//
124
// NOTE: Must be run as a goroutine.
125
func (b *BlockbeatDispatcher) dispatchBlocks(
126
        blockEpochs *chainntnfs.BlockEpochEvent) {
3✔
127

3✔
128
        defer b.wg.Done()
3✔
129
        defer blockEpochs.Cancel()
3✔
130

3✔
131
        for {
6✔
132
                select {
3✔
133
                case blockEpoch, ok := <-blockEpochs.Epochs:
3✔
134
                        if !ok {
3✔
135
                                clog.Debugf("Block epoch channel closed")
×
136

×
137
                                return
×
138
                        }
×
139

140
                        // Log a separator so it's easier to identify when a
141
                        // new block arrives for subsystems.
142
                        clog.Debugf("%v", lnutils.NewSeparatorClosure())
3✔
143

3✔
144
                        clog.Infof("Received new block %v at height %d, "+
3✔
145
                                "notifying consumers...", blockEpoch.Hash,
3✔
146
                                blockEpoch.Height)
3✔
147

3✔
148
                        // Record the time it takes all the consumers to process
3✔
149
                        // this block.
3✔
150
                        start := time.Now()
3✔
151

3✔
152
                        // Update the current block epoch.
3✔
153
                        b.beat = NewBeat(*blockEpoch)
3✔
154

3✔
155
                        // Notify all consumers.
3✔
156
                        err := b.notifyQueues()
3✔
157
                        if err != nil {
3✔
158
                                b.log().Errorf("Notify block failed: %v", err)
×
159
                        }
×
160

161
                        b.log().Infof("Notified all consumers on new block "+
3✔
162
                                "in %v", time.Since(start))
3✔
163

164
                case <-b.quit:
3✔
165
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
3✔
166
                                "received")
3✔
167

3✔
168
                        return
3✔
169
                }
170
        }
171
}
172

173
// notifyQueues notifies each queue concurrently about the latest block epoch.
174
func (b *BlockbeatDispatcher) notifyQueues() error {
3✔
175
        // errChans is a map of channels that will be used to receive errors
3✔
176
        // returned from notifying the consumers.
3✔
177
        errChans := make(map[uint32]chan error, len(b.consumerQueues))
3✔
178

3✔
179
        // Notify each queue in goroutines.
3✔
180
        for qid, consumers := range b.consumerQueues {
6✔
181
                b.log().Debugf("Notifying queue=%d with %d consumers", qid,
3✔
182
                        len(consumers))
3✔
183

3✔
184
                // Create a signal chan.
3✔
185
                errChan := make(chan error, 1)
3✔
186
                errChans[qid] = errChan
3✔
187

3✔
188
                // Notify each queue concurrently.
3✔
189
                go func(qid uint32, c []Consumer, beat Blockbeat) {
6✔
190
                        // Notify each consumer in this queue sequentially.
3✔
191
                        errChan <- DispatchSequential(beat, c)
3✔
192
                }(qid, consumers, b.beat)
3✔
193
        }
194

195
        // Wait for all consumers in each queue to finish.
196
        for qid, errChan := range errChans {
6✔
197
                select {
3✔
198
                case err := <-errChan:
3✔
199
                        if err != nil {
3✔
UNCOV
200
                                return fmt.Errorf("queue=%d got err: %w", qid,
×
UNCOV
201
                                        err)
×
UNCOV
202
                        }
×
203

204
                        b.log().Debugf("Notified queue=%d", qid)
3✔
205

206
                case <-b.quit:
1✔
207
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
1✔
208
                                "received, exit notifyQueues")
1✔
209

1✔
210
                        return nil
1✔
211
                }
212
        }
213

214
        return nil
3✔
215
}
216

217
// DispatchSequential takes a list of consumers and notifies them about the new
218
// epoch sequentially. It requires the consumer to finish processing the block
219
// within the specified time, otherwise a timeout error is returned.
220
func DispatchSequential(b Blockbeat, consumers []Consumer) error {
3✔
221
        for _, c := range consumers {
6✔
222
                // Send the beat to the consumer.
3✔
223
                err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
3✔
224
                if err != nil {
3✔
UNCOV
225
                        b.logger().Errorf("Failed to process block: %v", err)
×
UNCOV
226

×
UNCOV
227
                        return err
×
UNCOV
228
                }
×
229
        }
230

231
        return nil
3✔
232
}
233

234
// DispatchConcurrent notifies each consumer concurrently about the blockbeat.
235
// It requires the consumer to finish processing the block within the specified
236
// time, otherwise a timeout error is returned.
237
func DispatchConcurrent(b Blockbeat, consumers []Consumer) error {
3✔
238
        eg := &errgroup.Group{}
3✔
239

3✔
240
        // Notify each consumer in its own goroutine.
3✔
241
        for _, c := range consumers {
6✔
242
                // Notify each consumer concurrently.
3✔
243
                eg.Go(func() error {
6✔
244
                        // Send the beat to the consumer.
3✔
245
                        err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
3✔
246

3✔
247
                        // Exit early if there's no error.
3✔
248
                        if err == nil {
6✔
249
                                return nil
3✔
250
                        }
3✔
251

252
                        b.logger().Errorf("Consumer=%v failed to process "+
×
253
                                "block: %v", c.Name(), err)
×
254

×
255
                        return err
×
256
                })
257
        }
258

259
        // Wait for all consumers to finish.
260
        if err := eg.Wait(); err != nil {
3✔
261
                return err
×
262
        }
×
263

264
        return nil
3✔
265
}
266

267
// notifyAndWait sends the blockbeat to the specified consumer. It requires the
268
// consumer to finish processing the block within the specified time, otherwise
269
// a timeout error is returned.
270
func notifyAndWait(b Blockbeat, c Consumer, timeout time.Duration) error {
3✔
271
        b.logger().Debugf("Waiting for consumer[%s] to process it", c.Name())
3✔
272

3✔
273
        // Record the time it takes the consumer to process this block.
3✔
274
        start := time.Now()
3✔
275

3✔
276
        errChan := make(chan error, 1)
3✔
277
        go func() {
6✔
278
                errChan <- c.ProcessBlock(b)
3✔
279
        }()
3✔
280

281
        // We expect the consumer to finish processing this block within `timeout`,
282
        // otherwise a timeout error is returned.
283
        select {
3✔
284
        case err := <-errChan:
3✔
285
                if err == nil {
6✔
286
                        break
3✔
287
                }
288

UNCOV
289
                return fmt.Errorf("%s got err in ProcessBlock: %w", c.Name(),
×
UNCOV
290
                        err)
×
291

UNCOV
292
        case <-time.After(timeout):
×
UNCOV
293
                return fmt.Errorf("consumer %s: %w", c.Name(),
×
UNCOV
294
                        ErrProcessBlockTimeout)
×
295
        }
296

297
        b.logger().Debugf("Consumer[%s] processed block in %v", c.Name(),
3✔
298
                time.Since(start))
3✔
299

3✔
300
        return nil
3✔
301
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc