• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13566028875

27 Feb 2025 12:09PM UTC coverage: 49.396% (-9.4%) from 58.748%
13566028875

Pull #9555

github

ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]

34 of 54 new or added lines in 4 files covered. (62.96%)

27464 existing lines in 436 files now uncovered.

101095 of 204664 relevant lines covered (49.4%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.69
/discovery/validation_barrier.go
1
package discovery
2

3
import (
4
        "fmt"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/go-errors/errors"
9
        "github.com/lightningnetwork/lnd/fn/v2"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/routing/route"
12
)
13

14
// ErrVBarrierShuttingDown is returned once the barrier has been asked to shut
// down. A caller that receives it must not assume that the condition it was
// waiting on has been met.
var ErrVBarrierShuttingDown = errors.New("ValidationBarrier shutting down")
20

21
// JobID is the identifier handed out to every job that is tracked by the
// validation barrier. A 64-bit unsigned integer is used so that running out of
// identifiers is not a practical concern.
type JobID uint64
24

25
// jobInfo stores job dependency info for a set of dependent gossip messages.
26
type jobInfo struct {
27
        // activeParentJobIDs is the set of active parent job ids.
28
        activeParentJobIDs fn.Set[JobID]
29

30
        // activeDependentJobs is the set of active dependent job ids.
31
        activeDependentJobs fn.Set[JobID]
32
}
33

34
// ValidationBarrier is a barrier used to enforce a strict validation order
35
// while concurrently validating other updates for channel edges. It uses a set
36
// of maps to track validation dependencies. This is needed in practice because
37
// gossip messages for a given channel may arive in order, but then due to
38
// scheduling in different goroutines, may be validated in the wrong order.
39
// With the ValidationBarrier, the dependent update will wait until the parent
40
// update completes.
41
type ValidationBarrier struct {
42
        // validationSemaphore is a channel of structs which is used as a
43
        // semaphore. Initially we'll fill this with a buffered channel of the
44
        // size of the number of active requests. Each new job will consume
45
        // from this channel, then restore the value upon completion.
46
        validationSemaphore chan struct{}
47

48
        // jobInfoMap stores the set of job ids for each channel.
49
        // NOTE: This MUST be used with the mutex.
50
        // NOTE: This currently stores string representations of
51
        // lnwire.ShortChannelID and route.Vertex. Since these are of different
52
        // lengths, collision cannot occur in their string representations.
53
        // N.B.: Check that any new string-converted types don't collide with
54
        // existing string-converted types.
55
        jobInfoMap map[string]*jobInfo
56

57
        // jobDependencies is a mapping from a child's JobID to the set of
58
        // parent JobID that it depends on.
59
        // NOTE: This MUST be used with the mutex.
60
        jobDependencies map[JobID]fn.Set[JobID]
61

62
        // childJobChans stores the notification channel that each child job
63
        // listens on for parent job completions.
64
        // NOTE: This MUST be used with the mutex.
65
        childJobChans map[JobID]chan struct{}
66

67
        // idCtr is an atomic integer that is used to assign JobIDs.
68
        idCtr atomic.Uint64
69

70
        quit chan struct{}
71
        sync.Mutex
72
}
73

74
// NewValidationBarrier creates a new instance of a validation barrier given
75
// the total number of active requests, and a quit channel which will be used
76
// to know when to kill pending, but unfilled jobs.
77
func NewValidationBarrier(numActiveReqs int,
78
        quitChan chan struct{}) *ValidationBarrier {
3✔
79

3✔
80
        v := &ValidationBarrier{
3✔
81
                jobInfoMap:      make(map[string]*jobInfo),
3✔
82
                jobDependencies: make(map[JobID]fn.Set[JobID]),
3✔
83
                childJobChans:   make(map[JobID]chan struct{}),
3✔
84
                quit:            quitChan,
3✔
85
        }
3✔
86

3✔
87
        // We'll first initialize a set of semaphores to limit our concurrency
3✔
88
        // when validating incoming requests in parallel.
3✔
89
        v.validationSemaphore = make(chan struct{}, numActiveReqs)
3✔
90
        for i := 0; i < numActiveReqs; i++ {
6✔
91
                v.validationSemaphore <- struct{}{}
3✔
92
        }
3✔
93

94
        return v
3✔
95
}
96

97
// InitJobDependencies will wait for a new job slot to become open, and then
98
// sets up any dependent signals/trigger for the new job.
99
func (v *ValidationBarrier) InitJobDependencies(job interface{}) (JobID,
100
        error) {
3✔
101

3✔
102
        // We'll wait for either a new slot to become open, or for the quit
3✔
103
        // channel to be closed.
3✔
104
        select {
3✔
105
        case <-v.validationSemaphore:
3✔
106
        case <-v.quit:
×
107
        }
108

109
        v.Lock()
3✔
110
        defer v.Unlock()
3✔
111

3✔
112
        // updateOrCreateJobInfo modifies the set of activeParentJobs for this
3✔
113
        // annID and updates jobInfoMap.
3✔
114
        updateOrCreateJobInfo := func(annID string, annJobID JobID) {
6✔
115
                info, ok := v.jobInfoMap[annID]
3✔
116
                if ok {
6✔
117
                        // If an entry already exists for annID, then a job
3✔
118
                        // related to it is being validated. Add to the set of
3✔
119
                        // parent job ids. This addition will only affect
3✔
120
                        // _later_, _child_ jobs for the annID.
3✔
121
                        info.activeParentJobIDs.Add(annJobID)
3✔
122
                        return
3✔
123
                }
3✔
124

125
                // No entry exists for annID, meaning that we should create
126
                // one.
127
                parentJobSet := fn.NewSet(annJobID)
3✔
128

3✔
129
                info = &jobInfo{
3✔
130
                        activeParentJobIDs:  parentJobSet,
3✔
131
                        activeDependentJobs: fn.NewSet[JobID](),
3✔
132
                }
3✔
133
                v.jobInfoMap[annID] = info
3✔
134
        }
135

136
        // populateDependencies populates the job dependency mappings (i.e.
137
        // which should complete after another) for the (annID, childJobID)
138
        // tuple.
139
        populateDependencies := func(annID string, childJobID JobID) {
6✔
140
                // If there is no entry in the jobInfoMap, we don't have to
3✔
141
                // wait on any parent jobs to finish.
3✔
142
                info, ok := v.jobInfoMap[annID]
3✔
143
                if !ok {
6✔
144
                        return
3✔
145
                }
3✔
146

147
                // We want to see a snapshot of active parent jobs for this
148
                // annID that are already registered in activeParentJobIDs. The
149
                // child job identified by childJobID can only run after these
150
                // parent jobs have run. After grabbing the snapshot, we then
151
                // want to persist a slice of these jobs.
152

153
                // Create the notification chan that parent jobs will send (or
154
                // close) on when they complete.
155
                jobChan := make(chan struct{})
3✔
156

3✔
157
                // Add to set of activeDependentJobs for this annID.
3✔
158
                info.activeDependentJobs.Add(childJobID)
3✔
159

3✔
160
                // Store in childJobChans. The parent jobs will fetch this chan
3✔
161
                // to notify on. The child job will later fetch this chan to
3✔
162
                // listen on when WaitForParents is called.
3✔
163
                v.childJobChans[childJobID] = jobChan
3✔
164

3✔
165
                // Copy over the parent job IDs at this moment for this annID.
3✔
166
                // This job must be processed AFTER those parent IDs.
3✔
167
                parentJobs := info.activeParentJobIDs.Copy()
3✔
168

3✔
169
                // Populate the jobDependencies mapping.
3✔
170
                v.jobDependencies[childJobID] = parentJobs
3✔
171
        }
172

173
        // Once a slot is open, we'll examine the message of the job, to see if
174
        // there need to be any dependent barriers set up.
175
        switch msg := job.(type) {
3✔
176
        case *lnwire.ChannelAnnouncement1:
3✔
177
                id := JobID(v.idCtr.Add(1))
3✔
178

3✔
179
                updateOrCreateJobInfo(msg.ShortChannelID.String(), id)
3✔
180
                updateOrCreateJobInfo(route.Vertex(msg.NodeID1).String(), id)
3✔
181
                updateOrCreateJobInfo(route.Vertex(msg.NodeID2).String(), id)
3✔
182

3✔
183
                return id, nil
3✔
184

185
        // Populate the dependency mappings for the below child jobs.
186
        case *lnwire.ChannelUpdate1:
3✔
187
                childJobID := JobID(v.idCtr.Add(1))
3✔
188
                populateDependencies(msg.ShortChannelID.String(), childJobID)
3✔
189

3✔
190
                return childJobID, nil
3✔
191
        case *lnwire.NodeAnnouncement:
3✔
192
                childJobID := JobID(v.idCtr.Add(1))
3✔
193
                populateDependencies(
3✔
194
                        route.Vertex(msg.NodeID).String(), childJobID,
3✔
195
                )
3✔
196

3✔
197
                return childJobID, nil
3✔
198
        case *lnwire.AnnounceSignatures1:
×
199
                // TODO(roasbeef): need to wait on chan ann?
×
200
                // - We can do the above by calling populateDependencies. For
×
201
                //   now, while we evaluate potential side effects, don't do
×
202
                //   anything with childJobID and just return it.
×
203
                childJobID := JobID(v.idCtr.Add(1))
×
204
                return childJobID, nil
×
205

206
        default:
×
207
                // An invalid message was passed into InitJobDependencies.
×
208
                // Return an error.
×
209
                return JobID(0), errors.New("invalid message")
×
210
        }
211
}
212

213
// CompleteJob returns a free slot to the set of available job slots. This
214
// should be called once a job has been fully completed. Otherwise, slots may
215
// not be returned to the internal scheduling, causing a deadlock when a new
216
// overflow job is attempted.
217
func (v *ValidationBarrier) CompleteJob() {
3✔
218
        select {
3✔
219
        case v.validationSemaphore <- struct{}{}:
3✔
220
        case <-v.quit:
×
221
        }
222
}
223

224
// WaitForParents will block until all parent job dependencies have went
225
// through the validation pipeline. This allows us a graceful way to run jobs
226
// in goroutines and still have strict ordering guarantees. If this job doesn't
227
// have any parent job dependencies, then this function will return
228
// immediately.
229
func (v *ValidationBarrier) WaitForParents(childJobID JobID,
230
        job interface{}) error {
3✔
231

3✔
232
        var (
3✔
233
                ok      bool
3✔
234
                jobDesc string
3✔
235

3✔
236
                parentJobIDs fn.Set[JobID]
3✔
237
                annID        string
3✔
238
                jobChan      chan struct{}
3✔
239
        )
3✔
240

3✔
241
        // Acquire a lock to read ValidationBarrier.
3✔
242
        v.Lock()
3✔
243

3✔
244
        switch msg := job.(type) {
3✔
245
        // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the
246
        // completion of any active ChannelAnnouncement jobs related to them.
247
        case *lnwire.ChannelUpdate1:
3✔
248
                annID = msg.ShortChannelID.String()
3✔
249

3✔
250
                parentJobIDs, ok = v.jobDependencies[childJobID]
3✔
251
                if !ok {
6✔
252
                        // If ok is false, it means that this child job never
3✔
253
                        // had any parent jobs to wait on.
3✔
254
                        v.Unlock()
3✔
255
                        return nil
3✔
256
                }
3✔
257

258
                jobDesc = fmt.Sprintf("job=lnwire.ChannelUpdate, scid=%v",
3✔
259
                        msg.ShortChannelID.ToUint64())
3✔
260

261
        case *lnwire.NodeAnnouncement:
3✔
262
                annID = route.Vertex(msg.NodeID).String()
3✔
263

3✔
264
                parentJobIDs, ok = v.jobDependencies[childJobID]
3✔
265
                if !ok {
6✔
266
                        // If ok is false, it means that this child job never
3✔
267
                        // had any parent jobs to wait on.
3✔
268
                        v.Unlock()
3✔
269
                        return nil
3✔
270
                }
3✔
271

272
                jobDesc = fmt.Sprintf("job=lnwire.NodeAnnouncement, pub=%s",
3✔
273
                        route.Vertex(msg.NodeID))
3✔
274

275
        // Other types of jobs can be executed immediately, so we'll just
276
        // return directly.
277
        case *lnwire.AnnounceSignatures1:
×
278
                // TODO(roasbeef): need to wait on chan ann?
×
279
                v.Unlock()
×
280
                return nil
×
281

282
        case *lnwire.ChannelAnnouncement1:
3✔
283
                v.Unlock()
3✔
284
                return nil
3✔
285
        }
286

287
        // Release the lock once the above read is finished.
288
        v.Unlock()
3✔
289

3✔
290
        log.Debugf("Waiting for dependent on %s", jobDesc)
3✔
291

3✔
292
        v.Lock()
3✔
293
        jobChan, ok = v.childJobChans[childJobID]
3✔
294
        if !ok {
3✔
295
                v.Unlock()
×
296

×
297
                // The entry may not exist because this job does not depend on
×
298
                // any parent jobs.
×
299
                return nil
×
300
        }
×
301
        v.Unlock()
3✔
302

3✔
303
        for {
6✔
304
                select {
3✔
UNCOV
305
                case <-v.quit:
×
UNCOV
306
                        return ErrVBarrierShuttingDown
×
307

308
                case <-jobChan:
3✔
309
                        // Every time this is sent on or if it's closed, a
3✔
310
                        // parent job has finished. The parent jobs have to
3✔
311
                        // also potentially close the channel because if all
3✔
312
                        // the parent jobs finish and call SignalDependents
3✔
313
                        // before the goroutine running WaitForParents has a
3✔
314
                        // chance to grab the notification chan from
3✔
315
                        // childJobChans, then the running goroutine will wait
3✔
316
                        // here for a notification forever. By having the last
3✔
317
                        // parent job close the notificiation chan, we avoid
3✔
318
                        // this issue.
3✔
319

3✔
320
                        // Check and see if we have any parent jobs left. If we
3✔
321
                        // don't, we can finish up.
3✔
322
                        v.Lock()
3✔
323
                        info, found := v.jobInfoMap[annID]
3✔
324
                        if !found {
6✔
325
                                v.Unlock()
3✔
326

3✔
327
                                // No parent job info found, proceed with
3✔
328
                                // validation.
3✔
329
                                return nil
3✔
330
                        }
3✔
331

332
                        x := parentJobIDs.Intersect(info.activeParentJobIDs)
3✔
333
                        v.Unlock()
3✔
334
                        if x.IsEmpty() {
6✔
335
                                // The parent jobs have all completed. We can
3✔
336
                                // proceed with validation.
3✔
337
                                return nil
3✔
338
                        }
3✔
339

340
                        // If we've reached this point, we are still waiting on
341
                        // a parent job to complete.
342
                }
343
        }
344
}
345

346
// SignalDependents signals to any child jobs that this parent job has
347
// finished.
348
func (v *ValidationBarrier) SignalDependents(job interface{}, id JobID) error {
3✔
349
        v.Lock()
3✔
350
        defer v.Unlock()
3✔
351

3✔
352
        // removeJob either removes a child job or a parent job. If it is
3✔
353
        // removing a child job, then it removes the child's JobID from the set
3✔
354
        // of dependent jobs for the announcement ID. If this is removing a
3✔
355
        // parent job, then it removes the parentJobID from the set of active
3✔
356
        // parent jobs and notifies the child jobs that it has finished
3✔
357
        // validating.
3✔
358
        removeJob := func(annID string, id JobID, child bool) error {
6✔
359
                if child {
6✔
360
                        // If we're removing a child job, check jobInfoMap and
3✔
361
                        // remove this job from activeDependentJobs.
3✔
362
                        info, ok := v.jobInfoMap[annID]
3✔
363
                        if ok {
6✔
364
                                info.activeDependentJobs.Remove(id)
3✔
365
                        }
3✔
366

367
                        // Remove the notification chan from childJobChans.
368
                        delete(v.childJobChans, id)
3✔
369

3✔
370
                        // Remove this job's dependency mapping.
3✔
371
                        delete(v.jobDependencies, id)
3✔
372

3✔
373
                        return nil
3✔
374
                }
375

376
                // Otherwise, we are removing a parent job.
377
                jobInfo, found := v.jobInfoMap[annID]
3✔
378
                if !found {
3✔
379
                        // NOTE: Some sort of consistency guarantee has been
×
380
                        // broken.
×
381
                        return fmt.Errorf("no job info found for "+
×
382
                                "identifier(%v)", id)
×
383
                }
×
384

385
                jobInfo.activeParentJobIDs.Remove(id)
3✔
386

3✔
387
                lastJob := jobInfo.activeParentJobIDs.IsEmpty()
3✔
388

3✔
389
                // Notify all dependent jobs that a parent job has completed.
3✔
390
                for child := range jobInfo.activeDependentJobs {
6✔
391
                        notifyChan, ok := v.childJobChans[child]
3✔
392
                        if !ok {
3✔
393
                                // NOTE: Some sort of consistency guarantee has
×
394
                                // been broken.
×
395
                                return fmt.Errorf("no job info found for "+
×
396
                                        "identifier(%v)", id)
×
397
                        }
×
398

399
                        // We don't want to block when sending out the signal.
400
                        select {
3✔
401
                        case notifyChan <- struct{}{}:
3✔
402
                        default:
3✔
403
                        }
404

405
                        // If this is the last parent job for this annID, also
406
                        // close the channel. This is needed because it's
407
                        // possible that the parent job cleans up the job
408
                        // mappings before the goroutine handling the child job
409
                        // has a chance to call WaitForParents and catch the
410
                        // signal sent above. We are allowed to close because
411
                        // no other parent job will be able to send along the
412
                        // channel (or close) as we're removing the entry from
413
                        // the jobInfoMap below.
414
                        if lastJob {
6✔
415
                                close(notifyChan)
3✔
416
                        }
3✔
417
                }
418

419
                // Remove from jobInfoMap if last job.
420
                if lastJob {
6✔
421
                        delete(v.jobInfoMap, annID)
3✔
422
                }
3✔
423

424
                return nil
3✔
425
        }
426

427
        switch msg := job.(type) {
3✔
428
        case *lnwire.ChannelAnnouncement1:
3✔
429
                // Signal to the child jobs that parent validation has
3✔
430
                // finished. We have to call removeJob for each annID
3✔
431
                // that this ChannelAnnouncement can be associated with.
3✔
432
                err := removeJob(msg.ShortChannelID.String(), id, false)
3✔
433
                if err != nil {
3✔
434
                        return err
×
435
                }
×
436

437
                err = removeJob(route.Vertex(msg.NodeID1).String(), id, false)
3✔
438
                if err != nil {
3✔
439
                        return err
×
440
                }
×
441

442
                err = removeJob(route.Vertex(msg.NodeID2).String(), id, false)
3✔
443
                if err != nil {
3✔
444
                        return err
×
445
                }
×
446

447
                return nil
3✔
448

449
        case *lnwire.NodeAnnouncement:
3✔
450
                // Remove child job info.
3✔
451
                return removeJob(route.Vertex(msg.NodeID).String(), id, true)
3✔
452

453
        case *lnwire.ChannelUpdate1:
3✔
454
                // Remove child job info.
3✔
455
                return removeJob(msg.ShortChannelID.String(), id, true)
3✔
456

457
        case *lnwire.AnnounceSignatures1:
×
458
                // No dependency mappings are stored for AnnounceSignatures1,
×
459
                // so do nothing.
×
460
                return nil
×
461
        }
462

463
        return errors.New("invalid message - no job dependencies")
×
464
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc