• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13236757158

10 Feb 2025 08:39AM UTC coverage: 57.649% (-1.2%) from 58.815%
13236757158

Pull #9493

github

ziggie1984
lncli: for some cmds we don't replace the data of the response.

For some cmds it is not very practical to replace the json output
because we might pipe it into other commands. For example when
creating the route we want to pipe it into sendtoRoute.
Pull Request #9493: For some lncli cmds we should not replace the content with other data

0 of 9 new or added lines in 2 files covered. (0.0%)

19535 existing lines in 252 files now uncovered.

103517 of 179563 relevant lines covered (57.65%)

24878.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.21
/discovery/validation_barrier.go
1
package discovery
2

3
import (
4
        "fmt"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/go-errors/errors"
9
        "github.com/lightningnetwork/lnd/fn/v2"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/routing/route"
12
)
13

14
var (
15
        // ErrVBarrierShuttingDown signals that the barrier has been requested
16
        // to shutdown, and that the caller should not treat the wait condition
17
        // as fulfilled.
18
        ErrVBarrierShuttingDown = errors.New("ValidationBarrier shutting down")
19
)
20

21
// JobID is the identifier given to each job that is active in the validation
// barrier. It is 64 bits wide so that overflow is not a practical concern.
type JobID uint64
24

25
// jobInfo stores job dependency info for a set of dependent gossip messages.
26
type jobInfo struct {
27
        // activeParentJobIDs is the set of active parent job ids.
28
        activeParentJobIDs fn.Set[JobID]
29

30
        // activeDependentJobs is the set of active dependent job ids.
31
        activeDependentJobs fn.Set[JobID]
32
}
33

34
// ValidationBarrier is a barrier used to enforce a strict validation order
35
// while concurrently validating other updates for channel edges. It uses a set
36
// of maps to track validation dependencies. This is needed in practice because
37
// gossip messages for a given channel may arive in order, but then due to
38
// scheduling in different goroutines, may be validated in the wrong order.
39
// With the ValidationBarrier, the dependent update will wait until the parent
40
// update completes.
41
type ValidationBarrier struct {
42
        // validationSemaphore is a channel of structs which is used as a
43
        // semaphore. Initially we'll fill this with a buffered channel of the
44
        // size of the number of active requests. Each new job will consume
45
        // from this channel, then restore the value upon completion.
46
        validationSemaphore chan struct{}
47

48
        // jobInfoMap stores the set of job ids for each channel.
49
        // NOTE: This MUST be used with the mutex.
50
        // NOTE: This currently stores string representations of
51
        // lnwire.ShortChannelID and route.Vertex. Since these are of different
52
        // lengths, collision cannot occur in their string representations.
53
        // N.B.: Check that any new string-converted types don't collide with
54
        // existing string-converted types.
55
        jobInfoMap map[string]*jobInfo
56

57
        // jobDependencies is a mapping from a child's JobID to the set of
58
        // parent JobID that it depends on.
59
        // NOTE: This MUST be used with the mutex.
60
        jobDependencies map[JobID]fn.Set[JobID]
61

62
        // childJobChans stores the notification channel that each child job
63
        // listens on for parent job completions.
64
        // NOTE: This MUST be used with the mutex.
65
        childJobChans map[JobID]chan struct{}
66

67
        // idCtr is an atomic integer that is used to assign JobIDs.
68
        idCtr atomic.Uint64
69

70
        quit chan struct{}
71
        sync.Mutex
72
}
73

74
// NewValidationBarrier creates a new instance of a validation barrier given
75
// the total number of active requests, and a quit channel which will be used
76
// to know when to kill pending, but unfilled jobs.
77
func NewValidationBarrier(numActiveReqs int,
78
        quitChan chan struct{}) *ValidationBarrier {
30✔
79

30✔
80
        v := &ValidationBarrier{
30✔
81
                jobInfoMap:      make(map[string]*jobInfo),
30✔
82
                jobDependencies: make(map[JobID]fn.Set[JobID]),
30✔
83
                childJobChans:   make(map[JobID]chan struct{}),
30✔
84
                quit:            quitChan,
30✔
85
        }
30✔
86

30✔
87
        // We'll first initialize a set of semaphores to limit our concurrency
30✔
88
        // when validating incoming requests in parallel.
30✔
89
        v.validationSemaphore = make(chan struct{}, numActiveReqs)
30✔
90
        for i := 0; i < numActiveReqs; i++ {
27,062✔
91
                v.validationSemaphore <- struct{}{}
27,032✔
92
        }
27,032✔
93

94
        return v
30✔
95
}
96

97
// InitJobDependencies will wait for a new job slot to become open, and then
98
// sets up any dependent signals/trigger for the new job.
99
func (v *ValidationBarrier) InitJobDependencies(job interface{}) (JobID,
100
        error) {
346✔
101

346✔
102
        // We'll wait for either a new slot to become open, or for the quit
346✔
103
        // channel to be closed.
346✔
104
        select {
346✔
105
        case <-v.validationSemaphore:
346✔
106
        case <-v.quit:
×
107
        }
108

109
        v.Lock()
346✔
110
        defer v.Unlock()
346✔
111

346✔
112
        // updateOrCreateJobInfo modifies the set of activeParentJobs for this
346✔
113
        // annID and updates jobInfoMap.
346✔
114
        updateOrCreateJobInfo := func(annID string, annJobID JobID) {
1,069✔
115
                info, ok := v.jobInfoMap[annID]
723✔
116
                if ok {
726✔
117
                        // If an entry already exists for annID, then a job
3✔
118
                        // related to it is being validated. Add to the set of
3✔
119
                        // parent job ids. This addition will only affect
3✔
120
                        // _later_, _child_ jobs for the annID.
3✔
121
                        info.activeParentJobIDs.Add(annJobID)
3✔
122
                        return
3✔
123
                }
3✔
124

125
                // No entry exists for annID, meaning that we should create
126
                // one.
127
                parentJobSet := fn.NewSet(annJobID)
720✔
128

720✔
129
                info = &jobInfo{
720✔
130
                        activeParentJobIDs:  parentJobSet,
720✔
131
                        activeDependentJobs: fn.NewSet[JobID](),
720✔
132
                }
720✔
133
                v.jobInfoMap[annID] = info
720✔
134
        }
135

136
        // populateDependencies populates the job dependency mappings (i.e.
137
        // which should complete after another) for the (annID, childJobID)
138
        // tuple.
139
        populateDependencies := func(annID string, childJobID JobID) {
451✔
140
                // If there is no entry in the jobInfoMap, we don't have to
105✔
141
                // wait on any parent jobs to finish.
105✔
142
                info, ok := v.jobInfoMap[annID]
105✔
143
                if !ok {
200✔
144
                        return
95✔
145
                }
95✔
146

147
                // We want to see a snapshot of active parent jobs for this
148
                // annID that are already registered in activeParentJobIDs. The
149
                // child job identified by childJobID can only run after these
150
                // parent jobs have run. After grabbing the snapshot, we then
151
                // want to persist a slice of these jobs.
152

153
                // Create the notification chan that parent jobs will send (or
154
                // close) on when they complete.
155
                jobChan := make(chan struct{})
10✔
156

10✔
157
                // Add to set of activeDependentJobs for this annID.
10✔
158
                info.activeDependentJobs.Add(childJobID)
10✔
159

10✔
160
                // Store in childJobChans. The parent jobs will fetch this chan
10✔
161
                // to notify on. The child job will later fetch this chan to
10✔
162
                // listen on when WaitForParents is called.
10✔
163
                v.childJobChans[childJobID] = jobChan
10✔
164

10✔
165
                // Copy over the parent job IDs at this moment for this annID.
10✔
166
                // This job must be processed AFTER those parent IDs.
10✔
167
                parentJobs := info.activeParentJobIDs.Copy()
10✔
168

10✔
169
                // Populate the jobDependencies mapping.
10✔
170
                v.jobDependencies[childJobID] = parentJobs
10✔
171
        }
172

173
        // Once a slot is open, we'll examine the message of the job, to see if
174
        // there need to be any dependent barriers set up.
175
        switch msg := job.(type) {
346✔
176
        case *lnwire.ChannelAnnouncement1:
241✔
177
                id := JobID(v.idCtr.Add(1))
241✔
178

241✔
179
                updateOrCreateJobInfo(msg.ShortChannelID.String(), id)
241✔
180
                updateOrCreateJobInfo(route.Vertex(msg.NodeID1).String(), id)
241✔
181
                updateOrCreateJobInfo(route.Vertex(msg.NodeID2).String(), id)
241✔
182

241✔
183
                return id, nil
241✔
184

185
        // Populate the dependency mappings for the below child jobs.
186
        case *lnwire.ChannelUpdate1:
80✔
187
                childJobID := JobID(v.idCtr.Add(1))
80✔
188
                populateDependencies(msg.ShortChannelID.String(), childJobID)
80✔
189

80✔
190
                return childJobID, nil
80✔
191
        case *lnwire.NodeAnnouncement:
25✔
192
                childJobID := JobID(v.idCtr.Add(1))
25✔
193
                populateDependencies(
25✔
194
                        route.Vertex(msg.NodeID).String(), childJobID,
25✔
195
                )
25✔
196

25✔
197
                return childJobID, nil
25✔
198
        case *lnwire.AnnounceSignatures1:
×
199
                // TODO(roasbeef): need to wait on chan ann?
×
200
                // - We can do the above by calling populateDependencies. For
×
201
                //   now, while we evaluate potential side effects, don't do
×
202
                //   anything with childJobID and just return it.
×
203
                childJobID := JobID(v.idCtr.Add(1))
×
204
                return childJobID, nil
×
205

206
        default:
×
207
                // An invalid message was passed into InitJobDependencies.
×
208
                // Return an error.
×
209
                return JobID(0), errors.New("invalid message")
×
210
        }
211
}
212

213
// CompleteJob returns a free slot to the set of available job slots. This
214
// should be called once a job has been fully completed. Otherwise, slots may
215
// not be returned to the internal scheduling, causing a deadlock when a new
216
// overflow job is attempted.
217
func (v *ValidationBarrier) CompleteJob() {
321✔
218
        select {
321✔
219
        case v.validationSemaphore <- struct{}{}:
321✔
220
        case <-v.quit:
×
221
        }
222
}
223

224
// WaitForParents will block until all parent job dependencies have went
225
// through the validation pipeline. This allows us a graceful way to run jobs
226
// in goroutines and still have strict ordering guarantees. If this job doesn't
227
// have any parent job dependencies, then this function will return
228
// immediately.
229
func (v *ValidationBarrier) WaitForParents(childJobID JobID,
230
        job interface{}) error {
319✔
231

319✔
232
        var (
319✔
233
                ok      bool
319✔
234
                jobDesc string
319✔
235

319✔
236
                parentJobIDs fn.Set[JobID]
319✔
237
                annID        string
319✔
238
                jobChan      chan struct{}
319✔
239
        )
319✔
240

319✔
241
        // Acquire a lock to read ValidationBarrier.
319✔
242
        v.Lock()
319✔
243

319✔
244
        switch msg := job.(type) {
319✔
245
        // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the
246
        // completion of any active ChannelAnnouncement jobs related to them.
247
        case *lnwire.ChannelUpdate1:
64✔
248
                annID = msg.ShortChannelID.String()
64✔
249

64✔
250
                parentJobIDs, ok = v.jobDependencies[childJobID]
64✔
251
                if !ok {
119✔
252
                        // If ok is false, it means that this child job never
55✔
253
                        // had any parent jobs to wait on.
55✔
254
                        v.Unlock()
55✔
255
                        return nil
55✔
256
                }
55✔
257

258
                jobDesc = fmt.Sprintf("job=lnwire.ChannelUpdate, scid=%v",
9✔
259
                        msg.ShortChannelID.ToUint64())
9✔
260

261
        case *lnwire.NodeAnnouncement:
25✔
262
                annID = route.Vertex(msg.NodeID).String()
25✔
263

25✔
264
                parentJobIDs, ok = v.jobDependencies[childJobID]
25✔
265
                if !ok {
49✔
266
                        // If ok is false, it means that this child job never
24✔
267
                        // had any parent jobs to wait on.
24✔
268
                        v.Unlock()
24✔
269
                        return nil
24✔
270
                }
24✔
271

272
                jobDesc = fmt.Sprintf("job=lnwire.NodeAnnouncement, pub=%s",
1✔
273
                        route.Vertex(msg.NodeID))
1✔
274

275
        // Other types of jobs can be executed immediately, so we'll just
276
        // return directly.
277
        case *lnwire.AnnounceSignatures1:
×
278
                // TODO(roasbeef): need to wait on chan ann?
×
279
                v.Unlock()
×
280
                return nil
×
281

282
        case *lnwire.ChannelAnnouncement1:
230✔
283
                v.Unlock()
230✔
284
                return nil
230✔
285
        }
286

287
        // Release the lock once the above read is finished.
288
        v.Unlock()
10✔
289

10✔
290
        log.Debugf("Waiting for dependent on %s", jobDesc)
10✔
291

10✔
292
        v.Lock()
10✔
293
        jobChan, ok = v.childJobChans[childJobID]
10✔
294
        if !ok {
10✔
295
                v.Unlock()
×
296

×
297
                // The entry may not exist because this job does not depend on
×
298
                // any parent jobs.
×
299
                return nil
×
300
        }
×
301
        v.Unlock()
10✔
302

10✔
303
        for {
20✔
304
                select {
10✔
305
                case <-v.quit:
4✔
306
                        return ErrVBarrierShuttingDown
4✔
307

308
                case <-jobChan:
6✔
309
                        // Every time this is sent on or if it's closed, a
6✔
310
                        // parent job has finished. The parent jobs have to
6✔
311
                        // also potentially close the channel because if all
6✔
312
                        // the parent jobs finish and call SignalDependents
6✔
313
                        // before the goroutine running WaitForParents has a
6✔
314
                        // chance to grab the notification chan from
6✔
315
                        // childJobChans, then the running goroutine will wait
6✔
316
                        // here for a notification forever. By having the last
6✔
317
                        // parent job close the notificiation chan, we avoid
6✔
318
                        // this issue.
6✔
319

6✔
320
                        // Check and see if we have any parent jobs left. If we
6✔
321
                        // don't, we can finish up.
6✔
322
                        v.Lock()
6✔
323
                        info, found := v.jobInfoMap[annID]
6✔
324
                        if !found {
12✔
325
                                v.Unlock()
6✔
326

6✔
327
                                // No parent job info found, proceed with
6✔
328
                                // validation.
6✔
329
                                return nil
6✔
330
                        }
6✔
331

UNCOV
332
                        x := parentJobIDs.Intersect(info.activeParentJobIDs)
×
UNCOV
333
                        v.Unlock()
×
UNCOV
334
                        if x.IsEmpty() {
×
UNCOV
335
                                // The parent jobs have all completed. We can
×
UNCOV
336
                                // proceed with validation.
×
UNCOV
337
                                return nil
×
UNCOV
338
                        }
×
339

340
                        // If we've reached this point, we are still waiting on
341
                        // a parent job to complete.
342
                }
343
        }
344
}
345

346
// SignalDependents signals to any child jobs that this parent job has
347
// finished.
348
func (v *ValidationBarrier) SignalDependents(job interface{}, id JobID) error {
316✔
349
        v.Lock()
316✔
350
        defer v.Unlock()
316✔
351

316✔
352
        // removeJob either removes a child job or a parent job. If it is
316✔
353
        // removing a child job, then it removes the child's JobID from the set
316✔
354
        // of dependent jobs for the announcement ID. If this is removing a
316✔
355
        // parent job, then it removes the parentJobID from the set of active
316✔
356
        // parent jobs and notifies the child jobs that it has finished
316✔
357
        // validating.
316✔
358
        removeJob := func(annID string, id JobID, child bool) error {
1,106✔
359
                if child {
869✔
360
                        // If we're removing a child job, check jobInfoMap and
79✔
361
                        // remove this job from activeDependentJobs.
79✔
362
                        info, ok := v.jobInfoMap[annID]
79✔
363
                        if ok {
79✔
UNCOV
364
                                info.activeDependentJobs.Remove(id)
×
UNCOV
365
                        }
×
366

367
                        // Remove the notification chan from childJobChans.
368
                        delete(v.childJobChans, id)
79✔
369

79✔
370
                        // Remove this job's dependency mapping.
79✔
371
                        delete(v.jobDependencies, id)
79✔
372

79✔
373
                        return nil
79✔
374
                }
375

376
                // Otherwise, we are removing a parent job.
377
                jobInfo, found := v.jobInfoMap[annID]
711✔
378
                if !found {
711✔
379
                        // NOTE: Some sort of consistency guarantee has been
×
380
                        // broken.
×
381
                        return fmt.Errorf("no job info found for "+
×
382
                                "identifier(%v)", id)
×
383
                }
×
384

385
                jobInfo.activeParentJobIDs.Remove(id)
711✔
386

711✔
387
                lastJob := jobInfo.activeParentJobIDs.IsEmpty()
711✔
388

711✔
389
                // Notify all dependent jobs that a parent job has completed.
711✔
390
                for child := range jobInfo.activeDependentJobs {
720✔
391
                        notifyChan, ok := v.childJobChans[child]
9✔
392
                        if !ok {
9✔
393
                                // NOTE: Some sort of consistency guarantee has
×
394
                                // been broken.
×
395
                                return fmt.Errorf("no job info found for "+
×
396
                                        "identifier(%v)", id)
×
397
                        }
×
398

399
                        // We don't want to block when sending out the signal.
400
                        select {
9✔
401
                        case notifyChan <- struct{}{}:
6✔
402
                        default:
3✔
403
                        }
404

405
                        // If this is the last parent job for this annID, also
406
                        // close the channel. This is needed because it's
407
                        // possible that the parent job cleans up the job
408
                        // mappings before the goroutine handling the child job
409
                        // has a chance to call WaitForParents and catch the
410
                        // signal sent above. We are allowed to close because
411
                        // no other parent job will be able to send along the
412
                        // channel (or close) as we're removing the entry from
413
                        // the jobInfoMap below.
414
                        if lastJob {
15✔
415
                                close(notifyChan)
6✔
416
                        }
6✔
417
                }
418

419
                // Remove from jobInfoMap if last job.
420
                if lastJob {
1,419✔
421
                        delete(v.jobInfoMap, annID)
708✔
422
                }
708✔
423

424
                return nil
711✔
425
        }
426

427
        switch msg := job.(type) {
316✔
428
        case *lnwire.ChannelAnnouncement1:
237✔
429
                // Signal to the child jobs that parent validation has
237✔
430
                // finished. We have to call removeJob for each annID
237✔
431
                // that this ChannelAnnouncement can be associated with.
237✔
432
                err := removeJob(msg.ShortChannelID.String(), id, false)
237✔
433
                if err != nil {
237✔
434
                        return err
×
435
                }
×
436

437
                err = removeJob(route.Vertex(msg.NodeID1).String(), id, false)
237✔
438
                if err != nil {
237✔
439
                        return err
×
440
                }
×
441

442
                err = removeJob(route.Vertex(msg.NodeID2).String(), id, false)
237✔
443
                if err != nil {
237✔
444
                        return err
×
445
                }
×
446

447
                return nil
237✔
448

449
        case *lnwire.NodeAnnouncement:
24✔
450
                // Remove child job info.
24✔
451
                return removeJob(route.Vertex(msg.NodeID).String(), id, true)
24✔
452

453
        case *lnwire.ChannelUpdate1:
55✔
454
                // Remove child job info.
55✔
455
                return removeJob(msg.ShortChannelID.String(), id, true)
55✔
456

457
        case *lnwire.AnnounceSignatures1:
×
458
                // No dependency mappings are stored for AnnounceSignatures1,
×
459
                // so do nothing.
×
460
                return nil
×
461
        }
462

463
        return errors.New("invalid message - no job dependencies")
×
464
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc