• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12259140063

10 Dec 2024 03:16PM UTC coverage: 49.762% (-0.01%) from 49.773%
12259140063

Pull #9241

github

Crypt-iQ
release-notes: update for 0.19.0
Pull Request #9241: discovery+graph: track job set dependencies in vb

194 of 238 new or added lines in 3 files covered. (81.51%)

81 existing lines in 11 files now uncovered.

100106 of 201168 relevant lines covered (49.76%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.75
/graph/validation_barrier.go
1
package graph
2

3
import (
4
        "fmt"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/go-errors/errors"
9
        "github.com/lightningnetwork/lnd/fn/v2"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/routing/route"
12
)
13

14
// JobID is the identifier handed out to every job tracked by the validation
// barrier. A 64-bit unsigned integer is used so that exhausting the ID space
// is not a practical concern.
type JobID uint64
17

18
// jobInfo stores job dependency info for a set of dependent gossip messages.
19
type jobInfo struct {
20
        // activeParentJobIDs is the set of active parent job ids.
21
        activeParentJobIDs fn.Set[JobID]
22

23
        // activeDependentJobs is the set of active dependent job ids.
24
        activeDependentJobs fn.Set[JobID]
25
}
26

27
// ValidationBarrier is a barrier used to enforce a strict validation order
28
// while concurrently validating other updates for channel edges. It uses a set
29
// of maps to track validation dependencies. This is needed in practice because
30
// gossip messages for a given channel may arive in order, but then due to
31
// scheduling in different goroutines, may be validated in the wrong order.
32
// With the ValidationBarrier, the dependent update will wait until the parent
33
// update completes.
34
type ValidationBarrier struct {
35
        // validationSemaphore is a channel of structs which is used as a
36
        // semaphore. Initially we'll fill this with a buffered channel of the
37
        // size of the number of active requests. Each new job will consume
38
        // from this channel, then restore the value upon completion.
39
        validationSemaphore chan struct{}
40

41
        // jobIDMap stores the set of job ids for each channel.
42
        // NOTE: This MUST be used with the mutex.
43
        // NOTE: We don't need to worry about collisions between
44
        // lnire.ShortChannelID and route.Vertex because they are of different
45
        // length and entries therefore cannot hash to the same keys.
46
        // NOTE: IF OTHER TYPES OF KEYS ARE STORED, CHECK THAT COLLISION WON'T
47
        // OCCUR.
48
        jobInfoMap map[any]*jobInfo
49

50
        // jobDependencies is a mapping from a child's JobID to the set of
51
        // parent JobID that it depends on.
52
        // NOTE: This MUST be used with the mutex.
53
        jobDependencies map[JobID]fn.Set[JobID]
54

55
        // childJobChans stores the notification channel that each child job
56
        // listens on for parent job completions.
57
        // NOTE: This MUST be used with the mutex.
58
        childJobChans map[JobID]chan struct{}
59

60
        // idCtr is an atomic integer that is used to assign JobIDs.
61
        idCtr atomic.Uint64
62

63
        quit chan struct{}
64
        sync.Mutex
65
}
66

67
// NewValidationBarrier creates a new instance of a validation barrier given
68
// the total number of active requests, and a quit channel which will be used
69
// to know when to kill pending, but unfilled jobs.
70
func NewValidationBarrier(numActiveReqs int,
71
        quitChan chan struct{}) *ValidationBarrier {
3✔
72

3✔
73
        v := &ValidationBarrier{
3✔
74
                jobInfoMap:      make(map[any]*jobInfo),
3✔
75
                jobDependencies: make(map[JobID]fn.Set[JobID]),
3✔
76
                childJobChans:   make(map[JobID]chan struct{}),
3✔
77
                quit:            quitChan,
3✔
78
        }
3✔
79

3✔
80
        // We'll first initialize a set of semaphores to limit our concurrency
3✔
81
        // when validating incoming requests in parallel.
3✔
82
        v.validationSemaphore = make(chan struct{}, numActiveReqs)
3✔
83
        for i := 0; i < numActiveReqs; i++ {
6✔
84
                v.validationSemaphore <- struct{}{}
3✔
85
        }
3✔
86

87
        return v
3✔
88
}
89

90
// InitJobDependencies will wait for a new job slot to become open, and then
91
// sets up any dependent signals/trigger for the new job
92
func (v *ValidationBarrier) InitJobDependencies(job interface{}) (JobID,
93
        error) {
3✔
94

3✔
95
        // We'll wait for either a new slot to become open, or for the quit
3✔
96
        // channel to be closed.
3✔
97
        select {
3✔
98
        case <-v.validationSemaphore:
3✔
99
        case <-v.quit:
×
100
        }
101

102
        v.Lock()
3✔
103
        defer v.Unlock()
3✔
104

3✔
105
        // updateOrCreateJobInfo modifies the set of activeParentJobs for this
3✔
106
        // annID and updates jobInfoMap.
3✔
107
        updateOrCreateJobInfo := func(annID any, annJobID JobID) {
6✔
108
                info, ok := v.jobInfoMap[annID]
3✔
109
                if ok {
6✔
110
                        // If an entry already exists for annID, then a job
3✔
111
                        // related to it is being validated. Add to the set of
3✔
112
                        // parent job ids. This addition will only affect
3✔
113
                        // _later_, _child_ jobs for the annID.
3✔
114
                        info.activeParentJobIDs.Add(annJobID)
3✔
115
                        return
3✔
116
                }
3✔
117

118
                // No entry exists for annID, meaning that we should create
119
                // one.
120
                parentJobSet := fn.NewSet(annJobID)
3✔
121

3✔
122
                info = &jobInfo{
3✔
123
                        activeParentJobIDs:  parentJobSet,
3✔
124
                        activeDependentJobs: fn.NewSet[JobID](),
3✔
125
                }
3✔
126
                v.jobInfoMap[annID] = info
3✔
127
        }
128

129
        // populateDependencies populates the job dependency mappings (i.e.
130
        // which should complete after another) for the (annID, childJobID)
131
        // tuple.
132
        populateDependencies := func(annID any, childJobID JobID) {
6✔
133
                // If there is no entry in the jobInfoMap, we don't have to
3✔
134
                // wait on any parent jobs to finish.
3✔
135
                info, ok := v.jobInfoMap[annID]
3✔
136
                if !ok {
6✔
137
                        return
3✔
138
                }
3✔
139

140
                // We want to see a snapshot of active parent jobs for this
141
                // annID that are already registered in activeParentJobIDs. The
142
                // child job identified by childJobID can only run after these
143
                // parent jobs have run. After grabbing the snapshot, we then
144
                // want to persist a slice of these jobs.
145

146
                // Create the notification chan that parent jobs will send (or
147
                // close) on when they complete.
148
                jobChan := make(chan struct{})
3✔
149

3✔
150
                // Add to set of activeDependentJobs for this annID.
3✔
151
                info.activeDependentJobs.Add(childJobID)
3✔
152

3✔
153
                // Store in childJobChans. The parent jobs will fetch this chan
3✔
154
                // to notify on. The child job will later fetch this chan to
3✔
155
                // listen on when WaitForParents is called.
3✔
156
                v.childJobChans[childJobID] = jobChan
3✔
157

3✔
158
                // Copy over the parent job IDs at this moment for this annID.
3✔
159
                // This job must be processed AFTER those parent IDs.
3✔
160
                parentJobs := info.activeParentJobIDs.Union(fn.NewSet[JobID]())
3✔
161

3✔
162
                // Populate the jobDependencies mapping.
3✔
163
                v.jobDependencies[childJobID] = parentJobs
3✔
164
        }
165

166
        // Once a slot is open, we'll examine the message of the job, to see if
167
        // there need to be any dependent barriers set up.
168
        switch msg := job.(type) {
3✔
169
        case *lnwire.ChannelAnnouncement1:
3✔
170
                id := JobID(v.idCtr.Add(1))
3✔
171

3✔
172
                updateOrCreateJobInfo(msg.ShortChannelID, id)
3✔
173
                updateOrCreateJobInfo(route.Vertex(msg.NodeID1), id)
3✔
174
                updateOrCreateJobInfo(route.Vertex(msg.NodeID2), id)
3✔
175

3✔
176
                return id, nil
3✔
177

178
        // Populate the dependency mappings for the below child jobs.
179
        case *lnwire.ChannelUpdate1:
3✔
180
                childJobID := JobID(v.idCtr.Add(1))
3✔
181
                populateDependencies(msg.ShortChannelID, childJobID)
3✔
182

3✔
183
                return childJobID, nil
3✔
184
        case *lnwire.NodeAnnouncement:
3✔
185
                childJobID := JobID(v.idCtr.Add(1))
3✔
186
                populateDependencies(route.Vertex(msg.NodeID), childJobID)
3✔
187

3✔
188
                return childJobID, nil
3✔
189
        case *lnwire.AnnounceSignatures1:
×
190
                // TODO(roasbeef): need to wait on chan ann?
×
NEW
191
                // - We can do the above by calling populateDependencies. For
×
NEW
192
                //   now, while we evaluate potential side effects, don't do
×
NEW
193
                //   anything with childJobID and just return it.
×
NEW
194
                childJobID := JobID(v.idCtr.Add(1))
×
NEW
195
                return childJobID, nil
×
196

NEW
197
        default:
×
NEW
198
                // An invalid message was passed into InitJobDependencies.
×
NEW
199
                // Return an error.
×
NEW
200
                return JobID(0), errors.New("invalid message")
×
201
        }
202
}
203

204
// CompleteJob returns a free slot to the set of available job slots. This
205
// should be called once a job has been fully completed. Otherwise, slots may
206
// not be returned to the internal scheduling, causing a deadlock when a new
207
// overflow job is attempted.
208
func (v *ValidationBarrier) CompleteJob() {
3✔
209
        select {
3✔
210
        case v.validationSemaphore <- struct{}{}:
3✔
211
        case <-v.quit:
3✔
212
        }
213
}
214

215
// WaitForParents will block until all parent job dependencies have went
216
// through the validation pipeline. This allows us a graceful way to run jobs
217
// in goroutines and still have strict ordering guarantees. If this job doesn't
218
// have any validation dependencies, then this function will return
219
// immediately.
220
func (v *ValidationBarrier) WaitForParents(childJobID JobID,
221
        job interface{}) error {
3✔
222

3✔
223
        var (
3✔
224
                ok      bool
3✔
225
                jobDesc string
3✔
226

3✔
227
                parentJobIDs fn.Set[JobID]
3✔
228
                annID        any
3✔
229
                jobChan      chan struct{}
3✔
230
        )
3✔
231

3✔
232
        // Acquire a lock to read ValidationBarrier.
3✔
233
        v.Lock()
3✔
234

3✔
235
        switch msg := job.(type) {
3✔
236
        // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the
237
        // completion of any active ChannelAnnouncement jobs related to them.
238
        case *lnwire.ChannelUpdate1:
3✔
239
                annID = msg.ShortChannelID
3✔
240

3✔
241
                parentJobIDs, ok = v.jobDependencies[childJobID]
3✔
242
                if !ok {
6✔
243
                        // If ok is false, it means that this child job never
3✔
244
                        // had any parent jobs to wait on.
3✔
245
                        v.Unlock()
3✔
246
                        return nil
3✔
247
                }
3✔
248

249
                jobDesc = fmt.Sprintf("job=lnwire.ChannelUpdate, scid=%v",
3✔
250
                        msg.ShortChannelID.ToUint64())
3✔
251

252
        case *lnwire.NodeAnnouncement:
3✔
253
                annID = route.Vertex(msg.NodeID)
3✔
254

3✔
255
                parentJobIDs, ok = v.jobDependencies[childJobID]
3✔
256
                if !ok {
6✔
257
                        // If ok is false, it means that this child job never
3✔
258
                        // had any parent jobs to wait on.
3✔
259
                        v.Unlock()
3✔
260
                        return nil
3✔
261
                }
3✔
262

263
                jobDesc = fmt.Sprintf("job=lnwire.NodeAnnouncement, pub=%s",
3✔
264
                        route.Vertex(msg.NodeID))
3✔
265

266
        // Other types of jobs can be executed immediately, so we'll just
267
        // return directly.
268
        case *lnwire.AnnounceSignatures1:
×
269
                // TODO(roasbeef): need to wait on chan ann?
270
        case *lnwire.ChannelAnnouncement1:
3✔
271
        }
272

273
        // Release the lock once the above read is finished.
274
        v.Unlock()
3✔
275

3✔
276
        // If it's not ok, it means either the job is not a dependent type, or
3✔
277
        // it doesn't have a dependency signal. Either way, we can return
3✔
278
        // early.
3✔
279
        if !ok {
6✔
280
                return nil
3✔
281
        }
3✔
282

283
        log.Debugf("Waiting for dependent on %s", jobDesc)
3✔
284

3✔
285
        v.Lock()
3✔
286
        jobChan, ok = v.childJobChans[childJobID]
3✔
287
        if !ok {
3✔
NEW
288
                v.Unlock()
×
UNCOV
289

×
NEW
290
                // The entry may not exist because this job does not depend on
×
NEW
291
                // any parent jobs.
×
UNCOV
292
                return nil
×
UNCOV
293
        }
×
294
        v.Unlock()
3✔
295

3✔
296
        for {
6✔
297
                select {
3✔
NEW
298
                case <-v.quit:
×
NEW
299
                        return NewErrf(ErrVBarrierShuttingDown,
×
NEW
300
                                "validation barrier shutting down")
×
301

302
                case <-jobChan:
3✔
303
                        // Every time this is sent on or if it's closed, a
3✔
304
                        // parent job has finished. The parent jobs have to
3✔
305
                        // also potentially close the channel because if all
3✔
306
                        // the parent jobs finish and call SignalDependents
3✔
307
                        // before the goroutine running WaitForParents has a
3✔
308
                        // chance to grab the notification chan from
3✔
309
                        // childJobChans, then the running goroutine will wait
3✔
310
                        // here for a notification forever. By having the last
3✔
311
                        // parent job close the notificiation chan, we avoid
3✔
312
                        // this issue.
3✔
313

3✔
314
                        // Check and see if we have any parent jobs left. If we
3✔
315
                        // don't, we can finish up.
3✔
316
                        v.Lock()
3✔
317
                        info, found := v.jobInfoMap[annID]
3✔
318
                        if !found {
6✔
319
                                v.Unlock()
3✔
320

3✔
321
                                // No parent job info found, proceed with
3✔
322
                                // validation.
3✔
323
                                return nil
3✔
324
                        }
3✔
325

326
                        x := parentJobIDs.Intersect(info.activeParentJobIDs)
3✔
327
                        v.Unlock()
3✔
328
                        if x.IsEmpty() {
6✔
329
                                // The parent jobs have all completed. We can
3✔
330
                                // proceed with validation.
3✔
331
                                return nil
3✔
332
                        }
3✔
333

334
                        // If we've reached this point, we are still waiting on
335
                        // a parent job to complete.
336
                }
337
        }
338
}
339

340
// SignalDependents signals to any child jobs that this parent job has
341
// finished.
342
func (v *ValidationBarrier) SignalDependents(job interface{}, id JobID) error {
3✔
343
        v.Lock()
3✔
344
        defer v.Unlock()
3✔
345

3✔
346
        // removeJob either removes a child job or a parent job. If it is
3✔
347
        // removing a child job, then it removes the child's JobID from the set
3✔
348
        // of dependent jobs for the announcement ID. If this is removing a
3✔
349
        // parent job, then it removes the parentJobID from the set of active
3✔
350
        // parent jobs and notifies the child jobs that it has finished
3✔
351
        // validating.
3✔
352
        removeJob := func(annID any, id JobID, child bool) error {
6✔
353
                if child {
6✔
354
                        // If we're removing a child job, check jobInfoMap and
3✔
355
                        // remove this job from activeDependentJobs.
3✔
356
                        info, ok := v.jobInfoMap[annID]
3✔
357
                        if ok {
6✔
358
                                info.activeDependentJobs.Remove(id)
3✔
359
                        }
3✔
360

361
                        // Remove the notification chan from childJobChans.
362
                        delete(v.childJobChans, id)
3✔
363

3✔
364
                        // Remove this job's dependency mapping.
3✔
365
                        delete(v.jobDependencies, id)
3✔
366

3✔
367
                        return nil
3✔
368
                }
369

370
                // Otherwise, we are removing a parent job.
371
                jobInfo, found := v.jobInfoMap[annID]
3✔
372
                if !found {
3✔
NEW
373
                        // NOTE: Some sort of consistency guarantee has been
×
NEW
374
                        // broken.
×
NEW
375
                        return fmt.Errorf("no job info found for "+
×
NEW
376
                                "identifier(%v)", id)
×
NEW
377
                }
×
378

379
                jobInfo.activeParentJobIDs.Remove(id)
3✔
380

3✔
381
                lastJob := jobInfo.activeParentJobIDs.IsEmpty()
3✔
382

3✔
383
                // Notify all dependent jobs that a parent job has completed.
3✔
384
                for child := range jobInfo.activeDependentJobs {
6✔
385
                        notifyChan, ok := v.childJobChans[child]
3✔
386
                        if !ok {
3✔
NEW
387
                                // NOTE: Some sort of consistency guarantee has
×
NEW
388
                                // been broken.
×
NEW
389
                                return fmt.Errorf("no job info found for "+
×
NEW
390
                                        "identifier(%v)", id)
×
NEW
391
                        }
×
392

393
                        // We don't want to block when sending out the signal.
394
                        select {
3✔
395
                        case notifyChan <- struct{}{}:
3✔
396
                        default:
3✔
397
                        }
398

399
                        // If this is the last parent job for this id, also
400
                        // close the channel. This is needed because it's
401
                        // possible that the parent job cleans up the job
402
                        // mappings before the goroutine handling the child job
403
                        // has a change to call WaitForParents and catch the
404
                        // signal sent above. We are allowed to close because
405
                        // no other parent job will be able to send along the
406
                        // channel (or close) as we're removing the entry from
407
                        // the jobInfoMap below.
408
                        if lastJob {
6✔
409
                                close(notifyChan)
3✔
410
                        }
3✔
411
                }
412

413
                // Remove from jobInfoMap if last job.
414
                if lastJob {
6✔
415
                        delete(v.jobInfoMap, annID)
3✔
416
                }
3✔
417

418
                return nil
3✔
419
        }
420

421
        switch msg := job.(type) {
3✔
422
        case *lnwire.ChannelAnnouncement1:
3✔
423
                // Signal to the child jobs that parent validation has
3✔
424
                // finished. We have to call removeParentJob for each annID
3✔
425
                // that this ChannelAnnouncement can be associated with.
3✔
426
                err := removeJob(msg.ShortChannelID, id, false)
3✔
427
                if err != nil {
3✔
NEW
428
                        return err
×
NEW
429
                }
×
430

431
                err = removeJob(route.Vertex(msg.NodeID1), id, false)
3✔
432
                if err != nil {
3✔
NEW
433
                        return err
×
NEW
434
                }
×
435

436
                err = removeJob(route.Vertex(msg.NodeID2), id, false)
3✔
437
                if err != nil {
3✔
NEW
438
                        return err
×
UNCOV
439
                }
×
440

441
                return nil
3✔
442

443
        case *lnwire.NodeAnnouncement:
3✔
444
                // Remove child job info.
3✔
445
                return removeJob(route.Vertex(msg.NodeID), id, true)
3✔
446

447
        case *lnwire.ChannelUpdate1:
3✔
448
                // Remove child job info.
3✔
449
                return removeJob(msg.ShortChannelID, id, true)
3✔
450

451
        case *lnwire.AnnounceSignatures1:
×
NEW
452
                // No dependency mappings are stored for AnnounceSignatures1,
×
NEW
453
                // so do nothing.
×
NEW
454
                return nil
×
455
        }
456

NEW
457
        return errors.New("invalid message - no job dependencies")
×
458
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc