• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12058234999

27 Nov 2024 09:06PM UTC coverage: 57.847% (-1.1%) from 58.921%
12058234999

Pull #9148

github

ProofOfKeags
lnwire: convert DynPropose and DynCommit to use typed tlv records
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

142 of 177 new or added lines in 4 files covered. (80.23%)

19365 existing lines in 251 files now uncovered.

100876 of 174383 relevant lines covered (57.85%)

25338.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.53
/graph/validation_barrier.go
1
package graph
2

3
import (
4
        "fmt"
5
        "sync"
6

7
        "github.com/lightningnetwork/lnd/channeldb"
8
        "github.com/lightningnetwork/lnd/channeldb/models"
9
        "github.com/lightningnetwork/lnd/lnwire"
10
        "github.com/lightningnetwork/lnd/routing/route"
11
)
12

13
// validationSignals bundles the pair of channels through which the
// ValidationBarrier tells a waiting dependent how the validation of its parent
// turned out. The two channels are alternatives: for any given parent only one
// of them is meant to be used.
type validationSignals struct {
	// allow is used to tell dependents that they may go ahead and be
	// processed.
	allow chan struct{}

	// deny is used to tell dependents that they must not be processed.
	deny chan struct{}
}
24

25
// ValidationBarrier is a barrier used to ensure proper validation order while
26
// concurrently validating new announcements for channel edges, and the
27
// attributes of channel edges.  It uses this set of maps (protected by this
28
// mutex) to track validation dependencies. For a given channel our
29
// dependencies look like this: chanAnn <- chanUp <- nodeAnn. That is we must
30
// validate the item on the left of the arrow before that on the right.
31
type ValidationBarrier struct {
32
        // validationSemaphore is a channel of structs which is used as a
33
        // semaphore. Initially we'll fill this with a buffered channel of the
34
        // size of the number of active requests. Each new job will consume
35
        // from this channel, then restore the value upon completion.
36
        validationSemaphore chan struct{}
37

38
        // chanAnnFinSignal is map that keep track of all the pending
39
        // ChannelAnnouncement like validation job going on. Once the job has
40
        // been completed, the channel will be closed unblocking any
41
        // dependants.
42
        chanAnnFinSignal map[lnwire.ShortChannelID]*validationSignals
43

44
        // chanEdgeDependencies tracks any channel edge updates which should
45
        // wait until the completion of the ChannelAnnouncement before
46
        // proceeding. This is a dependency, as we can't validate the update
47
        // before we validate the announcement which creates the channel
48
        // itself.
49
        chanEdgeDependencies map[lnwire.ShortChannelID]*validationSignals
50

51
        // nodeAnnDependencies tracks any pending NodeAnnouncement validation
52
        // jobs which should wait until the completion of the
53
        // ChannelAnnouncement before proceeding.
54
        nodeAnnDependencies map[route.Vertex]*validationSignals
55

56
        quit chan struct{}
57
        sync.Mutex
58
}
59

60
// NewValidationBarrier creates a new instance of a validation barrier given
61
// the total number of active requests, and a quit channel which will be used
62
// to know when to kill pending, but unfilled jobs.
63
func NewValidationBarrier(numActiveReqs int,
64
        quitChan chan struct{}) *ValidationBarrier {
50✔
65

50✔
66
        v := &ValidationBarrier{
50✔
67
                chanAnnFinSignal:     make(map[lnwire.ShortChannelID]*validationSignals),
50✔
68
                chanEdgeDependencies: make(map[lnwire.ShortChannelID]*validationSignals),
50✔
69
                nodeAnnDependencies:  make(map[route.Vertex]*validationSignals),
50✔
70
                quit:                 quitChan,
50✔
71
        }
50✔
72

50✔
73
        // We'll first initialize a set of semaphores to limit our concurrency
50✔
74
        // when validating incoming requests in parallel.
50✔
75
        v.validationSemaphore = make(chan struct{}, numActiveReqs)
50✔
76
        for i := 0; i < numActiveReqs; i++ {
28,394✔
77
                v.validationSemaphore <- struct{}{}
28,344✔
78
        }
28,344✔
79

80
        return v
50✔
81
}
82

83
// InitJobDependencies will wait for a new job slot to become open, and then
84
// sets up any dependent signals/trigger for the new job
85
func (v *ValidationBarrier) InitJobDependencies(job interface{}) {
371✔
86
        // We'll wait for either a new slot to become open, or for the quit
371✔
87
        // channel to be closed.
371✔
88
        select {
371✔
89
        case <-v.validationSemaphore:
371✔
90
        case <-v.quit:
×
91
        }
92

93
        v.Lock()
371✔
94
        defer v.Unlock()
371✔
95

371✔
96
        // Once a slot is open, we'll examine the message of the job, to see if
371✔
97
        // there need to be any dependent barriers set up.
371✔
98
        switch msg := job.(type) {
371✔
99

100
        // If this is a channel announcement, then we'll need to set up den
101
        // tenancies, as we'll need to verify this before we verify any
102
        // ChannelUpdates for the same channel, or NodeAnnouncements of nodes
103
        // that are involved in this channel. This goes for both the wire
104
        // type,s and also the types that we use within the database.
105
        case *lnwire.ChannelAnnouncement1:
238✔
106

238✔
107
                // We ensure that we only create a new announcement signal iff,
238✔
108
                // one doesn't already exist, as there may be duplicate
238✔
109
                // announcements.  We'll close this signal once the
238✔
110
                // ChannelAnnouncement has been validated. This will result in
238✔
111
                // all the dependent jobs being unlocked so they can finish
238✔
112
                // execution themselves.
238✔
113
                if _, ok := v.chanAnnFinSignal[msg.ShortChannelID]; !ok {
476✔
114
                        // We'll create the channel that we close after we
238✔
115
                        // validate this announcement. All dependants will
238✔
116
                        // point to this same channel, so they'll be unblocked
238✔
117
                        // at the same time.
238✔
118
                        signals := &validationSignals{
238✔
119
                                allow: make(chan struct{}),
238✔
120
                                deny:  make(chan struct{}),
238✔
121
                        }
238✔
122

238✔
123
                        v.chanAnnFinSignal[msg.ShortChannelID] = signals
238✔
124
                        v.chanEdgeDependencies[msg.ShortChannelID] = signals
238✔
125

238✔
126
                        v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals
238✔
127
                        v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals
238✔
128
                }
238✔
129
        case *models.ChannelEdgeInfo:
17✔
130

17✔
131
                shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
17✔
132
                if _, ok := v.chanAnnFinSignal[shortID]; !ok {
34✔
133
                        signals := &validationSignals{
17✔
134
                                allow: make(chan struct{}),
17✔
135
                                deny:  make(chan struct{}),
17✔
136
                        }
17✔
137

17✔
138
                        v.chanAnnFinSignal[shortID] = signals
17✔
139
                        v.chanEdgeDependencies[shortID] = signals
17✔
140

17✔
141
                        v.nodeAnnDependencies[route.Vertex(msg.NodeKey1Bytes)] = signals
17✔
142
                        v.nodeAnnDependencies[route.Vertex(msg.NodeKey2Bytes)] = signals
17✔
143
                }
17✔
144

145
        // These other types don't have any dependants, so no further
146
        // initialization needs to be done beyond just occupying a job slot.
147
        case *models.ChannelEdgePolicy:
6✔
148
                return
6✔
149
        case *lnwire.ChannelUpdate1:
63✔
150
                return
63✔
151
        case *lnwire.NodeAnnouncement:
24✔
152
                // TODO(roasbeef): node ann needs to wait on existing channel updates
24✔
153
                return
24✔
154
        case *channeldb.LightningNode:
7✔
155
                return
7✔
156
        case *lnwire.AnnounceSignatures1:
×
157
                // TODO(roasbeef): need to wait on chan ann?
×
158
                return
×
159
        }
160
}
161

162
// CompleteJob returns a free slot to the set of available job slots. This
163
// should be called once a job has been fully completed. Otherwise, slots may
164
// not be returned to the internal scheduling, causing a deadlock when a new
165
// overflow job is attempted.
166
func (v *ValidationBarrier) CompleteJob() {
351✔
167
        select {
351✔
168
        case v.validationSemaphore <- struct{}{}:
351✔
UNCOV
169
        case <-v.quit:
×
170
        }
171
}
172

173
// WaitForDependants will block until any jobs that this job dependants on have
174
// finished executing. This allows us a graceful way to schedule goroutines
175
// based on any pending uncompleted dependent jobs. If this job doesn't have an
176
// active dependent, then this function will return immediately.
177
func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
347✔
178

347✔
179
        var (
347✔
180
                signals *validationSignals
347✔
181
                ok      bool
347✔
182
                jobDesc string
347✔
183
        )
347✔
184

347✔
185
        // Acquire a lock to read ValidationBarrier.
347✔
186
        v.Lock()
347✔
187

347✔
188
        switch msg := job.(type) {
347✔
189
        // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the
190
        // completion of any active ChannelAnnouncement jobs related to them.
191
        case *models.ChannelEdgePolicy:
6✔
192
                shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
6✔
193
                signals, ok = v.chanEdgeDependencies[shortID]
6✔
194

6✔
195
                jobDesc = fmt.Sprintf("job=lnwire.ChannelEdgePolicy, scid=%v",
6✔
196
                        msg.ChannelID)
6✔
197

198
        case *channeldb.LightningNode:
7✔
199
                vertex := route.Vertex(msg.PubKeyBytes)
7✔
200
                signals, ok = v.nodeAnnDependencies[vertex]
7✔
201

7✔
202
                jobDesc = fmt.Sprintf("job=channeldb.LightningNode, pub=%s",
7✔
203
                        vertex)
7✔
204

205
        case *lnwire.ChannelUpdate1:
63✔
206
                signals, ok = v.chanEdgeDependencies[msg.ShortChannelID]
63✔
207

63✔
208
                jobDesc = fmt.Sprintf("job=lnwire.ChannelUpdate, scid=%v",
63✔
209
                        msg.ShortChannelID.ToUint64())
63✔
210

211
        case *lnwire.NodeAnnouncement:
24✔
212
                vertex := route.Vertex(msg.NodeID)
24✔
213
                signals, ok = v.nodeAnnDependencies[vertex]
24✔
214
                jobDesc = fmt.Sprintf("job=lnwire.NodeAnnouncement, pub=%s",
24✔
215
                        vertex)
24✔
216

217
        // Other types of jobs can be executed immediately, so we'll just
218
        // return directly.
219
        case *lnwire.AnnounceSignatures1:
×
220
                // TODO(roasbeef): need to wait on chan ann?
221
        case *models.ChannelEdgeInfo:
17✔
222
        case *lnwire.ChannelAnnouncement1:
230✔
223
        }
224

225
        // Release the lock once the above read is finished.
226
        v.Unlock()
347✔
227

347✔
228
        // If it's not ok, it means either the job is not a dependent type, or
347✔
229
        // it doesn't have a dependency signal. Either way, we can return
347✔
230
        // early.
347✔
231
        if !ok {
658✔
232
                return nil
311✔
233
        }
311✔
234

235
        log.Debugf("Waiting for dependent on %s", jobDesc)
36✔
236

36✔
237
        // If we do have an active job, then we'll wait until either the signal
36✔
238
        // is closed, or the set of jobs exits.
36✔
239
        select {
36✔
240
        case <-v.quit:
4✔
241
                return NewErrf(ErrVBarrierShuttingDown,
4✔
242
                        "validation barrier shutting down")
4✔
243

244
        case <-signals.deny:
2✔
245
                log.Debugf("Signal deny for %s", jobDesc)
2✔
246
                return NewErrf(ErrParentValidationFailed,
2✔
247
                        "parent validation failed")
2✔
248

249
        case <-signals.allow:
30✔
250
                log.Tracef("Signal allow for %s", jobDesc)
30✔
251
                return nil
30✔
252
        }
253
}
254

255
// SignalDependants will allow/deny any jobs that are dependent on this job that
256
// they can continue execution. If the job doesn't have any dependants, then
257
// this function sill exit immediately.
258
func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) {
343✔
259
        v.Lock()
343✔
260
        defer v.Unlock()
343✔
261

343✔
262
        switch msg := job.(type) {
343✔
263

264
        // If we've just finished executing a ChannelAnnouncement, then we'll
265
        // close out the signal, and remove the signal from the map of active
266
        // ones. This will allow/deny any dependent jobs to continue execution.
267
        case *models.ChannelEdgeInfo:
17✔
268
                shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
17✔
269
                finSignals, ok := v.chanAnnFinSignal[shortID]
17✔
270
                if ok {
34✔
271
                        if allow {
31✔
272
                                close(finSignals.allow)
14✔
273
                        } else {
17✔
274
                                close(finSignals.deny)
3✔
275
                        }
3✔
276
                        delete(v.chanAnnFinSignal, shortID)
17✔
277
                }
278
        case *lnwire.ChannelAnnouncement1:
234✔
279
                finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID]
234✔
280
                if ok {
468✔
281
                        if allow {
262✔
282
                                close(finSignals.allow)
28✔
283
                        } else {
234✔
284
                                close(finSignals.deny)
206✔
285
                        }
206✔
286
                        delete(v.chanAnnFinSignal, msg.ShortChannelID)
234✔
287
                }
288

289
                delete(v.chanEdgeDependencies, msg.ShortChannelID)
234✔
290

291
        // For all other job types, we'll delete the tracking entries from the
292
        // map, as if we reach this point, then all dependants have already
293
        // finished executing and we can proceed.
294
        case *channeldb.LightningNode:
7✔
295
                delete(v.nodeAnnDependencies, route.Vertex(msg.PubKeyBytes))
7✔
296
        case *lnwire.NodeAnnouncement:
24✔
297
                delete(v.nodeAnnDependencies, route.Vertex(msg.NodeID))
24✔
298
        case *lnwire.ChannelUpdate1:
55✔
299
                delete(v.chanEdgeDependencies, msg.ShortChannelID)
55✔
300
        case *models.ChannelEdgePolicy:
6✔
301
                shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
6✔
302
                delete(v.chanEdgeDependencies, shortID)
6✔
303

304
        case *lnwire.AnnounceSignatures1:
×
305
                return
×
306
        }
307
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc