• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12058234999

27 Nov 2024 09:06PM UTC coverage: 57.847% (-1.1%) from 58.921%
12058234999

Pull #9148

github

ProofOfKeags
lnwire: convert DynPropose and DynCommit to use typed tlv records
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

142 of 177 new or added lines in 4 files covered. (80.23%)

19365 existing lines in 251 files now uncovered.

100876 of 174383 relevant lines covered (57.85%)

25338.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.03
/routing/localchans/manager.go
1
package localchans
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcec/v2"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/channeldb"
13
        "github.com/lightningnetwork/lnd/channeldb/models"
14
        "github.com/lightningnetwork/lnd/discovery"
15
        "github.com/lightningnetwork/lnd/fn"
16
        "github.com/lightningnetwork/lnd/kvdb"
17
        "github.com/lightningnetwork/lnd/lnrpc"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/routing"
20
)
21

22
// Manager manages the node's local channels. The only operation that is
23
// currently implemented is updating forwarding policies.
24
type Manager struct {
25
        // SelfPub contains the public key of the local node.
26
        SelfPub *btcec.PublicKey
27

28
        // DefaultRoutingPolicy is the default routing policy.
29
        DefaultRoutingPolicy models.ForwardingPolicy
30

31
        // UpdateForwardingPolicies is used by the manager to update active
32
        // links with a new policy.
33
        UpdateForwardingPolicies func(
34
                chanPolicies map[wire.OutPoint]models.ForwardingPolicy)
35

36
        // PropagateChanPolicyUpdate is called to persist a new policy to disk
37
        // and broadcast it to the network.
38
        PropagateChanPolicyUpdate func(
39
                edgesToUpdate []discovery.EdgeWithInfo) error
40

41
        // ForAllOutgoingChannels is required to iterate over all our local
42
        // channels. The ChannelEdgePolicy parameter may be nil.
43
        ForAllOutgoingChannels func(cb func(kvdb.RTx,
44
                *models.ChannelEdgeInfo,
45
                *models.ChannelEdgePolicy) error) error
46

47
        // FetchChannel is used to query local channel parameters. Optionally an
48
        // existing db tx can be supplied.
49
        FetchChannel func(tx kvdb.RTx, chanPoint wire.OutPoint) (
50
                *channeldb.OpenChannel, error)
51

52
        // AddEdge is used to add edge/channel to the topology of the router.
53
        AddEdge func(edge *models.ChannelEdgeInfo) error
54

55
        // policyUpdateLock ensures that the database and the link do not fall
56
        // out of sync if there are concurrent fee update calls. Without it,
57
        // there is a chance that policy A updates the database, then policy B
58
        // updates the database, then policy B updates the link, then policy A
59
        // updates the link.
60
        policyUpdateLock sync.Mutex
61
}
62

63
// UpdatePolicy updates the policy for the specified channels on disk and in
64
// the active links.
65
func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
66
        createMissingEdge bool, chanPoints ...wire.OutPoint) (
67
        []*lnrpc.FailedUpdate, error) {
6✔
68

6✔
69
        r.policyUpdateLock.Lock()
6✔
70
        defer r.policyUpdateLock.Unlock()
6✔
71

6✔
72
        // First, we'll construct a set of all the channels that we are
6✔
73
        // trying to update.
6✔
74
        unprocessedChans := make(map[wire.OutPoint]struct{})
6✔
75
        for _, chanPoint := range chanPoints {
11✔
76
                unprocessedChans[chanPoint] = struct{}{}
5✔
77
        }
5✔
78

79
        haveChanFilter := len(unprocessedChans) != 0
6✔
80

6✔
81
        var failedUpdates []*lnrpc.FailedUpdate
6✔
82
        var edgesToUpdate []discovery.EdgeWithInfo
6✔
83
        policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy)
6✔
84

6✔
85
        // NOTE: edge may be nil when this function is called.
6✔
86
        processChan := func(
6✔
87
                tx kvdb.RTx,
6✔
88
                info *models.ChannelEdgeInfo,
6✔
89
                edge *models.ChannelEdgePolicy) error {
11✔
90

5✔
91
                // If we have a channel filter, and this channel isn't a part
5✔
92
                // of it, then we'll skip it.
5✔
93
                _, ok := unprocessedChans[info.ChannelPoint]
5✔
94
                if !ok && haveChanFilter {
6✔
95
                        return nil
1✔
96
                }
1✔
97

98
                // Mark this channel as found by removing it. unprocessedChans
99
                // will be used to report invalid channels later on.
100
                delete(unprocessedChans, info.ChannelPoint)
4✔
101

4✔
102
                if edge == nil {
4✔
103
                        log.Errorf("Got nil channel edge policy when updating "+
×
104
                                "a channel. Channel point: %v",
×
105
                                info.ChannelPoint.String())
×
106

×
107
                        failedUpdates = append(failedUpdates, makeFailureItem(
×
108
                                info.ChannelPoint,
×
109
                                lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
×
110
                                "edge policy not found",
×
111
                        ))
×
112

×
113
                        return nil
×
114
                }
×
115

116
                // Apply the new policy to the edge.
117
                err := r.updateEdge(
4✔
118
                        tx, info.ChannelPoint, edge, newSchema,
4✔
119
                )
4✔
120
                if err != nil {
4✔
UNCOV
121
                        failedUpdates = append(failedUpdates,
×
UNCOV
122
                                makeFailureItem(info.ChannelPoint,
×
UNCOV
123
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
UNCOV
124
                                        err.Error(),
×
UNCOV
125
                                ))
×
UNCOV
126

×
UNCOV
127
                        return nil
×
UNCOV
128
                }
×
129

130
                // Add updated edge to list of edges to send to gossiper.
131
                edgesToUpdate = append(edgesToUpdate, discovery.EdgeWithInfo{
4✔
132
                        Info: info,
4✔
133
                        Edge: edge,
4✔
134
                })
4✔
135

4✔
136
                // Extract inbound fees from the ExtraOpaqueData.
4✔
137
                var inboundWireFee lnwire.Fee
4✔
138
                _, err = edge.ExtraOpaqueData.ExtractRecords(&inboundWireFee)
4✔
139
                if err != nil {
4✔
140
                        return err
×
141
                }
×
142
                inboundFee := models.NewInboundFeeFromWire(inboundWireFee)
4✔
143

4✔
144
                // Add updated policy to list of policies to send to switch.
4✔
145
                policiesToUpdate[info.ChannelPoint] = models.ForwardingPolicy{
4✔
146
                        BaseFee:       edge.FeeBaseMSat,
4✔
147
                        FeeRate:       edge.FeeProportionalMillionths,
4✔
148
                        TimeLockDelta: uint32(edge.TimeLockDelta),
4✔
149
                        MinHTLCOut:    edge.MinHTLC,
4✔
150
                        MaxHTLC:       edge.MaxHTLC,
4✔
151
                        InboundFee:    inboundFee,
4✔
152
                }
4✔
153

4✔
154
                return nil
4✔
155
        }
156

157
        // Next, we'll loop over all the outgoing channels the router knows of.
158
        // If we have a filter then we'll only collect those channels, otherwise
159
        // we'll collect them all.
160
        err := r.ForAllOutgoingChannels(processChan)
6✔
161
        if err != nil {
6✔
162
                return nil, err
×
163
        }
×
164

165
        // Construct a list of failed policy updates.
166
        for chanPoint := range unprocessedChans {
9✔
167
                channel, err := r.FetchChannel(nil, chanPoint)
3✔
168
                switch {
3✔
169
                case errors.Is(err, channeldb.ErrChannelNotFound):
1✔
170
                        failedUpdates = append(failedUpdates,
1✔
171
                                makeFailureItem(chanPoint,
1✔
172
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
1✔
173
                                        "not found",
1✔
174
                                ))
1✔
175

176
                case err != nil:
×
177
                        failedUpdates = append(failedUpdates,
×
178
                                makeFailureItem(chanPoint,
×
179
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INTERNAL_ERR,
×
180
                                        err.Error(),
×
181
                                ))
×
182

183
                case channel.IsPending:
×
184
                        failedUpdates = append(failedUpdates,
×
185
                                makeFailureItem(chanPoint,
×
186
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_PENDING,
×
187
                                        "not yet confirmed",
×
188
                                ))
×
189

190
                // If the edge was not found, but the channel is found, that
191
                // means the edge is missing in the graph database and should be
192
                // recreated. The edge and policy are created in-memory. The
193
                // edge is inserted in createEdge below and the policy will be
194
                // added to the graph in the PropagateChanPolicyUpdate call
195
                // below.
196
                case createMissingEdge:
1✔
197
                        log.Warnf("Missing edge for active channel (%s) "+
1✔
198
                                "during policy update. Recreating edge with "+
1✔
199
                                "default policy.",
1✔
200
                                channel.FundingOutpoint.String())
1✔
201

1✔
202
                        info, edge, failedUpdate := r.createMissingEdge(
1✔
203
                                channel, newSchema,
1✔
204
                        )
1✔
205
                        if failedUpdate == nil {
2✔
206
                                err = processChan(nil, info, edge)
1✔
207
                                if err != nil {
1✔
208
                                        return nil, err
×
209
                                }
×
210
                        } else {
×
211
                                failedUpdates = append(
×
212
                                        failedUpdates, failedUpdate,
×
213
                                )
×
214
                        }
×
215

216
                default:
1✔
217
                        log.Warnf("Missing edge for active channel (%s) "+
1✔
218
                                "during policy update. Could not update "+
1✔
219
                                "policy.", channel.FundingOutpoint.String())
1✔
220

1✔
221
                        failedUpdates = append(failedUpdates,
1✔
222
                                makeFailureItem(chanPoint,
1✔
223
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
1✔
224
                                        "could not update policies",
1✔
225
                                ))
1✔
226
                }
227
        }
228

229
        // Commit the policy updates to disk and broadcast to the network. We
230
        // validated the new policy above, so we expect no validation errors. If
231
        // this would happen because of a bug, the link policy will be
232
        // desynchronized. It is currently not possible to atomically commit
233
        // multiple edge updates.
234
        err = r.PropagateChanPolicyUpdate(edgesToUpdate)
6✔
235
        if err != nil {
6✔
236
                return nil, err
×
237
        }
×
238

239
        // Update active links.
240
        r.UpdateForwardingPolicies(policiesToUpdate)
6✔
241

6✔
242
        return failedUpdates, nil
6✔
243
}
244

245
func (r *Manager) createMissingEdge(channel *channeldb.OpenChannel,
246
        newSchema routing.ChannelPolicy) (*models.ChannelEdgeInfo,
247
        *models.ChannelEdgePolicy, *lnrpc.FailedUpdate) {
1✔
248

1✔
249
        info, edge, err := r.createEdge(channel, time.Now())
1✔
250
        if err != nil {
1✔
251
                log.Errorf("Failed to recreate missing edge "+
×
252
                        "for channel (%s): %v",
×
253
                        channel.FundingOutpoint.String(), err)
×
254

×
255
                return nil, nil, makeFailureItem(
×
256
                        channel.FundingOutpoint,
×
257
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
258
                        "could not update policies",
×
259
                )
×
260
        }
×
261

262
        // Validate the newly created edge policy with the user defined new
263
        // schema before adding the edge to the database.
264
        err = r.updateEdge(nil, channel.FundingOutpoint, edge, newSchema)
1✔
265
        if err != nil {
1✔
266
                return nil, nil, makeFailureItem(
×
267
                        info.ChannelPoint,
×
268
                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
269
                        err.Error(),
×
270
                )
×
271
        }
×
272

273
        // Insert the edge into the database to avoid `edge not
274
        // found` errors during policy update propagation.
275
        err = r.AddEdge(info)
1✔
276
        if err != nil {
1✔
277
                log.Errorf("Attempt to add missing edge for "+
×
278
                        "channel (%s) errored with: %v",
×
279
                        channel.FundingOutpoint.String(), err)
×
280

×
281
                return nil, nil, makeFailureItem(
×
282
                        channel.FundingOutpoint,
×
283
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
284
                        "could not add edge",
×
285
                )
×
286
        }
×
287

288
        return info, edge, nil
1✔
289
}
290

291
// createEdge recreates an edge and policy from an open channel in-memory.
292
func (r *Manager) createEdge(channel *channeldb.OpenChannel,
293
        timestamp time.Time) (*models.ChannelEdgeInfo,
294
        *models.ChannelEdgePolicy, error) {
3✔
295

3✔
296
        nodeKey1Bytes := r.SelfPub.SerializeCompressed()
3✔
297
        nodeKey2Bytes := channel.IdentityPub.SerializeCompressed()
3✔
298
        bitcoinKey1Bytes := channel.LocalChanCfg.MultiSigKey.PubKey.
3✔
299
                SerializeCompressed()
3✔
300
        bitcoinKey2Bytes := channel.RemoteChanCfg.MultiSigKey.PubKey.
3✔
301
                SerializeCompressed()
3✔
302
        channelFlags := lnwire.ChanUpdateChanFlags(0)
3✔
303

3✔
304
        // Make it such that node_id_1 is the lexicographically-lesser of the
3✔
305
        // two compressed keys sorted in ascending lexicographic order.
3✔
306
        if bytes.Compare(nodeKey2Bytes, nodeKey1Bytes) < 0 {
4✔
307
                nodeKey1Bytes, nodeKey2Bytes = nodeKey2Bytes, nodeKey1Bytes
1✔
308
                bitcoinKey1Bytes, bitcoinKey2Bytes = bitcoinKey2Bytes,
1✔
309
                        bitcoinKey1Bytes
1✔
310
                channelFlags = 1
1✔
311
        }
1✔
312

313
        var featureBuf bytes.Buffer
3✔
314
        err := lnwire.NewRawFeatureVector().Encode(&featureBuf)
3✔
315
        if err != nil {
3✔
316
                return nil, nil, fmt.Errorf("unable to encode features: %w",
×
317
                        err)
×
318
        }
×
319

320
        info := &models.ChannelEdgeInfo{
3✔
321
                ChannelID:    channel.ShortChanID().ToUint64(),
3✔
322
                ChainHash:    channel.ChainHash,
3✔
323
                Features:     featureBuf.Bytes(),
3✔
324
                Capacity:     channel.Capacity,
3✔
325
                ChannelPoint: channel.FundingOutpoint,
3✔
326
        }
3✔
327

3✔
328
        copy(info.NodeKey1Bytes[:], nodeKey1Bytes)
3✔
329
        copy(info.NodeKey2Bytes[:], nodeKey2Bytes)
3✔
330
        copy(info.BitcoinKey1Bytes[:], bitcoinKey1Bytes)
3✔
331
        copy(info.BitcoinKey2Bytes[:], bitcoinKey2Bytes)
3✔
332

3✔
333
        // Construct a dummy channel edge policy with default values that will
3✔
334
        // be updated with the new values in the call to processChan below.
3✔
335
        timeLockDelta := uint16(r.DefaultRoutingPolicy.TimeLockDelta)
3✔
336
        edge := &models.ChannelEdgePolicy{
3✔
337
                ChannelID:                 channel.ShortChanID().ToUint64(),
3✔
338
                LastUpdate:                timestamp,
3✔
339
                TimeLockDelta:             timeLockDelta,
3✔
340
                ChannelFlags:              channelFlags,
3✔
341
                MessageFlags:              lnwire.ChanUpdateRequiredMaxHtlc,
3✔
342
                FeeBaseMSat:               r.DefaultRoutingPolicy.BaseFee,
3✔
343
                FeeProportionalMillionths: r.DefaultRoutingPolicy.FeeRate,
3✔
344
                MinHTLC:                   r.DefaultRoutingPolicy.MinHTLCOut,
3✔
345
                MaxHTLC:                   r.DefaultRoutingPolicy.MaxHTLC,
3✔
346
        }
3✔
347

3✔
348
        copy(edge.ToNode[:], channel.IdentityPub.SerializeCompressed())
3✔
349

3✔
350
        return info, edge, nil
3✔
351
}
352

353
// updateEdge updates the given edge with the new schema.
354
func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint,
355
        edge *models.ChannelEdgePolicy,
356
        newSchema routing.ChannelPolicy) error {
5✔
357

5✔
358
        channel, err := r.FetchChannel(tx, chanPoint)
5✔
359
        if err != nil {
5✔
UNCOV
360
                return err
×
UNCOV
361
        }
×
362

363
        // Update forwarding fee scheme and required time lock delta.
364
        edge.FeeBaseMSat = newSchema.BaseFee
5✔
365
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
5✔
366
                newSchema.FeeRate,
5✔
367
        )
5✔
368

5✔
369
        // If inbound fees are set, we update the edge with them.
5✔
370
        err = fn.MapOptionZ(newSchema.InboundFee,
5✔
371
                func(f models.InboundFee) error {
5✔
UNCOV
372
                        inboundWireFee := f.ToWire()
×
UNCOV
373
                        return edge.ExtraOpaqueData.PackRecords(
×
UNCOV
374
                                &inboundWireFee,
×
UNCOV
375
                        )
×
UNCOV
376
                })
×
377
        if err != nil {
5✔
378
                return err
×
379
        }
×
380

381
        edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)
5✔
382

5✔
383
        // Retrieve negotiated channel htlc amt limits.
5✔
384
        amtMin, amtMax, err := r.getHtlcAmtLimits(channel)
5✔
385
        if err != nil {
5✔
386
                return err
×
387
        }
×
388

389
        // We now update the edge max htlc value.
390
        switch {
5✔
391
        // If a non-zero max htlc was specified, use it to update the edge.
392
        // Otherwise keep the value unchanged.
393
        case newSchema.MaxHTLC != 0:
4✔
394
                edge.MaxHTLC = newSchema.MaxHTLC
4✔
395

396
        // If this edge still doesn't have a max htlc set, set it to the max.
397
        // This is an on-the-fly migration.
398
        case !edge.MessageFlags.HasMaxHtlc():
×
399
                edge.MaxHTLC = amtMax
×
400

401
        // If this edge has a max htlc that exceeds what the channel can
402
        // actually carry, correct it now. This can happen, because we
403
        // previously set the max htlc to the channel capacity.
404
        case edge.MaxHTLC > amtMax:
×
405
                edge.MaxHTLC = amtMax
×
406
        }
407

408
        // If a new min htlc is specified, update the edge.
409
        if newSchema.MinHTLC != nil {
5✔
410
                edge.MinHTLC = *newSchema.MinHTLC
×
411
        }
×
412

413
        // If the MaxHtlc flag wasn't already set, we can set it now.
414
        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
5✔
415

5✔
416
        // Validate htlc amount constraints.
5✔
417
        switch {
5✔
418
        case edge.MinHTLC < amtMin:
×
419
                return fmt.Errorf(
×
420
                        "min htlc amount of %v is below min htlc parameter of %v",
×
421
                        edge.MinHTLC, amtMin,
×
422
                )
×
423

424
        case edge.MaxHTLC > amtMax:
×
425
                return fmt.Errorf(
×
426
                        "max htlc size of %v is above max pending amount of %v",
×
427
                        edge.MaxHTLC, amtMax,
×
428
                )
×
429

430
        case edge.MinHTLC > edge.MaxHTLC:
×
431
                return fmt.Errorf(
×
432
                        "min_htlc %v greater than max_htlc %v",
×
433
                        edge.MinHTLC, edge.MaxHTLC,
×
434
                )
×
435
        }
436

437
        // Clear signature to help prevent usage of the previous signature.
438
        edge.SetSigBytes(nil)
5✔
439

5✔
440
        return nil
5✔
441
}
442

443
// getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount
444
// constraints.
445
func (r *Manager) getHtlcAmtLimits(ch *channeldb.OpenChannel) (
446
        lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) {
5✔
447

5✔
448
        // The max htlc policy field must be less than or equal to the channel
5✔
449
        // capacity AND less than or equal to the max in-flight HTLC value.
5✔
450
        // Since the latter is always less than or equal to the former, just
5✔
451
        // return the max in-flight value.
5✔
452
        maxAmt := ch.LocalChanCfg.ChannelStateBounds.MaxPendingAmount
5✔
453

5✔
454
        return ch.LocalChanCfg.MinHTLC, maxAmt, nil
5✔
455
}
5✔
456

457
// makeFailureItem creates a lnrpc.FailedUpdate object.
458
func makeFailureItem(outPoint wire.OutPoint, updateFailure lnrpc.UpdateFailure,
459
        errStr string) *lnrpc.FailedUpdate {
2✔
460

2✔
461
        outpoint := lnrpc.MarshalOutPoint(&outPoint)
2✔
462

2✔
463
        return &lnrpc.FailedUpdate{
2✔
464
                Outpoint:    outpoint,
2✔
465
                Reason:      updateFailure,
2✔
466
                UpdateError: errStr,
2✔
467
        }
2✔
468
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc