• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%
12312390362

Pull #9343

github

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context that its new main WithCtx method provides is cancelled in sync
with a parent context being cancelled or with its quit channel being
closed. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived context, the quit channel _must_ be contained internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method which will then take care to first cancel any
derived contexts that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.58
/routing/localchans/manager.go
1
package localchans
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcec/v2"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/channeldb"
13
        "github.com/lightningnetwork/lnd/discovery"
14
        "github.com/lightningnetwork/lnd/fn/v2"
15
        "github.com/lightningnetwork/lnd/graph/db/models"
16
        "github.com/lightningnetwork/lnd/lnrpc"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/routing"
19
)
20

21
// Manager manages the node's local channels. The only operation that is
22
// currently implemented is updating forwarding policies.
23
type Manager struct {
24
        // SelfPub contains the public key of the local node.
25
        SelfPub *btcec.PublicKey
26

27
        // DefaultRoutingPolicy is the default routing policy.
28
        DefaultRoutingPolicy models.ForwardingPolicy
29

30
        // UpdateForwardingPolicies is used by the manager to update active
31
        // links with a new policy.
32
        UpdateForwardingPolicies func(
33
                chanPolicies map[wire.OutPoint]models.ForwardingPolicy)
34

35
        // PropagateChanPolicyUpdate is called to persist a new policy to disk
36
        // and broadcast it to the network.
37
        PropagateChanPolicyUpdate func(
38
                edgesToUpdate []discovery.EdgeWithInfo) error
39

40
        // ForAllOutgoingChannels is required to iterate over all our local
41
        // channels. The ChannelEdgePolicy parameter may be nil.
42
        ForAllOutgoingChannels func(cb func(*models.ChannelEdgeInfo,
43
                *models.ChannelEdgePolicy) error) error
44

45
        // FetchChannel is used to query local channel parameters. Optionally an
46
        // existing db tx can be supplied.
47
        FetchChannel func(chanPoint wire.OutPoint) (*channeldb.OpenChannel,
48
                error)
49

50
        // AddEdge is used to add edge/channel to the topology of the router.
51
        AddEdge func(edge *models.ChannelEdgeInfo) error
52

53
        // policyUpdateLock ensures that the database and the link do not fall
54
        // out of sync if there are concurrent fee update calls. Without it,
55
        // there is a chance that policy A updates the database, then policy B
56
        // updates the database, then policy B updates the link, then policy A
57
        // updates the link.
58
        policyUpdateLock sync.Mutex
59
}
60

61
// UpdatePolicy updates the policy for the specified channels on disk and in
62
// the active links.
63
func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
64
        createMissingEdge bool, chanPoints ...wire.OutPoint) (
65
        []*lnrpc.FailedUpdate, error) {
6✔
66

6✔
67
        r.policyUpdateLock.Lock()
6✔
68
        defer r.policyUpdateLock.Unlock()
6✔
69

6✔
70
        // First, we'll construct a set of all the channels that we are
6✔
71
        // trying to update.
6✔
72
        unprocessedChans := make(map[wire.OutPoint]struct{})
6✔
73
        for _, chanPoint := range chanPoints {
11✔
74
                unprocessedChans[chanPoint] = struct{}{}
5✔
75
        }
5✔
76

77
        haveChanFilter := len(unprocessedChans) != 0
6✔
78

6✔
79
        var failedUpdates []*lnrpc.FailedUpdate
6✔
80
        var edgesToUpdate []discovery.EdgeWithInfo
6✔
81
        policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy)
6✔
82

6✔
83
        // NOTE: edge may be nil when this function is called.
6✔
84
        processChan := func(info *models.ChannelEdgeInfo,
6✔
85
                edge *models.ChannelEdgePolicy) error {
11✔
86

5✔
87
                // If we have a channel filter, and this channel isn't a part
5✔
88
                // of it, then we'll skip it.
5✔
89
                _, ok := unprocessedChans[info.ChannelPoint]
5✔
90
                if !ok && haveChanFilter {
6✔
91
                        return nil
1✔
92
                }
1✔
93

94
                // Mark this channel as found by removing it. unprocessedChans
95
                // will be used to report invalid channels later on.
96
                delete(unprocessedChans, info.ChannelPoint)
4✔
97

4✔
98
                if edge == nil {
4✔
99
                        log.Errorf("Got nil channel edge policy when updating "+
×
100
                                "a channel. Channel point: %v",
×
101
                                info.ChannelPoint.String())
×
102

×
103
                        failedUpdates = append(failedUpdates, makeFailureItem(
×
104
                                info.ChannelPoint,
×
105
                                lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
×
106
                                "edge policy not found",
×
107
                        ))
×
108

×
109
                        return nil
×
110
                }
×
111

112
                // Apply the new policy to the edge.
113
                err := r.updateEdge(info.ChannelPoint, edge, newSchema)
4✔
114
                if err != nil {
4✔
115
                        failedUpdates = append(failedUpdates,
×
116
                                makeFailureItem(info.ChannelPoint,
×
117
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
118
                                        err.Error(),
×
119
                                ))
×
120

×
121
                        return nil
×
122
                }
×
123

124
                // Add updated edge to list of edges to send to gossiper.
125
                edgesToUpdate = append(edgesToUpdate, discovery.EdgeWithInfo{
4✔
126
                        Info: info,
4✔
127
                        Edge: edge,
4✔
128
                })
4✔
129

4✔
130
                // Extract inbound fees from the ExtraOpaqueData.
4✔
131
                var inboundWireFee lnwire.Fee
4✔
132
                _, err = edge.ExtraOpaqueData.ExtractRecords(&inboundWireFee)
4✔
133
                if err != nil {
4✔
134
                        return err
×
135
                }
×
136
                inboundFee := models.NewInboundFeeFromWire(inboundWireFee)
4✔
137

4✔
138
                // Add updated policy to list of policies to send to switch.
4✔
139
                policiesToUpdate[info.ChannelPoint] = models.ForwardingPolicy{
4✔
140
                        BaseFee:       edge.FeeBaseMSat,
4✔
141
                        FeeRate:       edge.FeeProportionalMillionths,
4✔
142
                        TimeLockDelta: uint32(edge.TimeLockDelta),
4✔
143
                        MinHTLCOut:    edge.MinHTLC,
4✔
144
                        MaxHTLC:       edge.MaxHTLC,
4✔
145
                        InboundFee:    inboundFee,
4✔
146
                }
4✔
147

4✔
148
                return nil
4✔
149
        }
150

151
        // Next, we'll loop over all the outgoing channels the router knows of.
152
        // If we have a filter then we'll only collect those channels, otherwise
153
        // we'll collect them all.
154
        err := r.ForAllOutgoingChannels(processChan)
6✔
155
        if err != nil {
6✔
156
                return nil, err
×
157
        }
×
158

159
        // Construct a list of failed policy updates.
160
        for chanPoint := range unprocessedChans {
9✔
161
                channel, err := r.FetchChannel(chanPoint)
3✔
162
                switch {
3✔
163
                case errors.Is(err, channeldb.ErrChannelNotFound):
1✔
164
                        failedUpdates = append(failedUpdates,
1✔
165
                                makeFailureItem(chanPoint,
1✔
166
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
1✔
167
                                        "not found",
1✔
168
                                ))
1✔
169

170
                case err != nil:
×
171
                        failedUpdates = append(failedUpdates,
×
172
                                makeFailureItem(chanPoint,
×
173
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INTERNAL_ERR,
×
174
                                        err.Error(),
×
175
                                ))
×
176

177
                case channel.IsPending:
×
178
                        failedUpdates = append(failedUpdates,
×
179
                                makeFailureItem(chanPoint,
×
180
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_PENDING,
×
181
                                        "not yet confirmed",
×
182
                                ))
×
183

184
                // If the edge was not found, but the channel is found, that
185
                // means the edge is missing in the graph database and should be
186
                // recreated. The edge and policy are created in-memory. The
187
                // edge is inserted in createEdge below and the policy will be
188
                // added to the graph in the PropagateChanPolicyUpdate call
189
                // below.
190
                case createMissingEdge:
1✔
191
                        log.Warnf("Missing edge for active channel (%s) "+
1✔
192
                                "during policy update. Recreating edge with "+
1✔
193
                                "default policy.",
1✔
194
                                channel.FundingOutpoint.String())
1✔
195

1✔
196
                        info, edge, failedUpdate := r.createMissingEdge(
1✔
197
                                channel, newSchema,
1✔
198
                        )
1✔
199
                        if failedUpdate == nil {
2✔
200
                                err = processChan(info, edge)
1✔
201
                                if err != nil {
1✔
202
                                        return nil, err
×
203
                                }
×
204
                        } else {
×
205
                                failedUpdates = append(
×
206
                                        failedUpdates, failedUpdate,
×
207
                                )
×
208
                        }
×
209

210
                default:
1✔
211
                        log.Warnf("Missing edge for active channel (%s) "+
1✔
212
                                "during policy update. Could not update "+
1✔
213
                                "policy.", channel.FundingOutpoint.String())
1✔
214

1✔
215
                        failedUpdates = append(failedUpdates,
1✔
216
                                makeFailureItem(chanPoint,
1✔
217
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
1✔
218
                                        "could not update policies",
1✔
219
                                ))
1✔
220
                }
221
        }
222

223
        // Commit the policy updates to disk and broadcast to the network. We
224
        // validated the new policy above, so we expect no validation errors. If
225
        // this would happen because of a bug, the link policy will be
226
        // desynchronized. It is currently not possible to atomically commit
227
        // multiple edge updates.
228
        err = r.PropagateChanPolicyUpdate(edgesToUpdate)
6✔
229
        if err != nil {
6✔
230
                return nil, err
×
231
        }
×
232

233
        // Update active links.
234
        r.UpdateForwardingPolicies(policiesToUpdate)
6✔
235

6✔
236
        return failedUpdates, nil
6✔
237
}
238

239
func (r *Manager) createMissingEdge(channel *channeldb.OpenChannel,
240
        newSchema routing.ChannelPolicy) (*models.ChannelEdgeInfo,
241
        *models.ChannelEdgePolicy, *lnrpc.FailedUpdate) {
1✔
242

1✔
243
        info, edge, err := r.createEdge(channel, time.Now())
1✔
244
        if err != nil {
1✔
245
                log.Errorf("Failed to recreate missing edge "+
×
246
                        "for channel (%s): %v",
×
247
                        channel.FundingOutpoint.String(), err)
×
248

×
249
                return nil, nil, makeFailureItem(
×
250
                        channel.FundingOutpoint,
×
251
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
252
                        "could not update policies",
×
253
                )
×
254
        }
×
255

256
        // Validate the newly created edge policy with the user defined new
257
        // schema before adding the edge to the database.
258
        err = r.updateEdge(channel.FundingOutpoint, edge, newSchema)
1✔
259
        if err != nil {
1✔
260
                return nil, nil, makeFailureItem(
×
261
                        info.ChannelPoint,
×
262
                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
263
                        err.Error(),
×
264
                )
×
265
        }
×
266

267
        // Insert the edge into the database to avoid `edge not
268
        // found` errors during policy update propagation.
269
        err = r.AddEdge(info)
1✔
270
        if err != nil {
1✔
271
                log.Errorf("Attempt to add missing edge for "+
×
272
                        "channel (%s) errored with: %v",
×
273
                        channel.FundingOutpoint.String(), err)
×
274

×
275
                return nil, nil, makeFailureItem(
×
276
                        channel.FundingOutpoint,
×
277
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
278
                        "could not add edge",
×
279
                )
×
280
        }
×
281

282
        return info, edge, nil
1✔
283
}
284

285
// createEdge recreates an edge and policy from an open channel in-memory.
286
func (r *Manager) createEdge(channel *channeldb.OpenChannel,
287
        timestamp time.Time) (*models.ChannelEdgeInfo,
288
        *models.ChannelEdgePolicy, error) {
3✔
289

3✔
290
        nodeKey1Bytes := r.SelfPub.SerializeCompressed()
3✔
291
        nodeKey2Bytes := channel.IdentityPub.SerializeCompressed()
3✔
292
        bitcoinKey1Bytes := channel.LocalChanCfg.MultiSigKey.PubKey.
3✔
293
                SerializeCompressed()
3✔
294
        bitcoinKey2Bytes := channel.RemoteChanCfg.MultiSigKey.PubKey.
3✔
295
                SerializeCompressed()
3✔
296
        channelFlags := lnwire.ChanUpdateChanFlags(0)
3✔
297

3✔
298
        // Make it such that node_id_1 is the lexicographically-lesser of the
3✔
299
        // two compressed keys sorted in ascending lexicographic order.
3✔
300
        if bytes.Compare(nodeKey2Bytes, nodeKey1Bytes) < 0 {
4✔
301
                nodeKey1Bytes, nodeKey2Bytes = nodeKey2Bytes, nodeKey1Bytes
1✔
302
                bitcoinKey1Bytes, bitcoinKey2Bytes = bitcoinKey2Bytes,
1✔
303
                        bitcoinKey1Bytes
1✔
304
                channelFlags = 1
1✔
305
        }
1✔
306

307
        var featureBuf bytes.Buffer
3✔
308
        err := lnwire.NewRawFeatureVector().Encode(&featureBuf)
3✔
309
        if err != nil {
3✔
310
                return nil, nil, fmt.Errorf("unable to encode features: %w",
×
311
                        err)
×
312
        }
×
313

314
        info := &models.ChannelEdgeInfo{
3✔
315
                ChannelID:    channel.ShortChanID().ToUint64(),
3✔
316
                ChainHash:    channel.ChainHash,
3✔
317
                Features:     featureBuf.Bytes(),
3✔
318
                Capacity:     channel.Capacity,
3✔
319
                ChannelPoint: channel.FundingOutpoint,
3✔
320
        }
3✔
321

3✔
322
        copy(info.NodeKey1Bytes[:], nodeKey1Bytes)
3✔
323
        copy(info.NodeKey2Bytes[:], nodeKey2Bytes)
3✔
324
        copy(info.BitcoinKey1Bytes[:], bitcoinKey1Bytes)
3✔
325
        copy(info.BitcoinKey2Bytes[:], bitcoinKey2Bytes)
3✔
326

3✔
327
        // Construct a dummy channel edge policy with default values that will
3✔
328
        // be updated with the new values in the call to processChan below.
3✔
329
        timeLockDelta := uint16(r.DefaultRoutingPolicy.TimeLockDelta)
3✔
330
        edge := &models.ChannelEdgePolicy{
3✔
331
                ChannelID:                 channel.ShortChanID().ToUint64(),
3✔
332
                LastUpdate:                timestamp,
3✔
333
                TimeLockDelta:             timeLockDelta,
3✔
334
                ChannelFlags:              channelFlags,
3✔
335
                MessageFlags:              lnwire.ChanUpdateRequiredMaxHtlc,
3✔
336
                FeeBaseMSat:               r.DefaultRoutingPolicy.BaseFee,
3✔
337
                FeeProportionalMillionths: r.DefaultRoutingPolicy.FeeRate,
3✔
338
                MinHTLC:                   r.DefaultRoutingPolicy.MinHTLCOut,
3✔
339
                MaxHTLC:                   r.DefaultRoutingPolicy.MaxHTLC,
3✔
340
        }
3✔
341

3✔
342
        copy(edge.ToNode[:], channel.IdentityPub.SerializeCompressed())
3✔
343

3✔
344
        return info, edge, nil
3✔
345
}
346

347
// updateEdge updates the given edge with the new schema.
348
func (r *Manager) updateEdge(chanPoint wire.OutPoint,
349
        edge *models.ChannelEdgePolicy,
350
        newSchema routing.ChannelPolicy) error {
5✔
351

5✔
352
        channel, err := r.FetchChannel(chanPoint)
5✔
353
        if err != nil {
5✔
354
                return err
×
355
        }
×
356

357
        // Update forwarding fee scheme and required time lock delta.
358
        edge.FeeBaseMSat = newSchema.BaseFee
5✔
359
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
5✔
360
                newSchema.FeeRate,
5✔
361
        )
5✔
362

5✔
363
        // If inbound fees are set, we update the edge with them.
5✔
364
        err = fn.MapOptionZ(newSchema.InboundFee,
5✔
365
                func(f models.InboundFee) error {
5✔
366
                        inboundWireFee := f.ToWire()
×
367
                        return edge.ExtraOpaqueData.PackRecords(
×
368
                                &inboundWireFee,
×
369
                        )
×
370
                })
×
371
        if err != nil {
5✔
372
                return err
×
373
        }
×
374

375
        edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)
5✔
376

5✔
377
        // Retrieve negotiated channel htlc amt limits.
5✔
378
        amtMin, amtMax, err := r.getHtlcAmtLimits(channel)
5✔
379
        if err != nil {
5✔
380
                return err
×
381
        }
×
382

383
        // We now update the edge max htlc value.
384
        switch {
5✔
385
        // If a non-zero max htlc was specified, use it to update the edge.
386
        // Otherwise keep the value unchanged.
387
        case newSchema.MaxHTLC != 0:
4✔
388
                edge.MaxHTLC = newSchema.MaxHTLC
4✔
389

390
        // If this edge still doesn't have a max htlc set, set it to the max.
391
        // This is an on-the-fly migration.
392
        case !edge.MessageFlags.HasMaxHtlc():
×
393
                edge.MaxHTLC = amtMax
×
394

395
        // If this edge has a max htlc that exceeds what the channel can
396
        // actually carry, correct it now. This can happen, because we
397
        // previously set the max htlc to the channel capacity.
398
        case edge.MaxHTLC > amtMax:
×
399
                edge.MaxHTLC = amtMax
×
400
        }
401

402
        // If a new min htlc is specified, update the edge.
403
        if newSchema.MinHTLC != nil {
5✔
404
                edge.MinHTLC = *newSchema.MinHTLC
×
405
        }
×
406

407
        // If the MaxHtlc flag wasn't already set, we can set it now.
408
        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
5✔
409

5✔
410
        // Validate htlc amount constraints.
5✔
411
        switch {
5✔
412
        case edge.MinHTLC < amtMin:
×
413
                return fmt.Errorf(
×
414
                        "min htlc amount of %v is below min htlc parameter of %v",
×
415
                        edge.MinHTLC, amtMin,
×
416
                )
×
417

418
        case edge.MaxHTLC > amtMax:
×
419
                return fmt.Errorf(
×
420
                        "max htlc size of %v is above max pending amount of %v",
×
421
                        edge.MaxHTLC, amtMax,
×
422
                )
×
423

424
        case edge.MinHTLC > edge.MaxHTLC:
×
425
                return fmt.Errorf(
×
426
                        "min_htlc %v greater than max_htlc %v",
×
427
                        edge.MinHTLC, edge.MaxHTLC,
×
428
                )
×
429
        }
430

431
        // Clear signature to help prevent usage of the previous signature.
432
        edge.SetSigBytes(nil)
5✔
433

5✔
434
        return nil
5✔
435
}
436

437
// getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount
438
// constraints.
439
func (r *Manager) getHtlcAmtLimits(ch *channeldb.OpenChannel) (
440
        lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) {
5✔
441

5✔
442
        // The max htlc policy field must be less than or equal to the channel
5✔
443
        // capacity AND less than or equal to the max in-flight HTLC value.
5✔
444
        // Since the latter is always less than or equal to the former, just
5✔
445
        // return the max in-flight value.
5✔
446
        maxAmt := ch.LocalChanCfg.ChannelStateBounds.MaxPendingAmount
5✔
447

5✔
448
        return ch.LocalChanCfg.MinHTLC, maxAmt, nil
5✔
449
}
5✔
450

451
// makeFailureItem creates a lnrpc.FailedUpdate object.
452
func makeFailureItem(outPoint wire.OutPoint, updateFailure lnrpc.UpdateFailure,
453
        errStr string) *lnrpc.FailedUpdate {
2✔
454

2✔
455
        outpoint := lnrpc.MarshalOutPoint(&outPoint)
2✔
456

2✔
457
        return &lnrpc.FailedUpdate{
2✔
458
                Outpoint:    outpoint,
2✔
459
                Reason:      updateFailure,
2✔
460
                UpdateError: errStr,
2✔
461
        }
2✔
462
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc