• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 19699489477

26 Nov 2025 09:50AM UTC coverage: 65.173%. First build
19699489477

Pull #10379

github

web-flow
Merge 88a1a8566 into a5f300683
Pull Request #10379: [g175:3] graph/db: continue prepping `models` for V2 data

245 of 435 new or added lines in 11 files covered. (56.32%)

137805 of 211444 relevant lines covered (65.17%)

20804.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.17
/routing/localchans/manager.go
1
package localchans
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/lightningnetwork/lnd/channeldb"
14
        "github.com/lightningnetwork/lnd/discovery"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/graph/db/models"
17
        "github.com/lightningnetwork/lnd/lnrpc"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/routing"
20
        "github.com/lightningnetwork/lnd/routing/route"
21
)
22

23
// Manager manages the node's local channels. The only operation that is
24
// currently implemented is updating forwarding policies.
25
type Manager struct {
26
        // SelfPub contains the public key of the local node.
27
        SelfPub *btcec.PublicKey
28

29
        // DefaultRoutingPolicy is the default routing policy.
30
        DefaultRoutingPolicy models.ForwardingPolicy
31

32
        // UpdateForwardingPolicies is used by the manager to update active
33
        // links with a new policy.
34
        UpdateForwardingPolicies func(
35
                chanPolicies map[wire.OutPoint]models.ForwardingPolicy)
36

37
        // PropagateChanPolicyUpdate is called to persist a new policy to disk
38
        // and broadcast it to the network.
39
        PropagateChanPolicyUpdate func(
40
                edgesToUpdate []discovery.EdgeWithInfo) error
41

42
        // ForAllOutgoingChannels is required to iterate over all our local
43
        // channels. The ChannelEdgePolicy parameter may be nil.
44
        ForAllOutgoingChannels func(ctx context.Context,
45
                cb func(*models.ChannelEdgeInfo,
46
                        *models.ChannelEdgePolicy) error, reset func()) error
47

48
        // FetchChannel is used to query local channel parameters. Optionally an
49
        // existing db tx can be supplied.
50
        FetchChannel func(chanPoint wire.OutPoint) (*channeldb.OpenChannel,
51
                error)
52

53
        // AddEdge is used to add edge/channel to the topology of the router.
54
        AddEdge func(ctx context.Context, edge *models.ChannelEdgeInfo) error
55

56
        // policyUpdateLock ensures that the database and the link do not fall
57
        // out of sync if there are concurrent fee update calls. Without it,
58
        // there is a chance that policy A updates the database, then policy B
59
        // updates the database, then policy B updates the link, then policy A
60
        // updates the link.
61
        policyUpdateLock sync.Mutex
62
}
63

64
// UpdatePolicy updates the policy for the specified channels on disk and in
65
// the active links.
66
func (r *Manager) UpdatePolicy(ctx context.Context,
67
        newSchema routing.ChannelPolicy,
68
        createMissingEdge bool, chanPoints ...wire.OutPoint) (
69
        []*lnrpc.FailedUpdate, error) {
9✔
70

9✔
71
        r.policyUpdateLock.Lock()
9✔
72
        defer r.policyUpdateLock.Unlock()
9✔
73

9✔
74
        // First, we'll construct a set of all the channels that we are
9✔
75
        // trying to update.
9✔
76
        unprocessedChans := make(map[wire.OutPoint]struct{})
9✔
77
        for _, chanPoint := range chanPoints {
17✔
78
                unprocessedChans[chanPoint] = struct{}{}
8✔
79
        }
8✔
80

81
        haveChanFilter := len(unprocessedChans) != 0
9✔
82

9✔
83
        var failedUpdates []*lnrpc.FailedUpdate
9✔
84
        var edgesToUpdate []discovery.EdgeWithInfo
9✔
85
        policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy)
9✔
86

9✔
87
        // NOTE: edge may be nil when this function is called.
9✔
88
        processChan := func(info *models.ChannelEdgeInfo,
9✔
89
                edge *models.ChannelEdgePolicy) error {
17✔
90

8✔
91
                // If we have a channel filter, and this channel isn't a part
8✔
92
                // of it, then we'll skip it.
8✔
93
                _, ok := unprocessedChans[info.ChannelPoint]
8✔
94
                if !ok && haveChanFilter {
12✔
95
                        return nil
4✔
96
                }
4✔
97

98
                // Mark this channel as found by removing it. unprocessedChans
99
                // will be used to report invalid channels later on.
100
                delete(unprocessedChans, info.ChannelPoint)
7✔
101

7✔
102
                if edge == nil {
7✔
103
                        log.Errorf("Got nil channel edge policy when updating "+
×
104
                                "a channel. Channel point: %v",
×
105
                                info.ChannelPoint.String())
×
106

×
107
                        failedUpdates = append(failedUpdates, makeFailureItem(
×
108
                                info.ChannelPoint,
×
109
                                lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
×
110
                                "edge policy not found",
×
111
                        ))
×
112

×
113
                        return nil
×
114
                }
×
115

116
                // Apply the new policy to the edge.
117
                err := r.updateEdge(info.ChannelPoint, edge, newSchema)
7✔
118
                if err != nil {
7✔
119
                        failedUpdates = append(failedUpdates,
×
120
                                makeFailureItem(info.ChannelPoint,
×
121
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
122
                                        err.Error(),
×
123
                                ))
×
124

×
125
                        return nil
×
126
                }
×
127

128
                // Add updated edge to list of edges to send to gossiper.
129
                edgesToUpdate = append(edgesToUpdate, discovery.EdgeWithInfo{
7✔
130
                        Info: info,
7✔
131
                        Edge: edge,
7✔
132
                })
7✔
133

7✔
134
                var inboundWireFee lnwire.Fee
7✔
135
                edge.InboundFee.WhenSome(func(fee lnwire.Fee) {
10✔
136
                        inboundWireFee = fee
3✔
137
                })
3✔
138
                inboundFee := models.NewInboundFeeFromWire(inboundWireFee)
7✔
139

7✔
140
                // Add updated policy to list of policies to send to switch.
7✔
141
                policiesToUpdate[info.ChannelPoint] = models.ForwardingPolicy{
7✔
142
                        BaseFee:       edge.FeeBaseMSat,
7✔
143
                        FeeRate:       edge.FeeProportionalMillionths,
7✔
144
                        TimeLockDelta: uint32(edge.TimeLockDelta),
7✔
145
                        MinHTLCOut:    edge.MinHTLC,
7✔
146
                        MaxHTLC:       edge.MaxHTLC,
7✔
147
                        InboundFee:    inboundFee,
7✔
148
                }
7✔
149

7✔
150
                return nil
7✔
151
        }
152

153
        // Next, we'll loop over all the outgoing channels the router knows of.
154
        // If we have a filter then we'll only collect those channels, otherwise
155
        // we'll collect them all.
156
        err := r.ForAllOutgoingChannels(
9✔
157
                ctx, processChan,
9✔
158
                func() {
12✔
159
                        failedUpdates = nil
3✔
160
                        edgesToUpdate = nil
3✔
161
                        clear(policiesToUpdate)
3✔
162
                },
3✔
163
        )
164
        if err != nil {
9✔
165
                return nil, err
×
166
        }
×
167

168
        // Construct a list of failed policy updates.
169
        for chanPoint := range unprocessedChans {
12✔
170
                channel, err := r.FetchChannel(chanPoint)
3✔
171
                switch {
3✔
172
                case errors.Is(err, channeldb.ErrChannelNotFound):
1✔
173
                        failedUpdates = append(failedUpdates,
1✔
174
                                makeFailureItem(chanPoint,
1✔
175
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
1✔
176
                                        "not found",
1✔
177
                                ))
1✔
178

179
                case err != nil:
×
180
                        failedUpdates = append(failedUpdates,
×
181
                                makeFailureItem(chanPoint,
×
182
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INTERNAL_ERR,
×
183
                                        err.Error(),
×
184
                                ))
×
185

186
                case channel.IsPending:
×
187
                        failedUpdates = append(failedUpdates,
×
188
                                makeFailureItem(chanPoint,
×
189
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_PENDING,
×
190
                                        "not yet confirmed",
×
191
                                ))
×
192

193
                // If the edge was not found, but the channel is found, that
194
                // means the edge is missing in the graph database and should be
195
                // recreated. The edge and policy are created in-memory. The
196
                // edge is inserted in createEdge below and the policy will be
197
                // added to the graph in the PropagateChanPolicyUpdate call
198
                // below.
199
                case createMissingEdge:
1✔
200
                        log.Warnf("Missing edge for active channel (%s) "+
1✔
201
                                "during policy update. Recreating edge with "+
1✔
202
                                "default policy.",
1✔
203
                                channel.FundingOutpoint.String())
1✔
204

1✔
205
                        info, edge, failedUpdate := r.createMissingEdge(
1✔
206
                                ctx, channel, newSchema,
1✔
207
                        )
1✔
208
                        if failedUpdate == nil {
2✔
209
                                err = processChan(info, edge)
1✔
210
                                if err != nil {
1✔
211
                                        return nil, err
×
212
                                }
×
213
                        } else {
×
214
                                failedUpdates = append(
×
215
                                        failedUpdates, failedUpdate,
×
216
                                )
×
217
                        }
×
218

219
                default:
1✔
220
                        log.Warnf("Missing edge for active channel (%s) "+
1✔
221
                                "during policy update. Could not update "+
1✔
222
                                "policy.", channel.FundingOutpoint.String())
1✔
223

1✔
224
                        failedUpdates = append(failedUpdates,
1✔
225
                                makeFailureItem(chanPoint,
1✔
226
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
1✔
227
                                        "could not update policies",
1✔
228
                                ))
1✔
229
                }
230
        }
231

232
        // Commit the policy updates to disk and broadcast to the network. We
233
        // validated the new policy above, so we expect no validation errors. If
234
        // this would happen because of a bug, the link policy will be
235
        // desynchronized. It is currently not possible to atomically commit
236
        // multiple edge updates.
237
        err = r.PropagateChanPolicyUpdate(edgesToUpdate)
9✔
238
        if err != nil {
9✔
239
                return nil, err
×
240
        }
×
241

242
        // Update active links.
243
        r.UpdateForwardingPolicies(policiesToUpdate)
9✔
244

9✔
245
        return failedUpdates, nil
9✔
246
}
247

248
func (r *Manager) createMissingEdge(ctx context.Context,
249
        channel *channeldb.OpenChannel,
250
        newSchema routing.ChannelPolicy) (*models.ChannelEdgeInfo,
251
        *models.ChannelEdgePolicy, *lnrpc.FailedUpdate) {
1✔
252

1✔
253
        info, edge, err := r.createEdge(channel, time.Now())
1✔
254
        if err != nil {
1✔
255
                log.Errorf("Failed to recreate missing edge "+
×
256
                        "for channel (%s): %v",
×
257
                        channel.FundingOutpoint.String(), err)
×
258

×
259
                return nil, nil, makeFailureItem(
×
260
                        channel.FundingOutpoint,
×
261
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
262
                        "could not update policies",
×
263
                )
×
264
        }
×
265

266
        // Validate the newly created edge policy with the user defined new
267
        // schema before adding the edge to the database.
268
        err = r.updateEdge(channel.FundingOutpoint, edge, newSchema)
1✔
269
        if err != nil {
1✔
270
                return nil, nil, makeFailureItem(
×
271
                        info.ChannelPoint,
×
272
                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
273
                        err.Error(),
×
274
                )
×
275
        }
×
276

277
        // Insert the edge into the database to avoid `edge not
278
        // found` errors during policy update propagation.
279
        err = r.AddEdge(ctx, info)
1✔
280
        if err != nil {
1✔
281
                log.Errorf("Attempt to add missing edge for "+
×
282
                        "channel (%s) errored with: %v",
×
283
                        channel.FundingOutpoint.String(), err)
×
284

×
285
                return nil, nil, makeFailureItem(
×
286
                        channel.FundingOutpoint,
×
287
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
288
                        "could not add edge",
×
289
                )
×
290
        }
×
291

292
        return info, edge, nil
1✔
293
}
294

295
// createEdge recreates an edge and policy from an open channel in-memory.
296
func (r *Manager) createEdge(channel *channeldb.OpenChannel,
297
        timestamp time.Time) (*models.ChannelEdgeInfo,
298
        *models.ChannelEdgePolicy, error) {
3✔
299

3✔
300
        nodeKey1Bytes := r.SelfPub.SerializeCompressed()
3✔
301
        nodeKey2Bytes := channel.IdentityPub.SerializeCompressed()
3✔
302
        bitcoinKey1Bytes := channel.LocalChanCfg.MultiSigKey.PubKey.
3✔
303
                SerializeCompressed()
3✔
304
        bitcoinKey2Bytes := channel.RemoteChanCfg.MultiSigKey.PubKey.
3✔
305
                SerializeCompressed()
3✔
306
        channelFlags := lnwire.ChanUpdateChanFlags(0)
3✔
307

3✔
308
        // Make it such that node_id_1 is the lexicographically-lesser of the
3✔
309
        // two compressed keys sorted in ascending lexicographic order.
3✔
310
        if bytes.Compare(nodeKey2Bytes, nodeKey1Bytes) < 0 {
4✔
311
                nodeKey1Bytes, nodeKey2Bytes = nodeKey2Bytes, nodeKey1Bytes
1✔
312
                bitcoinKey1Bytes, bitcoinKey2Bytes = bitcoinKey2Bytes,
1✔
313
                        bitcoinKey1Bytes
1✔
314
                channelFlags = 1
1✔
315
        }
1✔
316

317
        // We need to make sure we use the real scid for public confirmed
318
        // zero-conf channels.
319
        shortChanID := channel.ShortChanID()
3✔
320
        isPublic := channel.ChannelFlags&lnwire.FFAnnounceChannel != 0
3✔
321
        if isPublic && channel.IsZeroConf() && channel.ZeroConfConfirmed() {
3✔
322
                shortChanID = channel.ZeroConfRealScid()
×
323
        }
×
324

325
        nodeKey1, err := route.NewVertexFromBytes(nodeKey1Bytes)
3✔
326
        if err != nil {
3✔
NEW
327
                return nil, nil, err
×
NEW
328
        }
×
329
        nodeKey2, err := route.NewVertexFromBytes(nodeKey2Bytes)
3✔
330
        if err != nil {
3✔
NEW
331
                return nil, nil, err
×
NEW
332
        }
×
333
        bitcoinKey1, err := route.NewVertexFromBytes(bitcoinKey1Bytes)
3✔
334
        if err != nil {
3✔
NEW
335
                return nil, nil, err
×
NEW
336
        }
×
337
        bitcoinKey2, err := route.NewVertexFromBytes(bitcoinKey2Bytes)
3✔
338
        if err != nil {
3✔
NEW
339
                return nil, nil, err
×
340
        }
×
341

342
        info, err := models.NewV1Channel(
3✔
343
                shortChanID.ToUint64(),
3✔
344
                channel.ChainHash,
3✔
345
                nodeKey1,
3✔
346
                nodeKey2,
3✔
347
                &models.ChannelV1Fields{
3✔
348
                        BitcoinKey1Bytes: bitcoinKey1,
3✔
349
                        BitcoinKey2Bytes: bitcoinKey2,
3✔
350
                },
3✔
351
                models.WithCapacity(channel.Capacity),
3✔
352
                models.WithChannelPoint(channel.FundingOutpoint),
3✔
353
        )
3✔
354
        if err != nil {
3✔
NEW
355
                return nil, nil, err
×
NEW
356
        }
×
357

358
        // Construct a dummy channel edge policy with default values that will
359
        // be updated with the new values in the call to processChan below.
360
        timeLockDelta := uint16(r.DefaultRoutingPolicy.TimeLockDelta)
3✔
361
        edge := &models.ChannelEdgePolicy{
3✔
362
                ChannelID:                 shortChanID.ToUint64(),
3✔
363
                LastUpdate:                timestamp,
3✔
364
                TimeLockDelta:             timeLockDelta,
3✔
365
                ChannelFlags:              channelFlags,
3✔
366
                MessageFlags:              lnwire.ChanUpdateRequiredMaxHtlc,
3✔
367
                FeeBaseMSat:               r.DefaultRoutingPolicy.BaseFee,
3✔
368
                FeeProportionalMillionths: r.DefaultRoutingPolicy.FeeRate,
3✔
369
                MinHTLC:                   r.DefaultRoutingPolicy.MinHTLCOut,
3✔
370
                MaxHTLC:                   r.DefaultRoutingPolicy.MaxHTLC,
3✔
371
        }
3✔
372

3✔
373
        copy(edge.ToNode[:], channel.IdentityPub.SerializeCompressed())
3✔
374

3✔
375
        return info, edge, nil
3✔
376
}
377

378
// updateEdge updates the given edge with the new schema.
379
func (r *Manager) updateEdge(chanPoint wire.OutPoint,
380
        edge *models.ChannelEdgePolicy,
381
        newSchema routing.ChannelPolicy) error {
8✔
382

8✔
383
        channel, err := r.FetchChannel(chanPoint)
8✔
384
        if err != nil {
8✔
385
                return err
×
386
        }
×
387

388
        // Update forwarding fee scheme and required time lock delta.
389
        edge.FeeBaseMSat = newSchema.BaseFee
8✔
390
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
8✔
391
                newSchema.FeeRate,
8✔
392
        )
8✔
393

8✔
394
        // If inbound fees are set, we update the edge with them.
8✔
395
        err = fn.MapOptionZ(newSchema.InboundFee,
8✔
396
                func(f models.InboundFee) error {
11✔
397
                        inboundWireFee := f.ToWire()
3✔
398
                        edge.InboundFee = fn.Some(inboundWireFee)
3✔
399

3✔
400
                        return edge.ExtraOpaqueData.PackRecords(
3✔
401
                                &inboundWireFee,
3✔
402
                        )
3✔
403
                })
3✔
404
        if err != nil {
8✔
405
                return err
×
406
        }
×
407

408
        edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)
8✔
409

8✔
410
        // Retrieve negotiated channel htlc amt limits.
8✔
411
        amtMin, amtMax, err := r.getHtlcAmtLimits(channel)
8✔
412
        if err != nil {
8✔
413
                return err
×
414
        }
×
415

416
        // We now update the edge max htlc value.
417
        switch {
8✔
418
        // If a non-zero max htlc was specified, use it to update the edge.
419
        // Otherwise keep the value unchanged.
420
        case newSchema.MaxHTLC != 0:
7✔
421
                edge.MaxHTLC = newSchema.MaxHTLC
7✔
422

423
        // If this edge still doesn't have a max htlc set, set it to the max.
424
        // This is an on-the-fly migration.
425
        case !edge.MessageFlags.HasMaxHtlc():
×
426
                edge.MaxHTLC = amtMax
×
427

428
        // If this edge has a max htlc that exceeds what the channel can
429
        // actually carry, correct it now. This can happen, because we
430
        // previously set the max htlc to the channel capacity.
431
        case edge.MaxHTLC > amtMax:
×
432
                edge.MaxHTLC = amtMax
×
433
        }
434

435
        // If a new min htlc is specified, update the edge.
436
        if newSchema.MinHTLC != nil {
8✔
437
                edge.MinHTLC = *newSchema.MinHTLC
×
438
        }
×
439

440
        // If the MaxHtlc flag wasn't already set, we can set it now.
441
        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
8✔
442

8✔
443
        // Validate htlc amount constraints.
8✔
444
        switch {
8✔
445
        case edge.MinHTLC < amtMin:
×
446
                return fmt.Errorf(
×
447
                        "min htlc amount of %v is below min htlc parameter of %v",
×
448
                        edge.MinHTLC, amtMin,
×
449
                )
×
450

451
        case edge.MaxHTLC > amtMax:
×
452
                return fmt.Errorf(
×
453
                        "max htlc size of %v is above max pending amount of %v",
×
454
                        edge.MaxHTLC, amtMax,
×
455
                )
×
456

457
        case edge.MinHTLC > edge.MaxHTLC:
×
458
                return fmt.Errorf(
×
459
                        "min_htlc %v greater than max_htlc %v",
×
460
                        edge.MinHTLC, edge.MaxHTLC,
×
461
                )
×
462
        }
463

464
        // Clear signature to help prevent usage of the previous signature.
465
        edge.SetSigBytes(nil)
8✔
466

8✔
467
        return nil
8✔
468
}
469

470
// getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount
471
// constraints.
472
func (r *Manager) getHtlcAmtLimits(ch *channeldb.OpenChannel) (
473
        lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) {
8✔
474

8✔
475
        // The max htlc policy field must be less than or equal to the channel
8✔
476
        // capacity AND less than or equal to the max in-flight HTLC value.
8✔
477
        // Since the latter is always less than or equal to the former, just
8✔
478
        // return the max in-flight value.
8✔
479
        maxAmt := ch.LocalChanCfg.ChannelStateBounds.MaxPendingAmount
8✔
480

8✔
481
        return ch.LocalChanCfg.MinHTLC, maxAmt, nil
8✔
482
}
8✔
483

484
// makeFailureItem creates a lnrpc.FailedUpdate object.
485
func makeFailureItem(outPoint wire.OutPoint, updateFailure lnrpc.UpdateFailure,
486
        errStr string) *lnrpc.FailedUpdate {
2✔
487

2✔
488
        outpoint := lnrpc.MarshalOutPoint(&outPoint)
2✔
489

2✔
490
        return &lnrpc.FailedUpdate{
2✔
491
                Outpoint:    outpoint,
2✔
492
                Reason:      updateFailure,
2✔
493
                UpdateError: errStr,
2✔
494
        }
2✔
495
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc