• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15838907453

24 Jun 2025 01:26AM UTC coverage: 57.079% (-11.1%) from 68.172%
15838907453

Pull #9982

github

web-flow
Merge e42780be2 into 45c15646c
Pull Request #9982: lnwire+lnwallet: add LocalNonces field for splice nonce coordination w/ taproot channels

103 of 167 new or added lines in 5 files covered. (61.68%)

30191 existing lines in 463 files now uncovered.

96331 of 168768 relevant lines covered (57.08%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.5
/routing/localchans/manager.go
1
package localchans
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/lightningnetwork/lnd/channeldb"
14
        "github.com/lightningnetwork/lnd/discovery"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/graph/db/models"
17
        "github.com/lightningnetwork/lnd/lnrpc"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/routing"
20
)
21

22
// Manager manages the node's local channels. The only operation that is
23
// currently implemented is updating forwarding policies.
24
type Manager struct {
25
        // SelfPub contains the public key of the local node.
26
        SelfPub *btcec.PublicKey
27

28
        // DefaultRoutingPolicy is the default routing policy.
29
        DefaultRoutingPolicy models.ForwardingPolicy
30

31
        // UpdateForwardingPolicies is used by the manager to update active
32
        // links with a new policy.
33
        UpdateForwardingPolicies func(
34
                chanPolicies map[wire.OutPoint]models.ForwardingPolicy)
35

36
        // PropagateChanPolicyUpdate is called to persist a new policy to disk
37
        // and broadcast it to the network.
38
        PropagateChanPolicyUpdate func(
39
                edgesToUpdate []discovery.EdgeWithInfo) error
40

41
        // ForAllOutgoingChannels is required to iterate over all our local
42
        // channels. The ChannelEdgePolicy parameter may be nil.
43
        ForAllOutgoingChannels func(cb func(*models.ChannelEdgeInfo,
44
                *models.ChannelEdgePolicy) error) error
45

46
        // FetchChannel is used to query local channel parameters. Optionally an
47
        // existing db tx can be supplied.
48
        FetchChannel func(chanPoint wire.OutPoint) (*channeldb.OpenChannel,
49
                error)
50

51
        // AddEdge is used to add edge/channel to the topology of the router.
52
        AddEdge func(ctx context.Context, edge *models.ChannelEdgeInfo) error
53

54
        // policyUpdateLock ensures that the database and the link do not fall
55
        // out of sync if there are concurrent fee update calls. Without it,
56
        // there is a chance that policy A updates the database, then policy B
57
        // updates the database, then policy B updates the link, then policy A
58
        // updates the link.
59
        policyUpdateLock sync.Mutex
60
}
61

62
// UpdatePolicy updates the policy for the specified channels on disk and in
63
// the active links.
64
func (r *Manager) UpdatePolicy(ctx context.Context,
65
        newSchema routing.ChannelPolicy,
66
        createMissingEdge bool, chanPoints ...wire.OutPoint) (
67
        []*lnrpc.FailedUpdate, error) {
1✔
68

1✔
69
        r.policyUpdateLock.Lock()
1✔
70
        defer r.policyUpdateLock.Unlock()
1✔
71

1✔
72
        // First, we'll construct a set of all the channels that we are
1✔
73
        // trying to update.
1✔
74
        unprocessedChans := make(map[wire.OutPoint]struct{})
1✔
75
        for _, chanPoint := range chanPoints {
2✔
76
                unprocessedChans[chanPoint] = struct{}{}
1✔
77
        }
1✔
78

79
        haveChanFilter := len(unprocessedChans) != 0
1✔
80

1✔
81
        var failedUpdates []*lnrpc.FailedUpdate
1✔
82
        var edgesToUpdate []discovery.EdgeWithInfo
1✔
83
        policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy)
1✔
84

1✔
85
        // NOTE: edge may be nil when this function is called.
1✔
86
        processChan := func(info *models.ChannelEdgeInfo,
1✔
87
                edge *models.ChannelEdgePolicy) error {
2✔
88

1✔
89
                // If we have a channel filter, and this channel isn't a part
1✔
90
                // of it, then we'll skip it.
1✔
91
                _, ok := unprocessedChans[info.ChannelPoint]
1✔
92
                if !ok && haveChanFilter {
2✔
93
                        return nil
1✔
94
                }
1✔
95

96
                // Mark this channel as found by removing it. unprocessedChans
97
                // will be used to report invalid channels later on.
98
                delete(unprocessedChans, info.ChannelPoint)
1✔
99

1✔
100
                if edge == nil {
1✔
101
                        log.Errorf("Got nil channel edge policy when updating "+
×
102
                                "a channel. Channel point: %v",
×
103
                                info.ChannelPoint.String())
×
104

×
105
                        failedUpdates = append(failedUpdates, makeFailureItem(
×
106
                                info.ChannelPoint,
×
107
                                lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
×
108
                                "edge policy not found",
×
109
                        ))
×
110

×
111
                        return nil
×
112
                }
×
113

114
                // Apply the new policy to the edge.
115
                err := r.updateEdge(info.ChannelPoint, edge, newSchema)
1✔
116
                if err != nil {
1✔
117
                        failedUpdates = append(failedUpdates,
×
118
                                makeFailureItem(info.ChannelPoint,
×
119
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
120
                                        err.Error(),
×
121
                                ))
×
122

×
123
                        return nil
×
124
                }
×
125

126
                // Add updated edge to list of edges to send to gossiper.
127
                edgesToUpdate = append(edgesToUpdate, discovery.EdgeWithInfo{
1✔
128
                        Info: info,
1✔
129
                        Edge: edge,
1✔
130
                })
1✔
131

1✔
132
                var inboundWireFee lnwire.Fee
1✔
133
                edge.InboundFee.WhenSome(func(fee lnwire.Fee) {
2✔
134
                        inboundWireFee = fee
1✔
135
                })
1✔
136
                inboundFee := models.NewInboundFeeFromWire(inboundWireFee)
1✔
137

1✔
138
                // Add updated policy to list of policies to send to switch.
1✔
139
                policiesToUpdate[info.ChannelPoint] = models.ForwardingPolicy{
1✔
140
                        BaseFee:       edge.FeeBaseMSat,
1✔
141
                        FeeRate:       edge.FeeProportionalMillionths,
1✔
142
                        TimeLockDelta: uint32(edge.TimeLockDelta),
1✔
143
                        MinHTLCOut:    edge.MinHTLC,
1✔
144
                        MaxHTLC:       edge.MaxHTLC,
1✔
145
                        InboundFee:    inboundFee,
1✔
146
                }
1✔
147

1✔
148
                return nil
1✔
149
        }
150

151
        // Next, we'll loop over all the outgoing channels the router knows of.
152
        // If we have a filter then we'll only collect those channels, otherwise
153
        // we'll collect them all.
154
        err := r.ForAllOutgoingChannels(processChan)
1✔
155
        if err != nil {
1✔
156
                return nil, err
×
157
        }
×
158

159
        // Construct a list of failed policy updates.
160
        for chanPoint := range unprocessedChans {
1✔
UNCOV
161
                channel, err := r.FetchChannel(chanPoint)
×
UNCOV
162
                switch {
×
UNCOV
163
                case errors.Is(err, channeldb.ErrChannelNotFound):
×
UNCOV
164
                        failedUpdates = append(failedUpdates,
×
UNCOV
165
                                makeFailureItem(chanPoint,
×
UNCOV
166
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
×
UNCOV
167
                                        "not found",
×
UNCOV
168
                                ))
×
169

170
                case err != nil:
×
171
                        failedUpdates = append(failedUpdates,
×
172
                                makeFailureItem(chanPoint,
×
173
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INTERNAL_ERR,
×
174
                                        err.Error(),
×
175
                                ))
×
176

177
                case channel.IsPending:
×
178
                        failedUpdates = append(failedUpdates,
×
179
                                makeFailureItem(chanPoint,
×
180
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_PENDING,
×
181
                                        "not yet confirmed",
×
182
                                ))
×
183

184
                // If the edge was not found, but the channel is found, that
185
                // means the edge is missing in the graph database and should be
186
                // recreated. The edge and policy are created in-memory. The
187
                // edge is inserted in createEdge below and the policy will be
188
                // added to the graph in the PropagateChanPolicyUpdate call
189
                // below.
UNCOV
190
                case createMissingEdge:
×
UNCOV
191
                        log.Warnf("Missing edge for active channel (%s) "+
×
UNCOV
192
                                "during policy update. Recreating edge with "+
×
UNCOV
193
                                "default policy.",
×
UNCOV
194
                                channel.FundingOutpoint.String())
×
UNCOV
195

×
UNCOV
196
                        info, edge, failedUpdate := r.createMissingEdge(
×
UNCOV
197
                                ctx, channel, newSchema,
×
UNCOV
198
                        )
×
UNCOV
199
                        if failedUpdate == nil {
×
UNCOV
200
                                err = processChan(info, edge)
×
UNCOV
201
                                if err != nil {
×
202
                                        return nil, err
×
203
                                }
×
204
                        } else {
×
205
                                failedUpdates = append(
×
206
                                        failedUpdates, failedUpdate,
×
207
                                )
×
208
                        }
×
209

UNCOV
210
                default:
×
UNCOV
211
                        log.Warnf("Missing edge for active channel (%s) "+
×
UNCOV
212
                                "during policy update. Could not update "+
×
UNCOV
213
                                "policy.", channel.FundingOutpoint.String())
×
UNCOV
214

×
UNCOV
215
                        failedUpdates = append(failedUpdates,
×
UNCOV
216
                                makeFailureItem(chanPoint,
×
UNCOV
217
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
UNCOV
218
                                        "could not update policies",
×
UNCOV
219
                                ))
×
220
                }
221
        }
222

223
        // Commit the policy updates to disk and broadcast to the network. We
224
        // validated the new policy above, so we expect no validation errors. If
225
        // this would happen because of a bug, the link policy will be
226
        // desynchronized. It is currently not possible to atomically commit
227
        // multiple edge updates.
228
        err = r.PropagateChanPolicyUpdate(edgesToUpdate)
1✔
229
        if err != nil {
1✔
230
                return nil, err
×
231
        }
×
232

233
        // Update active links.
234
        r.UpdateForwardingPolicies(policiesToUpdate)
1✔
235

1✔
236
        return failedUpdates, nil
1✔
237
}
238

239
func (r *Manager) createMissingEdge(ctx context.Context,
240
        channel *channeldb.OpenChannel,
241
        newSchema routing.ChannelPolicy) (*models.ChannelEdgeInfo,
UNCOV
242
        *models.ChannelEdgePolicy, *lnrpc.FailedUpdate) {
×
UNCOV
243

×
UNCOV
244
        info, edge, err := r.createEdge(channel, time.Now())
×
UNCOV
245
        if err != nil {
×
246
                log.Errorf("Failed to recreate missing edge "+
×
247
                        "for channel (%s): %v",
×
248
                        channel.FundingOutpoint.String(), err)
×
249

×
250
                return nil, nil, makeFailureItem(
×
251
                        channel.FundingOutpoint,
×
252
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
253
                        "could not update policies",
×
254
                )
×
255
        }
×
256

257
        // Validate the newly created edge policy with the user defined new
258
        // schema before adding the edge to the database.
UNCOV
259
        err = r.updateEdge(channel.FundingOutpoint, edge, newSchema)
×
UNCOV
260
        if err != nil {
×
261
                return nil, nil, makeFailureItem(
×
262
                        info.ChannelPoint,
×
263
                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
264
                        err.Error(),
×
265
                )
×
266
        }
×
267

268
        // Insert the edge into the database to avoid `edge not
269
        // found` errors during policy update propagation.
UNCOV
270
        err = r.AddEdge(ctx, info)
×
UNCOV
271
        if err != nil {
×
272
                log.Errorf("Attempt to add missing edge for "+
×
273
                        "channel (%s) errored with: %v",
×
274
                        channel.FundingOutpoint.String(), err)
×
275

×
276
                return nil, nil, makeFailureItem(
×
277
                        channel.FundingOutpoint,
×
278
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
279
                        "could not add edge",
×
280
                )
×
281
        }
×
282

UNCOV
283
        return info, edge, nil
×
284
}
285

286
// createEdge recreates an edge and policy from an open channel in-memory.
287
func (r *Manager) createEdge(channel *channeldb.OpenChannel,
288
        timestamp time.Time) (*models.ChannelEdgeInfo,
UNCOV
289
        *models.ChannelEdgePolicy, error) {
×
UNCOV
290

×
UNCOV
291
        nodeKey1Bytes := r.SelfPub.SerializeCompressed()
×
UNCOV
292
        nodeKey2Bytes := channel.IdentityPub.SerializeCompressed()
×
UNCOV
293
        bitcoinKey1Bytes := channel.LocalChanCfg.MultiSigKey.PubKey.
×
UNCOV
294
                SerializeCompressed()
×
UNCOV
295
        bitcoinKey2Bytes := channel.RemoteChanCfg.MultiSigKey.PubKey.
×
UNCOV
296
                SerializeCompressed()
×
UNCOV
297
        channelFlags := lnwire.ChanUpdateChanFlags(0)
×
UNCOV
298

×
UNCOV
299
        // Make it such that node_id_1 is the lexicographically-lesser of the
×
UNCOV
300
        // two compressed keys sorted in ascending lexicographic order.
×
UNCOV
301
        if bytes.Compare(nodeKey2Bytes, nodeKey1Bytes) < 0 {
×
UNCOV
302
                nodeKey1Bytes, nodeKey2Bytes = nodeKey2Bytes, nodeKey1Bytes
×
UNCOV
303
                bitcoinKey1Bytes, bitcoinKey2Bytes = bitcoinKey2Bytes,
×
UNCOV
304
                        bitcoinKey1Bytes
×
UNCOV
305
                channelFlags = 1
×
UNCOV
306
        }
×
307

UNCOV
308
        var featureBuf bytes.Buffer
×
UNCOV
309
        err := lnwire.NewRawFeatureVector().Encode(&featureBuf)
×
UNCOV
310
        if err != nil {
×
311
                return nil, nil, fmt.Errorf("unable to encode features: %w",
×
312
                        err)
×
313
        }
×
314

315
        // We need to make sure we use the real scid for public confirmed
316
        // zero-conf channels.
UNCOV
317
        shortChanID := channel.ShortChanID()
×
UNCOV
318
        isPublic := channel.ChannelFlags&lnwire.FFAnnounceChannel != 0
×
UNCOV
319
        if isPublic && channel.IsZeroConf() && channel.ZeroConfConfirmed() {
×
320
                shortChanID = channel.ZeroConfRealScid()
×
321
        }
×
322

UNCOV
323
        info := &models.ChannelEdgeInfo{
×
UNCOV
324
                ChannelID:    shortChanID.ToUint64(),
×
UNCOV
325
                ChainHash:    channel.ChainHash,
×
UNCOV
326
                Features:     featureBuf.Bytes(),
×
UNCOV
327
                Capacity:     channel.Capacity,
×
UNCOV
328
                ChannelPoint: channel.FundingOutpoint,
×
UNCOV
329
        }
×
UNCOV
330

×
UNCOV
331
        copy(info.NodeKey1Bytes[:], nodeKey1Bytes)
×
UNCOV
332
        copy(info.NodeKey2Bytes[:], nodeKey2Bytes)
×
UNCOV
333
        copy(info.BitcoinKey1Bytes[:], bitcoinKey1Bytes)
×
UNCOV
334
        copy(info.BitcoinKey2Bytes[:], bitcoinKey2Bytes)
×
UNCOV
335

×
UNCOV
336
        // Construct a dummy channel edge policy with default values that will
×
UNCOV
337
        // be updated with the new values in the call to processChan below.
×
UNCOV
338
        timeLockDelta := uint16(r.DefaultRoutingPolicy.TimeLockDelta)
×
UNCOV
339
        edge := &models.ChannelEdgePolicy{
×
UNCOV
340
                ChannelID:                 shortChanID.ToUint64(),
×
UNCOV
341
                LastUpdate:                timestamp,
×
UNCOV
342
                TimeLockDelta:             timeLockDelta,
×
UNCOV
343
                ChannelFlags:              channelFlags,
×
UNCOV
344
                MessageFlags:              lnwire.ChanUpdateRequiredMaxHtlc,
×
UNCOV
345
                FeeBaseMSat:               r.DefaultRoutingPolicy.BaseFee,
×
UNCOV
346
                FeeProportionalMillionths: r.DefaultRoutingPolicy.FeeRate,
×
UNCOV
347
                MinHTLC:                   r.DefaultRoutingPolicy.MinHTLCOut,
×
UNCOV
348
                MaxHTLC:                   r.DefaultRoutingPolicy.MaxHTLC,
×
UNCOV
349
        }
×
UNCOV
350

×
UNCOV
351
        copy(edge.ToNode[:], channel.IdentityPub.SerializeCompressed())
×
UNCOV
352

×
UNCOV
353
        return info, edge, nil
×
354
}
355

356
// updateEdge updates the given edge with the new schema.
357
func (r *Manager) updateEdge(chanPoint wire.OutPoint,
358
        edge *models.ChannelEdgePolicy,
359
        newSchema routing.ChannelPolicy) error {
1✔
360

1✔
361
        channel, err := r.FetchChannel(chanPoint)
1✔
362
        if err != nil {
1✔
363
                return err
×
364
        }
×
365

366
        // Update forwarding fee scheme and required time lock delta.
367
        edge.FeeBaseMSat = newSchema.BaseFee
1✔
368
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
1✔
369
                newSchema.FeeRate,
1✔
370
        )
1✔
371

1✔
372
        // If inbound fees are set, we update the edge with them.
1✔
373
        err = fn.MapOptionZ(newSchema.InboundFee,
1✔
374
                func(f models.InboundFee) error {
2✔
375
                        inboundWireFee := f.ToWire()
1✔
376
                        edge.InboundFee = fn.Some(inboundWireFee)
1✔
377

1✔
378
                        return edge.ExtraOpaqueData.PackRecords(
1✔
379
                                &inboundWireFee,
1✔
380
                        )
1✔
381
                })
1✔
382
        if err != nil {
1✔
383
                return err
×
384
        }
×
385

386
        edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)
1✔
387

1✔
388
        // Retrieve negotiated channel htlc amt limits.
1✔
389
        amtMin, amtMax, err := r.getHtlcAmtLimits(channel)
1✔
390
        if err != nil {
1✔
391
                return err
×
392
        }
×
393

394
        // We now update the edge max htlc value.
395
        switch {
1✔
396
        // If a non-zero max htlc was specified, use it to update the edge.
397
        // Otherwise keep the value unchanged.
398
        case newSchema.MaxHTLC != 0:
1✔
399
                edge.MaxHTLC = newSchema.MaxHTLC
1✔
400

401
        // If this edge still doesn't have a max htlc set, set it to the max.
402
        // This is an on-the-fly migration.
403
        case !edge.MessageFlags.HasMaxHtlc():
×
404
                edge.MaxHTLC = amtMax
×
405

406
        // If this edge has a max htlc that exceeds what the channel can
407
        // actually carry, correct it now. This can happen, because we
408
        // previously set the max htlc to the channel capacity.
409
        case edge.MaxHTLC > amtMax:
×
410
                edge.MaxHTLC = amtMax
×
411
        }
412

413
        // If a new min htlc is specified, update the edge.
414
        if newSchema.MinHTLC != nil {
1✔
415
                edge.MinHTLC = *newSchema.MinHTLC
×
416
        }
×
417

418
        // If the MaxHtlc flag wasn't already set, we can set it now.
419
        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
1✔
420

1✔
421
        // Validate htlc amount constraints.
1✔
422
        switch {
1✔
423
        case edge.MinHTLC < amtMin:
×
424
                return fmt.Errorf(
×
425
                        "min htlc amount of %v is below min htlc parameter of %v",
×
426
                        edge.MinHTLC, amtMin,
×
427
                )
×
428

429
        case edge.MaxHTLC > amtMax:
×
430
                return fmt.Errorf(
×
431
                        "max htlc size of %v is above max pending amount of %v",
×
432
                        edge.MaxHTLC, amtMax,
×
433
                )
×
434

435
        case edge.MinHTLC > edge.MaxHTLC:
×
436
                return fmt.Errorf(
×
437
                        "min_htlc %v greater than max_htlc %v",
×
438
                        edge.MinHTLC, edge.MaxHTLC,
×
439
                )
×
440
        }
441

442
        // Clear signature to help prevent usage of the previous signature.
443
        edge.SetSigBytes(nil)
1✔
444

1✔
445
        return nil
1✔
446
}
447

448
// getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount
449
// constraints.
450
func (r *Manager) getHtlcAmtLimits(ch *channeldb.OpenChannel) (
451
        lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) {
1✔
452

1✔
453
        // The max htlc policy field must be less than or equal to the channel
1✔
454
        // capacity AND less than or equal to the max in-flight HTLC value.
1✔
455
        // Since the latter is always less than or equal to the former, just
1✔
456
        // return the max in-flight value.
1✔
457
        maxAmt := ch.LocalChanCfg.ChannelStateBounds.MaxPendingAmount
1✔
458

1✔
459
        return ch.LocalChanCfg.MinHTLC, maxAmt, nil
1✔
460
}
1✔
461

462
// makeFailureItem creates a lnrpc.FailedUpdate object.
463
func makeFailureItem(outPoint wire.OutPoint, updateFailure lnrpc.UpdateFailure,
UNCOV
464
        errStr string) *lnrpc.FailedUpdate {
×
UNCOV
465

×
UNCOV
466
        outpoint := lnrpc.MarshalOutPoint(&outPoint)
×
UNCOV
467

×
UNCOV
468
        return &lnrpc.FailedUpdate{
×
UNCOV
469
                Outpoint:    outpoint,
×
UNCOV
470
                Reason:      updateFailure,
×
UNCOV
471
                UpdateError: errStr,
×
UNCOV
472
        }
×
UNCOV
473
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc