• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13536249039

26 Feb 2025 03:42AM UTC coverage: 57.462% (-1.4%) from 58.835%
13536249039

Pull #8453

github

Roasbeef
peer: update chooseDeliveryScript to gen script if needed

In this commit, we update `chooseDeliveryScript` to generate a new
script if needed. This allows us to fold in a few other lines that
always followed this function into this expanded function.

The tests have been updated accordingly.
Pull Request #8453: [4/4] - multi: integrate new rbf coop close FSM into the existing peer flow

275 of 1318 new or added lines in 22 files covered. (20.86%)

19521 existing lines in 257 files now uncovered.

103858 of 180741 relevant lines covered (57.46%)

24750.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.21
/discovery/validation_barrier.go
1
package discovery
2

3
import (
4
        "fmt"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/go-errors/errors"
9
        "github.com/lightningnetwork/lnd/fn/v2"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/routing/route"
12
)
13

14
var (
	// ErrVBarrierShuttingDown signals that the barrier has been requested
	// to shutdown, and that the caller should not treat the wait condition
	// as fulfilled.
	ErrVBarrierShuttingDown = errors.New("ValidationBarrier shutting down")
)
20

21
// JobID identifies an active job in the validation barrier. It is large so
// that we don't need to worry about overflows.
type JobID uint64
24

25
// jobInfo stores job dependency info for a set of dependent gossip messages.
26
type jobInfo struct {
27
        // activeParentJobIDs is the set of active parent job ids.
28
        activeParentJobIDs fn.Set[JobID]
29

30
        // activeDependentJobs is the set of active dependent job ids.
31
        activeDependentJobs fn.Set[JobID]
32
}
33

34
// ValidationBarrier is a barrier used to enforce a strict validation order
35
// while concurrently validating other updates for channel edges. It uses a set
36
// of maps to track validation dependencies. This is needed in practice because
37
// gossip messages for a given channel may arive in order, but then due to
38
// scheduling in different goroutines, may be validated in the wrong order.
39
// With the ValidationBarrier, the dependent update will wait until the parent
40
// update completes.
41
type ValidationBarrier struct {
42
        // validationSemaphore is a channel of structs which is used as a
43
        // semaphore. Initially we'll fill this with a buffered channel of the
44
        // size of the number of active requests. Each new job will consume
45
        // from this channel, then restore the value upon completion.
46
        validationSemaphore chan struct{}
47

48
        // jobInfoMap stores the set of job ids for each channel.
49
        // NOTE: This MUST be used with the mutex.
50
        // NOTE: This currently stores string representations of
51
        // lnwire.ShortChannelID and route.Vertex. Since these are of different
52
        // lengths, collision cannot occur in their string representations.
53
        // N.B.: Check that any new string-converted types don't collide with
54
        // existing string-converted types.
55
        jobInfoMap map[string]*jobInfo
56

57
        // jobDependencies is a mapping from a child's JobID to the set of
58
        // parent JobID that it depends on.
59
        // NOTE: This MUST be used with the mutex.
60
        jobDependencies map[JobID]fn.Set[JobID]
61

62
        // childJobChans stores the notification channel that each child job
63
        // listens on for parent job completions.
64
        // NOTE: This MUST be used with the mutex.
65
        childJobChans map[JobID]chan struct{}
66

67
        // idCtr is an atomic integer that is used to assign JobIDs.
68
        idCtr atomic.Uint64
69

70
        quit chan struct{}
71
        sync.Mutex
72
}
73

74
// NewValidationBarrier creates a new instance of a validation barrier given
75
// the total number of active requests, and a quit channel which will be used
76
// to know when to kill pending, but unfilled jobs.
77
func NewValidationBarrier(numActiveReqs int,
78
        quitChan chan struct{}) *ValidationBarrier {
32✔
79

32✔
80
        v := &ValidationBarrier{
32✔
81
                jobInfoMap:      make(map[string]*jobInfo),
32✔
82
                jobDependencies: make(map[JobID]fn.Set[JobID]),
32✔
83
                childJobChans:   make(map[JobID]chan struct{}),
32✔
84
                quit:            quitChan,
32✔
85
        }
32✔
86

32✔
87
        // We'll first initialize a set of semaphores to limit our concurrency
32✔
88
        // when validating incoming requests in parallel.
32✔
89
        v.validationSemaphore = make(chan struct{}, numActiveReqs)
32✔
90
        for i := 0; i < numActiveReqs; i++ {
29,064✔
91
                v.validationSemaphore <- struct{}{}
29,032✔
92
        }
29,032✔
93

94
        return v
32✔
95
}
96

97
// InitJobDependencies will wait for a new job slot to become open, and then
98
// sets up any dependent signals/trigger for the new job.
99
func (v *ValidationBarrier) InitJobDependencies(job interface{}) (JobID,
100
        error) {
346✔
101

346✔
102
        // We'll wait for either a new slot to become open, or for the quit
346✔
103
        // channel to be closed.
346✔
104
        select {
346✔
105
        case <-v.validationSemaphore:
346✔
106
        case <-v.quit:
×
107
        }
108

109
        v.Lock()
346✔
110
        defer v.Unlock()
346✔
111

346✔
112
        // updateOrCreateJobInfo modifies the set of activeParentJobs for this
346✔
113
        // annID and updates jobInfoMap.
346✔
114
        updateOrCreateJobInfo := func(annID string, annJobID JobID) {
1,069✔
115
                info, ok := v.jobInfoMap[annID]
723✔
116
                if ok {
726✔
117
                        // If an entry already exists for annID, then a job
3✔
118
                        // related to it is being validated. Add to the set of
3✔
119
                        // parent job ids. This addition will only affect
3✔
120
                        // _later_, _child_ jobs for the annID.
3✔
121
                        info.activeParentJobIDs.Add(annJobID)
3✔
122
                        return
3✔
123
                }
3✔
124

125
                // No entry exists for annID, meaning that we should create
126
                // one.
127
                parentJobSet := fn.NewSet(annJobID)
720✔
128

720✔
129
                info = &jobInfo{
720✔
130
                        activeParentJobIDs:  parentJobSet,
720✔
131
                        activeDependentJobs: fn.NewSet[JobID](),
720✔
132
                }
720✔
133
                v.jobInfoMap[annID] = info
720✔
134
        }
135

136
        // populateDependencies populates the job dependency mappings (i.e.
137
        // which should complete after another) for the (annID, childJobID)
138
        // tuple.
139
        populateDependencies := func(annID string, childJobID JobID) {
451✔
140
                // If there is no entry in the jobInfoMap, we don't have to
105✔
141
                // wait on any parent jobs to finish.
105✔
142
                info, ok := v.jobInfoMap[annID]
105✔
143
                if !ok {
200✔
144
                        return
95✔
145
                }
95✔
146

147
                // We want to see a snapshot of active parent jobs for this
148
                // annID that are already registered in activeParentJobIDs. The
149
                // child job identified by childJobID can only run after these
150
                // parent jobs have run. After grabbing the snapshot, we then
151
                // want to persist a slice of these jobs.
152

153
                // Create the notification chan that parent jobs will send (or
154
                // close) on when they complete.
155
                jobChan := make(chan struct{})
10✔
156

10✔
157
                // Add to set of activeDependentJobs for this annID.
10✔
158
                info.activeDependentJobs.Add(childJobID)
10✔
159

10✔
160
                // Store in childJobChans. The parent jobs will fetch this chan
10✔
161
                // to notify on. The child job will later fetch this chan to
10✔
162
                // listen on when WaitForParents is called.
10✔
163
                v.childJobChans[childJobID] = jobChan
10✔
164

10✔
165
                // Copy over the parent job IDs at this moment for this annID.
10✔
166
                // This job must be processed AFTER those parent IDs.
10✔
167
                parentJobs := info.activeParentJobIDs.Copy()
10✔
168

10✔
169
                // Populate the jobDependencies mapping.
10✔
170
                v.jobDependencies[childJobID] = parentJobs
10✔
171
        }
172

173
        // Once a slot is open, we'll examine the message of the job, to see if
174
        // there need to be any dependent barriers set up.
175
        switch msg := job.(type) {
346✔
176
        case *lnwire.ChannelAnnouncement1:
241✔
177
                id := JobID(v.idCtr.Add(1))
241✔
178

241✔
179
                updateOrCreateJobInfo(msg.ShortChannelID.String(), id)
241✔
180
                updateOrCreateJobInfo(route.Vertex(msg.NodeID1).String(), id)
241✔
181
                updateOrCreateJobInfo(route.Vertex(msg.NodeID2).String(), id)
241✔
182

241✔
183
                return id, nil
241✔
184

185
        // Populate the dependency mappings for the below child jobs.
186
        case *lnwire.ChannelUpdate1:
80✔
187
                childJobID := JobID(v.idCtr.Add(1))
80✔
188
                populateDependencies(msg.ShortChannelID.String(), childJobID)
80✔
189

80✔
190
                return childJobID, nil
80✔
191
        case *lnwire.NodeAnnouncement:
25✔
192
                childJobID := JobID(v.idCtr.Add(1))
25✔
193
                populateDependencies(
25✔
194
                        route.Vertex(msg.NodeID).String(), childJobID,
25✔
195
                )
25✔
196

25✔
197
                return childJobID, nil
25✔
198
        case *lnwire.AnnounceSignatures1:
×
199
                // TODO(roasbeef): need to wait on chan ann?
×
200
                // - We can do the above by calling populateDependencies. For
×
201
                //   now, while we evaluate potential side effects, don't do
×
202
                //   anything with childJobID and just return it.
×
203
                childJobID := JobID(v.idCtr.Add(1))
×
204
                return childJobID, nil
×
205

206
        default:
×
207
                // An invalid message was passed into InitJobDependencies.
×
208
                // Return an error.
×
209
                return JobID(0), errors.New("invalid message")
×
210
        }
211
}
212

213
// CompleteJob returns a free slot to the set of available job slots. This
214
// should be called once a job has been fully completed. Otherwise, slots may
215
// not be returned to the internal scheduling, causing a deadlock when a new
216
// overflow job is attempted.
217
func (v *ValidationBarrier) CompleteJob() {
321✔
218
        select {
321✔
219
        case v.validationSemaphore <- struct{}{}:
321✔
220
        case <-v.quit:
×
221
        }
222
}
223

224
// WaitForParents will block until all parent job dependencies have went
225
// through the validation pipeline. This allows us a graceful way to run jobs
226
// in goroutines and still have strict ordering guarantees. If this job doesn't
227
// have any parent job dependencies, then this function will return
228
// immediately.
229
func (v *ValidationBarrier) WaitForParents(childJobID JobID,
230
        job interface{}) error {
319✔
231

319✔
232
        var (
319✔
233
                ok      bool
319✔
234
                jobDesc string
319✔
235

319✔
236
                parentJobIDs fn.Set[JobID]
319✔
237
                annID        string
319✔
238
                jobChan      chan struct{}
319✔
239
        )
319✔
240

319✔
241
        // Acquire a lock to read ValidationBarrier.
319✔
242
        v.Lock()
319✔
243

319✔
244
        switch msg := job.(type) {
319✔
245
        // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the
246
        // completion of any active ChannelAnnouncement jobs related to them.
247
        case *lnwire.ChannelUpdate1:
64✔
248
                annID = msg.ShortChannelID.String()
64✔
249

64✔
250
                parentJobIDs, ok = v.jobDependencies[childJobID]
64✔
251
                if !ok {
119✔
252
                        // If ok is false, it means that this child job never
55✔
253
                        // had any parent jobs to wait on.
55✔
254
                        v.Unlock()
55✔
255
                        return nil
55✔
256
                }
55✔
257

258
                jobDesc = fmt.Sprintf("job=lnwire.ChannelUpdate, scid=%v",
9✔
259
                        msg.ShortChannelID.ToUint64())
9✔
260

261
        case *lnwire.NodeAnnouncement:
25✔
262
                annID = route.Vertex(msg.NodeID).String()
25✔
263

25✔
264
                parentJobIDs, ok = v.jobDependencies[childJobID]
25✔
265
                if !ok {
49✔
266
                        // If ok is false, it means that this child job never
24✔
267
                        // had any parent jobs to wait on.
24✔
268
                        v.Unlock()
24✔
269
                        return nil
24✔
270
                }
24✔
271

272
                jobDesc = fmt.Sprintf("job=lnwire.NodeAnnouncement, pub=%s",
1✔
273
                        route.Vertex(msg.NodeID))
1✔
274

275
        // Other types of jobs can be executed immediately, so we'll just
276
        // return directly.
277
        case *lnwire.AnnounceSignatures1:
×
278
                // TODO(roasbeef): need to wait on chan ann?
×
279
                v.Unlock()
×
280
                return nil
×
281

282
        case *lnwire.ChannelAnnouncement1:
230✔
283
                v.Unlock()
230✔
284
                return nil
230✔
285
        }
286

287
        // Release the lock once the above read is finished.
288
        v.Unlock()
10✔
289

10✔
290
        log.Debugf("Waiting for dependent on %s", jobDesc)
10✔
291

10✔
292
        v.Lock()
10✔
293
        jobChan, ok = v.childJobChans[childJobID]
10✔
294
        if !ok {
10✔
295
                v.Unlock()
×
296

×
297
                // The entry may not exist because this job does not depend on
×
298
                // any parent jobs.
×
299
                return nil
×
300
        }
×
301
        v.Unlock()
10✔
302

10✔
303
        for {
20✔
304
                select {
10✔
305
                case <-v.quit:
4✔
306
                        return ErrVBarrierShuttingDown
4✔
307

308
                case <-jobChan:
6✔
309
                        // Every time this is sent on or if it's closed, a
6✔
310
                        // parent job has finished. The parent jobs have to
6✔
311
                        // also potentially close the channel because if all
6✔
312
                        // the parent jobs finish and call SignalDependents
6✔
313
                        // before the goroutine running WaitForParents has a
6✔
314
                        // chance to grab the notification chan from
6✔
315
                        // childJobChans, then the running goroutine will wait
6✔
316
                        // here for a notification forever. By having the last
6✔
317
                        // parent job close the notificiation chan, we avoid
6✔
318
                        // this issue.
6✔
319

6✔
320
                        // Check and see if we have any parent jobs left. If we
6✔
321
                        // don't, we can finish up.
6✔
322
                        v.Lock()
6✔
323
                        info, found := v.jobInfoMap[annID]
6✔
324
                        if !found {
12✔
325
                                v.Unlock()
6✔
326

6✔
327
                                // No parent job info found, proceed with
6✔
328
                                // validation.
6✔
329
                                return nil
6✔
330
                        }
6✔
331

UNCOV
332
                        x := parentJobIDs.Intersect(info.activeParentJobIDs)
×
UNCOV
333
                        v.Unlock()
×
UNCOV
334
                        if x.IsEmpty() {
×
UNCOV
335
                                // The parent jobs have all completed. We can
×
UNCOV
336
                                // proceed with validation.
×
UNCOV
337
                                return nil
×
UNCOV
338
                        }
×
339

340
                        // If we've reached this point, we are still waiting on
341
                        // a parent job to complete.
342
                }
343
        }
344
}
345

346
// SignalDependents signals to any child jobs that this parent job has
347
// finished.
348
func (v *ValidationBarrier) SignalDependents(job interface{}, id JobID) error {
316✔
349
        v.Lock()
316✔
350
        defer v.Unlock()
316✔
351

316✔
352
        // removeJob either removes a child job or a parent job. If it is
316✔
353
        // removing a child job, then it removes the child's JobID from the set
316✔
354
        // of dependent jobs for the announcement ID. If this is removing a
316✔
355
        // parent job, then it removes the parentJobID from the set of active
316✔
356
        // parent jobs and notifies the child jobs that it has finished
316✔
357
        // validating.
316✔
358
        removeJob := func(annID string, id JobID, child bool) error {
1,106✔
359
                if child {
869✔
360
                        // If we're removing a child job, check jobInfoMap and
79✔
361
                        // remove this job from activeDependentJobs.
79✔
362
                        info, ok := v.jobInfoMap[annID]
79✔
363
                        if ok {
79✔
UNCOV
364
                                info.activeDependentJobs.Remove(id)
×
UNCOV
365
                        }
×
366

367
                        // Remove the notification chan from childJobChans.
368
                        delete(v.childJobChans, id)
79✔
369

79✔
370
                        // Remove this job's dependency mapping.
79✔
371
                        delete(v.jobDependencies, id)
79✔
372

79✔
373
                        return nil
79✔
374
                }
375

376
                // Otherwise, we are removing a parent job.
377
                jobInfo, found := v.jobInfoMap[annID]
711✔
378
                if !found {
711✔
379
                        // NOTE: Some sort of consistency guarantee has been
×
380
                        // broken.
×
381
                        return fmt.Errorf("no job info found for "+
×
382
                                "identifier(%v)", id)
×
383
                }
×
384

385
                jobInfo.activeParentJobIDs.Remove(id)
711✔
386

711✔
387
                lastJob := jobInfo.activeParentJobIDs.IsEmpty()
711✔
388

711✔
389
                // Notify all dependent jobs that a parent job has completed.
711✔
390
                for child := range jobInfo.activeDependentJobs {
720✔
391
                        notifyChan, ok := v.childJobChans[child]
9✔
392
                        if !ok {
9✔
393
                                // NOTE: Some sort of consistency guarantee has
×
394
                                // been broken.
×
395
                                return fmt.Errorf("no job info found for "+
×
396
                                        "identifier(%v)", id)
×
397
                        }
×
398

399
                        // We don't want to block when sending out the signal.
400
                        select {
9✔
401
                        case notifyChan <- struct{}{}:
6✔
402
                        default:
3✔
403
                        }
404

405
                        // If this is the last parent job for this annID, also
406
                        // close the channel. This is needed because it's
407
                        // possible that the parent job cleans up the job
408
                        // mappings before the goroutine handling the child job
409
                        // has a chance to call WaitForParents and catch the
410
                        // signal sent above. We are allowed to close because
411
                        // no other parent job will be able to send along the
412
                        // channel (or close) as we're removing the entry from
413
                        // the jobInfoMap below.
414
                        if lastJob {
15✔
415
                                close(notifyChan)
6✔
416
                        }
6✔
417
                }
418

419
                // Remove from jobInfoMap if last job.
420
                if lastJob {
1,419✔
421
                        delete(v.jobInfoMap, annID)
708✔
422
                }
708✔
423

424
                return nil
711✔
425
        }
426

427
        switch msg := job.(type) {
316✔
428
        case *lnwire.ChannelAnnouncement1:
237✔
429
                // Signal to the child jobs that parent validation has
237✔
430
                // finished. We have to call removeJob for each annID
237✔
431
                // that this ChannelAnnouncement can be associated with.
237✔
432
                err := removeJob(msg.ShortChannelID.String(), id, false)
237✔
433
                if err != nil {
237✔
434
                        return err
×
435
                }
×
436

437
                err = removeJob(route.Vertex(msg.NodeID1).String(), id, false)
237✔
438
                if err != nil {
237✔
439
                        return err
×
440
                }
×
441

442
                err = removeJob(route.Vertex(msg.NodeID2).String(), id, false)
237✔
443
                if err != nil {
237✔
444
                        return err
×
445
                }
×
446

447
                return nil
237✔
448

449
        case *lnwire.NodeAnnouncement:
24✔
450
                // Remove child job info.
24✔
451
                return removeJob(route.Vertex(msg.NodeID).String(), id, true)
24✔
452

453
        case *lnwire.ChannelUpdate1:
55✔
454
                // Remove child job info.
55✔
455
                return removeJob(msg.ShortChannelID.String(), id, true)
55✔
456

457
        case *lnwire.AnnounceSignatures1:
×
458
                // No dependency mappings are stored for AnnounceSignatures1,
×
459
                // so do nothing.
×
460
                return nil
×
461
        }
462

463
        return errors.New("invalid message - no job dependencies")
×
464
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc