• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12343072627

15 Dec 2024 11:09PM UTC coverage: 57.504% (-1.1%) from 58.636%
12343072627

Pull #9315

github

yyforyongyu
contractcourt: offer outgoing htlc one block earlier before its expiry

We need to offer the outgoing htlc one block earlier to make sure when
the expiry height hits, the sweeper will not miss sweeping it in the
same block. This also means the outgoing contest resolver now only does
one thing - watch for preimage spend till height expiry-1, which can
easily be moved into the timeout resolver instead in the future.
Pull Request #9315: Implement `blockbeat`

1445 of 2007 new or added lines in 26 files covered. (72.0%)

19246 existing lines in 249 files now uncovered.

102342 of 177975 relevant lines covered (57.5%)

24772.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.59
/chainio/dispatcher.go
1
package chainio
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/chainntnfs"
12
        "golang.org/x/sync/errgroup"
13
)
14

15
// DefaultProcessBlockTimeout is the timeout value used when waiting for one
16
// consumer to finish processing the new block epoch.
17
var DefaultProcessBlockTimeout = 60 * time.Second
18

19
// ErrProcessBlockTimeout is the error returned when a consumer takes too long
20
// to process the block.
21
var ErrProcessBlockTimeout = errors.New("process block timeout")
22

23
// BlockbeatDispatcher is a service that handles dispatching new blocks to
24
// `lnd`'s subsystems. During startup, subsystems that are block-driven should
25
// implement the `Consumer` interface and register themselves via
26
// `RegisterQueue`. When two subsystems are independent of each other, they
27
// should be registered in different queues so blocks are notified concurrently.
28
// Otherwise, when living in the same queue, the subsystems are notified of the
29
// new blocks sequentially, which means it's critical to understand the
30
// relationship of these systems to properly handle the order.
31
type BlockbeatDispatcher struct {
32
        wg sync.WaitGroup
33

34
        // notifier is used to receive new block epochs.
35
        notifier chainntnfs.ChainNotifier
36

37
        // beat is the latest blockbeat received.
38
        beat Blockbeat
39

40
        // consumerQueues is a map of consumers that will receive blocks. Its
41
        // key is a unique counter and its value is a queue of consumers. Each
42
        // queue is notified concurrently, and consumers in the same queue is
43
        // notified sequentially.
44
        consumerQueues map[uint32][]Consumer
45

46
        // counter is used to assign a unique id to each queue.
47
        counter atomic.Uint32
48

49
        // quit is used to signal the BlockbeatDispatcher to stop.
50
        quit chan struct{}
51
}
52

53
// NewBlockbeatDispatcher returns a new blockbeat dispatcher instance.
54
func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher {
5✔
55
        return &BlockbeatDispatcher{
5✔
56
                notifier:       n,
5✔
57
                quit:           make(chan struct{}),
5✔
58
                consumerQueues: make(map[uint32][]Consumer),
5✔
59
        }
5✔
60
}
5✔
61

62
// RegisterQueue takes a list of consumers and registers them in the same
63
// queue.
64
//
65
// NOTE: these consumers are notified sequentially.
66
func (b *BlockbeatDispatcher) RegisterQueue(consumers []Consumer) {
6✔
67
        qid := b.counter.Add(1)
6✔
68

6✔
69
        b.consumerQueues[qid] = append(b.consumerQueues[qid], consumers...)
6✔
70
        clog.Infof("Registered queue=%d with %d blockbeat consumers", qid,
6✔
71
                len(consumers))
6✔
72

6✔
73
        for _, c := range consumers {
13✔
74
                clog.Debugf("Consumer [%s] registered in queue %d", c.Name(),
7✔
75
                        qid)
7✔
76
        }
7✔
77
}
78

79
// Start starts the blockbeat dispatcher - it registers a block notification
80
// and monitors and dispatches new blocks in a goroutine. It will refuse to
81
// start if there are no registered consumers.
82
func (b *BlockbeatDispatcher) Start() error {
3✔
83
        // Make sure consumers are registered.
3✔
84
        if len(b.consumerQueues) == 0 {
4✔
85
                return fmt.Errorf("no consumers registered")
1✔
86
        }
1✔
87

88
        // Start listening to new block epochs. We should get a notification
89
        // with the current best block immediately.
90
        blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil)
2✔
91
        if err != nil {
3✔
92
                return fmt.Errorf("register block epoch ntfn: %w", err)
1✔
93
        }
1✔
94

95
        clog.Infof("BlockbeatDispatcher is starting with %d consumer queues",
1✔
96
                len(b.consumerQueues))
1✔
97
        defer clog.Debug("BlockbeatDispatcher started")
1✔
98

1✔
99
        b.wg.Add(1)
1✔
100
        go b.dispatchBlocks(blockEpochs)
1✔
101

1✔
102
        return nil
1✔
103
}
104

105
// Stop shuts down the blockbeat dispatcher.
106
func (b *BlockbeatDispatcher) Stop() {
1✔
107
        clog.Info("BlockbeatDispatcher is stopping")
1✔
108
        defer clog.Debug("BlockbeatDispatcher stopped")
1✔
109

1✔
110
        // Signal the dispatchBlocks goroutine to stop.
1✔
111
        close(b.quit)
1✔
112
        b.wg.Wait()
1✔
113
}
1✔
114

115
func (b *BlockbeatDispatcher) log() btclog.Logger {
9✔
116
        return b.beat.logger()
9✔
117
}
9✔
118

119
// dispatchBlocks listens to new block epoch and dispatches it to all the
120
// consumers. Each queue is notified concurrently, and the consumers in the
121
// same queue are notified sequentially.
122
//
123
// NOTE: Must be run as a goroutine.
124
func (b *BlockbeatDispatcher) dispatchBlocks(
125
        blockEpochs *chainntnfs.BlockEpochEvent) {
2✔
126

2✔
127
        defer b.wg.Done()
2✔
128
        defer blockEpochs.Cancel()
2✔
129

2✔
130
        for {
5✔
131
                select {
3✔
132
                case blockEpoch, ok := <-blockEpochs.Epochs:
1✔
133
                        if !ok {
1✔
NEW
134
                                clog.Debugf("Block epoch channel closed")
×
NEW
135

×
NEW
136
                                return
×
NEW
137
                        }
×
138

139
                        clog.Infof("Received new block %v at height %d, "+
1✔
140
                                "notifying consumers...", blockEpoch.Hash,
1✔
141
                                blockEpoch.Height)
1✔
142

1✔
143
                        // Record the time it takes the consumer to process
1✔
144
                        // this block.
1✔
145
                        start := time.Now()
1✔
146

1✔
147
                        // Update the current block epoch.
1✔
148
                        b.beat = NewBeat(*blockEpoch)
1✔
149

1✔
150
                        // Notify all consumers.
1✔
151
                        err := b.notifyQueues()
1✔
152
                        if err != nil {
1✔
NEW
153
                                b.log().Errorf("Notify block failed: %v", err)
×
NEW
154
                        }
×
155

156
                        b.log().Infof("Notified all consumers on new block "+
1✔
157
                                "in %v", time.Since(start))
1✔
158

159
                case <-b.quit:
1✔
160
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
1✔
161
                                "received")
1✔
162

1✔
163
                        return
1✔
164
                }
165
        }
166
}
167

168
// notifyQueues notifies each queue concurrently about the latest block epoch.
169
func (b *BlockbeatDispatcher) notifyQueues() error {
3✔
170
        // errChans is a map of channels that will be used to receive errors
3✔
171
        // returned from notifying the consumers.
3✔
172
        errChans := make(map[uint32]chan error, len(b.consumerQueues))
3✔
173

3✔
174
        // Notify each queue in goroutines.
3✔
175
        for qid, consumers := range b.consumerQueues {
7✔
176
                b.log().Debugf("Notifying queue=%d with %d consumers", qid,
4✔
177
                        len(consumers))
4✔
178

4✔
179
                // Create a signal chan.
4✔
180
                errChan := make(chan error, 1)
4✔
181
                errChans[qid] = errChan
4✔
182

4✔
183
                // Notify each queue concurrently.
4✔
184
                go func(qid uint32, c []Consumer, beat Blockbeat) {
8✔
185
                        // Notify each consumer in this queue sequentially.
4✔
186
                        errChan <- DispatchSequential(beat, c)
4✔
187
                }(qid, consumers, b.beat)
4✔
188
        }
189

190
        // Wait for all consumers in each queue to finish.
191
        for qid, errChan := range errChans {
7✔
192
                select {
4✔
193
                case err := <-errChan:
4✔
194
                        if err != nil {
5✔
195
                                return fmt.Errorf("queue=%d got err: %w", qid,
1✔
196
                                        err)
1✔
197
                        }
1✔
198

199
                        b.log().Debugf("Notified queue=%d", qid)
3✔
200

NEW
201
                case <-b.quit:
×
NEW
202
                        b.log().Debugf("BlockbeatDispatcher quit signal " +
×
NEW
203
                                "received, exit notifyQueues")
×
NEW
204

×
NEW
205
                        return nil
×
206
                }
207
        }
208

209
        return nil
2✔
210
}
211

212
// DispatchSequential takes a list of consumers and notify them about the new
213
// epoch sequentially. It requires the consumer to finish processing the block
214
// within the specified time, otherwise a timeout error is returned.
215
func DispatchSequential(b Blockbeat, consumers []Consumer) error {
5✔
216
        for _, c := range consumers {
12✔
217
                // Send the beat to the consumer.
7✔
218
                err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
7✔
219
                if err != nil {
8✔
220
                        b.logger().Errorf("Failed to process block: %v", err)
1✔
221

1✔
222
                        return err
1✔
223
                }
1✔
224
        }
225

226
        return nil
4✔
227
}
228

229
// DispatchConcurrent notifies each consumer concurrently about the blockbeat.
230
// It requires the consumer to finish processing the block within the specified
231
// time, otherwise a timeout error is returned.
NEW
232
func DispatchConcurrent(b Blockbeat, consumers []Consumer) error {
×
NEW
233
        eg := &errgroup.Group{}
×
NEW
234

×
NEW
235
        // Notify each queue in goroutines.
×
NEW
236
        for _, c := range consumers {
×
NEW
237
                // Notify each consumer concurrently.
×
NEW
238
                eg.Go(func() error {
×
NEW
239
                        // Send the beat to the consumer.
×
NEW
240
                        err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
×
NEW
241

×
NEW
242
                        // Exit early if there's no error.
×
NEW
243
                        if err == nil {
×
NEW
244
                                return nil
×
NEW
245
                        }
×
246

NEW
247
                        b.logger().Errorf("Consumer=%v failed to process "+
×
NEW
248
                                "block: %v", c.Name(), err)
×
NEW
249

×
NEW
250
                        return err
×
251
                })
252
        }
253

254
        // Wait for all consumers in each queue to finish.
NEW
255
        if err := eg.Wait(); err != nil {
×
NEW
256
                return err
×
NEW
257
        }
×
258

NEW
259
        return nil
×
260
}
261

262
// notifyAndWait sends the blockbeat to the specified consumer. It requires the
263
// consumer to finish processing the block within the specified time, otherwise
264
// a timeout error is returned.
265
func notifyAndWait(b Blockbeat, c Consumer, timeout time.Duration) error {
10✔
266
        b.logger().Debugf("Waiting for consumer[%s] to process it", c.Name())
10✔
267

10✔
268
        // Record the time it takes the consumer to process this block.
10✔
269
        start := time.Now()
10✔
270

10✔
271
        errChan := make(chan error, 1)
10✔
272
        go func() {
20✔
273
                errChan <- c.ProcessBlock(b)
10✔
274
        }()
10✔
275

276
        // We expect the consumer to finish processing this block under 30s,
277
        // otherwise a timeout error is returned.
278
        select {
10✔
279
        case err := <-errChan:
9✔
280
                if err == nil {
16✔
281
                        break
7✔
282
                }
283

284
                return fmt.Errorf("%s got err in ProcessBlock: %w", c.Name(),
2✔
285
                        err)
2✔
286

287
        case <-time.After(timeout):
1✔
288
                return fmt.Errorf("consumer %s: %w", c.Name(),
1✔
289
                        ErrProcessBlockTimeout)
1✔
290
        }
291

292
        b.logger().Debugf("Consumer[%s] processed block in %v", c.Name(),
7✔
293
                time.Since(start))
7✔
294

7✔
295
        return nil
7✔
296
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc