• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13157733617

05 Feb 2025 12:49PM UTC coverage: 57.712% (-1.1%) from 58.82%
13157733617

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In the fee bumper we don't have the info to distinguish the above cases, so
we leave them to be handled by the sweeper, which has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19472 existing lines in 252 files now uncovered.

103634 of 179570 relevant lines covered (57.71%)

24840.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.5
/routing/localchans/manager.go
1
package localchans
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcec/v2"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/channeldb"
13
        "github.com/lightningnetwork/lnd/discovery"
14
        "github.com/lightningnetwork/lnd/fn/v2"
15
        "github.com/lightningnetwork/lnd/graph/db/models"
16
        "github.com/lightningnetwork/lnd/lnrpc"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/routing"
19
)
20

21
// Manager manages the node's local channels. The only operation that is
22
// currently implemented is updating forwarding policies.
23
type Manager struct {
24
        // SelfPub contains the public key of the local node.
25
        SelfPub *btcec.PublicKey
26

27
        // DefaultRoutingPolicy is the default routing policy.
28
        DefaultRoutingPolicy models.ForwardingPolicy
29

30
        // UpdateForwardingPolicies is used by the manager to update active
31
        // links with a new policy.
32
        UpdateForwardingPolicies func(
33
                chanPolicies map[wire.OutPoint]models.ForwardingPolicy)
34

35
        // PropagateChanPolicyUpdate is called to persist a new policy to disk
36
        // and broadcast it to the network.
37
        PropagateChanPolicyUpdate func(
38
                edgesToUpdate []discovery.EdgeWithInfo) error
39

40
        // ForAllOutgoingChannels is required to iterate over all our local
41
        // channels. The ChannelEdgePolicy parameter may be nil.
42
        ForAllOutgoingChannels func(cb func(*models.ChannelEdgeInfo,
43
                *models.ChannelEdgePolicy) error) error
44

45
        // FetchChannel is used to query local channel parameters. Optionally an
46
        // existing db tx can be supplied.
47
        FetchChannel func(chanPoint wire.OutPoint) (*channeldb.OpenChannel,
48
                error)
49

50
        // AddEdge is used to add edge/channel to the topology of the router.
51
        AddEdge func(edge *models.ChannelEdgeInfo) error
52

53
        // policyUpdateLock ensures that the database and the link do not fall
54
        // out of sync if there are concurrent fee update calls. Without it,
55
        // there is a chance that policy A updates the database, then policy B
56
        // updates the database, then policy B updates the link, then policy A
57
        // updates the link.
58
        policyUpdateLock sync.Mutex
59
}
60

61
// UpdatePolicy updates the policy for the specified channels on disk and in
62
// the active links.
63
func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
64
        createMissingEdge bool, chanPoints ...wire.OutPoint) (
65
        []*lnrpc.FailedUpdate, error) {
6✔
66

6✔
67
        r.policyUpdateLock.Lock()
6✔
68
        defer r.policyUpdateLock.Unlock()
6✔
69

6✔
70
        // First, we'll construct a set of all the channels that we are
6✔
71
        // trying to update.
6✔
72
        unprocessedChans := make(map[wire.OutPoint]struct{})
6✔
73
        for _, chanPoint := range chanPoints {
11✔
74
                unprocessedChans[chanPoint] = struct{}{}
5✔
75
        }
5✔
76

77
        haveChanFilter := len(unprocessedChans) != 0
6✔
78

6✔
79
        var failedUpdates []*lnrpc.FailedUpdate
6✔
80
        var edgesToUpdate []discovery.EdgeWithInfo
6✔
81
        policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy)
6✔
82

6✔
83
        // NOTE: edge may be nil when this function is called.
6✔
84
        processChan := func(info *models.ChannelEdgeInfo,
6✔
85
                edge *models.ChannelEdgePolicy) error {
11✔
86

5✔
87
                // If we have a channel filter, and this channel isn't a part
5✔
88
                // of it, then we'll skip it.
5✔
89
                _, ok := unprocessedChans[info.ChannelPoint]
5✔
90
                if !ok && haveChanFilter {
6✔
91
                        return nil
1✔
92
                }
1✔
93

94
                // Mark this channel as found by removing it. unprocessedChans
95
                // will be used to report invalid channels later on.
96
                delete(unprocessedChans, info.ChannelPoint)
4✔
97

4✔
98
                if edge == nil {
4✔
99
                        log.Errorf("Got nil channel edge policy when updating "+
×
100
                                "a channel. Channel point: %v",
×
101
                                info.ChannelPoint.String())
×
102

×
103
                        failedUpdates = append(failedUpdates, makeFailureItem(
×
104
                                info.ChannelPoint,
×
105
                                lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
×
106
                                "edge policy not found",
×
107
                        ))
×
108

×
109
                        return nil
×
110
                }
×
111

112
                // Apply the new policy to the edge.
113
                err := r.updateEdge(info.ChannelPoint, edge, newSchema)
4✔
114
                if err != nil {
4✔
115
                        failedUpdates = append(failedUpdates,
×
116
                                makeFailureItem(info.ChannelPoint,
×
117
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
118
                                        err.Error(),
×
119
                                ))
×
120

×
121
                        return nil
×
122
                }
×
123

124
                // Add updated edge to list of edges to send to gossiper.
125
                edgesToUpdate = append(edgesToUpdate, discovery.EdgeWithInfo{
4✔
126
                        Info: info,
4✔
127
                        Edge: edge,
4✔
128
                })
4✔
129

4✔
130
                // Extract inbound fees from the ExtraOpaqueData.
4✔
131
                var inboundWireFee lnwire.Fee
4✔
132
                _, err = edge.ExtraOpaqueData.ExtractRecords(&inboundWireFee)
4✔
133
                if err != nil {
4✔
134
                        return err
×
135
                }
×
136
                inboundFee := models.NewInboundFeeFromWire(inboundWireFee)
4✔
137

4✔
138
                // Add updated policy to list of policies to send to switch.
4✔
139
                policiesToUpdate[info.ChannelPoint] = models.ForwardingPolicy{
4✔
140
                        BaseFee:       edge.FeeBaseMSat,
4✔
141
                        FeeRate:       edge.FeeProportionalMillionths,
4✔
142
                        TimeLockDelta: uint32(edge.TimeLockDelta),
4✔
143
                        MinHTLCOut:    edge.MinHTLC,
4✔
144
                        MaxHTLC:       edge.MaxHTLC,
4✔
145
                        InboundFee:    inboundFee,
4✔
146
                }
4✔
147

4✔
148
                return nil
4✔
149
        }
150

151
        // Next, we'll loop over all the outgoing channels the router knows of.
152
        // If we have a filter then we'll only collect those channels, otherwise
153
        // we'll collect them all.
154
        err := r.ForAllOutgoingChannels(processChan)
6✔
155
        if err != nil {
6✔
156
                return nil, err
×
157
        }
×
158

159
        // Construct a list of failed policy updates.
160
        for chanPoint := range unprocessedChans {
9✔
161
                channel, err := r.FetchChannel(chanPoint)
3✔
162
                switch {
3✔
163
                case errors.Is(err, channeldb.ErrChannelNotFound):
1✔
164
                        failedUpdates = append(failedUpdates,
1✔
165
                                makeFailureItem(chanPoint,
1✔
166
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
1✔
167
                                        "not found",
1✔
168
                                ))
1✔
169

170
                case err != nil:
×
171
                        failedUpdates = append(failedUpdates,
×
172
                                makeFailureItem(chanPoint,
×
173
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INTERNAL_ERR,
×
174
                                        err.Error(),
×
175
                                ))
×
176

177
                case channel.IsPending:
×
178
                        failedUpdates = append(failedUpdates,
×
179
                                makeFailureItem(chanPoint,
×
180
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_PENDING,
×
181
                                        "not yet confirmed",
×
182
                                ))
×
183

184
                // If the edge was not found, but the channel is found, that
185
                // means the edge is missing in the graph database and should be
186
                // recreated. The edge and policy are created in-memory. The
187
                // edge is inserted in createEdge below and the policy will be
188
                // added to the graph in the PropagateChanPolicyUpdate call
189
                // below.
190
                case createMissingEdge:
1✔
191
                        log.Warnf("Missing edge for active channel (%s) "+
1✔
192
                                "during policy update. Recreating edge with "+
1✔
193
                                "default policy.",
1✔
194
                                channel.FundingOutpoint.String())
1✔
195

1✔
196
                        info, edge, failedUpdate := r.createMissingEdge(
1✔
197
                                channel, newSchema,
1✔
198
                        )
1✔
199
                        if failedUpdate == nil {
2✔
200
                                err = processChan(info, edge)
1✔
201
                                if err != nil {
1✔
202
                                        return nil, err
×
203
                                }
×
204
                        } else {
×
205
                                failedUpdates = append(
×
206
                                        failedUpdates, failedUpdate,
×
207
                                )
×
208
                        }
×
209

210
                default:
1✔
211
                        log.Warnf("Missing edge for active channel (%s) "+
1✔
212
                                "during policy update. Could not update "+
1✔
213
                                "policy.", channel.FundingOutpoint.String())
1✔
214

1✔
215
                        failedUpdates = append(failedUpdates,
1✔
216
                                makeFailureItem(chanPoint,
1✔
217
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
1✔
218
                                        "could not update policies",
1✔
219
                                ))
1✔
220
                }
221
        }
222

223
        // Commit the policy updates to disk and broadcast to the network. We
224
        // validated the new policy above, so we expect no validation errors. If
225
        // this would happen because of a bug, the link policy will be
226
        // desynchronized. It is currently not possible to atomically commit
227
        // multiple edge updates.
228
        err = r.PropagateChanPolicyUpdate(edgesToUpdate)
6✔
229
        if err != nil {
6✔
230
                return nil, err
×
231
        }
×
232

233
        // Update active links.
234
        r.UpdateForwardingPolicies(policiesToUpdate)
6✔
235

6✔
236
        return failedUpdates, nil
6✔
237
}
238

239
func (r *Manager) createMissingEdge(channel *channeldb.OpenChannel,
240
        newSchema routing.ChannelPolicy) (*models.ChannelEdgeInfo,
241
        *models.ChannelEdgePolicy, *lnrpc.FailedUpdate) {
1✔
242

1✔
243
        info, edge, err := r.createEdge(channel, time.Now())
1✔
244
        if err != nil {
1✔
245
                log.Errorf("Failed to recreate missing edge "+
×
246
                        "for channel (%s): %v",
×
247
                        channel.FundingOutpoint.String(), err)
×
248

×
249
                return nil, nil, makeFailureItem(
×
250
                        channel.FundingOutpoint,
×
251
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
252
                        "could not update policies",
×
253
                )
×
254
        }
×
255

256
        // Validate the newly created edge policy with the user defined new
257
        // schema before adding the edge to the database.
258
        err = r.updateEdge(channel.FundingOutpoint, edge, newSchema)
1✔
259
        if err != nil {
1✔
260
                return nil, nil, makeFailureItem(
×
261
                        info.ChannelPoint,
×
262
                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
263
                        err.Error(),
×
264
                )
×
265
        }
×
266

267
        // Insert the edge into the database to avoid `edge not
268
        // found` errors during policy update propagation.
269
        err = r.AddEdge(info)
1✔
270
        if err != nil {
1✔
271
                log.Errorf("Attempt to add missing edge for "+
×
272
                        "channel (%s) errored with: %v",
×
273
                        channel.FundingOutpoint.String(), err)
×
274

×
275
                return nil, nil, makeFailureItem(
×
276
                        channel.FundingOutpoint,
×
277
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
278
                        "could not add edge",
×
279
                )
×
280
        }
×
281

282
        return info, edge, nil
1✔
283
}
284

285
// createEdge recreates an edge and policy from an open channel in-memory.
286
func (r *Manager) createEdge(channel *channeldb.OpenChannel,
287
        timestamp time.Time) (*models.ChannelEdgeInfo,
288
        *models.ChannelEdgePolicy, error) {
3✔
289

3✔
290
        nodeKey1Bytes := r.SelfPub.SerializeCompressed()
3✔
291
        nodeKey2Bytes := channel.IdentityPub.SerializeCompressed()
3✔
292
        bitcoinKey1Bytes := channel.LocalChanCfg.MultiSigKey.PubKey.
3✔
293
                SerializeCompressed()
3✔
294
        bitcoinKey2Bytes := channel.RemoteChanCfg.MultiSigKey.PubKey.
3✔
295
                SerializeCompressed()
3✔
296
        channelFlags := lnwire.ChanUpdateChanFlags(0)
3✔
297

3✔
298
        // Make it such that node_id_1 is the lexicographically-lesser of the
3✔
299
        // two compressed keys sorted in ascending lexicographic order.
3✔
300
        if bytes.Compare(nodeKey2Bytes, nodeKey1Bytes) < 0 {
4✔
301
                nodeKey1Bytes, nodeKey2Bytes = nodeKey2Bytes, nodeKey1Bytes
1✔
302
                bitcoinKey1Bytes, bitcoinKey2Bytes = bitcoinKey2Bytes,
1✔
303
                        bitcoinKey1Bytes
1✔
304
                channelFlags = 1
1✔
305
        }
1✔
306

307
        var featureBuf bytes.Buffer
3✔
308
        err := lnwire.NewRawFeatureVector().Encode(&featureBuf)
3✔
309
        if err != nil {
3✔
310
                return nil, nil, fmt.Errorf("unable to encode features: %w",
×
311
                        err)
×
312
        }
×
313

314
        // We need to make sure we use the real scid for public confirmed
315
        // zero-conf channels.
316
        shortChanID := channel.ShortChanID()
3✔
317
        isPublic := channel.ChannelFlags&lnwire.FFAnnounceChannel != 0
3✔
318
        if isPublic && channel.IsZeroConf() && channel.ZeroConfConfirmed() {
3✔
319
                shortChanID = channel.ZeroConfRealScid()
×
320
        }
×
321

322
        info := &models.ChannelEdgeInfo{
3✔
323
                ChannelID:    shortChanID.ToUint64(),
3✔
324
                ChainHash:    channel.ChainHash,
3✔
325
                Features:     featureBuf.Bytes(),
3✔
326
                Capacity:     channel.Capacity,
3✔
327
                ChannelPoint: channel.FundingOutpoint,
3✔
328
        }
3✔
329

3✔
330
        copy(info.NodeKey1Bytes[:], nodeKey1Bytes)
3✔
331
        copy(info.NodeKey2Bytes[:], nodeKey2Bytes)
3✔
332
        copy(info.BitcoinKey1Bytes[:], bitcoinKey1Bytes)
3✔
333
        copy(info.BitcoinKey2Bytes[:], bitcoinKey2Bytes)
3✔
334

3✔
335
        // Construct a dummy channel edge policy with default values that will
3✔
336
        // be updated with the new values in the call to processChan below.
3✔
337
        timeLockDelta := uint16(r.DefaultRoutingPolicy.TimeLockDelta)
3✔
338
        edge := &models.ChannelEdgePolicy{
3✔
339
                ChannelID:                 shortChanID.ToUint64(),
3✔
340
                LastUpdate:                timestamp,
3✔
341
                TimeLockDelta:             timeLockDelta,
3✔
342
                ChannelFlags:              channelFlags,
3✔
343
                MessageFlags:              lnwire.ChanUpdateRequiredMaxHtlc,
3✔
344
                FeeBaseMSat:               r.DefaultRoutingPolicy.BaseFee,
3✔
345
                FeeProportionalMillionths: r.DefaultRoutingPolicy.FeeRate,
3✔
346
                MinHTLC:                   r.DefaultRoutingPolicy.MinHTLCOut,
3✔
347
                MaxHTLC:                   r.DefaultRoutingPolicy.MaxHTLC,
3✔
348
        }
3✔
349

3✔
350
        copy(edge.ToNode[:], channel.IdentityPub.SerializeCompressed())
3✔
351

3✔
352
        return info, edge, nil
3✔
353
}
354

355
// updateEdge updates the given edge with the new schema.
356
func (r *Manager) updateEdge(chanPoint wire.OutPoint,
357
        edge *models.ChannelEdgePolicy,
358
        newSchema routing.ChannelPolicy) error {
5✔
359

5✔
360
        channel, err := r.FetchChannel(chanPoint)
5✔
361
        if err != nil {
5✔
362
                return err
×
363
        }
×
364

365
        // Update forwarding fee scheme and required time lock delta.
366
        edge.FeeBaseMSat = newSchema.BaseFee
5✔
367
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
5✔
368
                newSchema.FeeRate,
5✔
369
        )
5✔
370

5✔
371
        // If inbound fees are set, we update the edge with them.
5✔
372
        err = fn.MapOptionZ(newSchema.InboundFee,
5✔
373
                func(f models.InboundFee) error {
5✔
UNCOV
374
                        inboundWireFee := f.ToWire()
×
UNCOV
375
                        return edge.ExtraOpaqueData.PackRecords(
×
UNCOV
376
                                &inboundWireFee,
×
UNCOV
377
                        )
×
UNCOV
378
                })
×
379
        if err != nil {
5✔
380
                return err
×
381
        }
×
382

383
        edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)
5✔
384

5✔
385
        // Retrieve negotiated channel htlc amt limits.
5✔
386
        amtMin, amtMax, err := r.getHtlcAmtLimits(channel)
5✔
387
        if err != nil {
5✔
388
                return err
×
389
        }
×
390

391
        // We now update the edge max htlc value.
392
        switch {
5✔
393
        // If a non-zero max htlc was specified, use it to update the edge.
394
        // Otherwise keep the value unchanged.
395
        case newSchema.MaxHTLC != 0:
4✔
396
                edge.MaxHTLC = newSchema.MaxHTLC
4✔
397

398
        // If this edge still doesn't have a max htlc set, set it to the max.
399
        // This is an on-the-fly migration.
400
        case !edge.MessageFlags.HasMaxHtlc():
×
401
                edge.MaxHTLC = amtMax
×
402

403
        // If this edge has a max htlc that exceeds what the channel can
404
        // actually carry, correct it now. This can happen, because we
405
        // previously set the max htlc to the channel capacity.
406
        case edge.MaxHTLC > amtMax:
×
407
                edge.MaxHTLC = amtMax
×
408
        }
409

410
        // If a new min htlc is specified, update the edge.
411
        if newSchema.MinHTLC != nil {
5✔
412
                edge.MinHTLC = *newSchema.MinHTLC
×
413
        }
×
414

415
        // If the MaxHtlc flag wasn't already set, we can set it now.
416
        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
5✔
417

5✔
418
        // Validate htlc amount constraints.
5✔
419
        switch {
5✔
420
        case edge.MinHTLC < amtMin:
×
421
                return fmt.Errorf(
×
422
                        "min htlc amount of %v is below min htlc parameter of %v",
×
423
                        edge.MinHTLC, amtMin,
×
424
                )
×
425

426
        case edge.MaxHTLC > amtMax:
×
427
                return fmt.Errorf(
×
428
                        "max htlc size of %v is above max pending amount of %v",
×
429
                        edge.MaxHTLC, amtMax,
×
430
                )
×
431

432
        case edge.MinHTLC > edge.MaxHTLC:
×
433
                return fmt.Errorf(
×
434
                        "min_htlc %v greater than max_htlc %v",
×
435
                        edge.MinHTLC, edge.MaxHTLC,
×
436
                )
×
437
        }
438

439
        // Clear signature to help prevent usage of the previous signature.
440
        edge.SetSigBytes(nil)
5✔
441

5✔
442
        return nil
5✔
443
}
444

445
// getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount
446
// constraints.
447
func (r *Manager) getHtlcAmtLimits(ch *channeldb.OpenChannel) (
448
        lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) {
5✔
449

5✔
450
        // The max htlc policy field must be less than or equal to the channel
5✔
451
        // capacity AND less than or equal to the max in-flight HTLC value.
5✔
452
        // Since the latter is always less than or equal to the former, just
5✔
453
        // return the max in-flight value.
5✔
454
        maxAmt := ch.LocalChanCfg.ChannelStateBounds.MaxPendingAmount
5✔
455

5✔
456
        return ch.LocalChanCfg.MinHTLC, maxAmt, nil
5✔
457
}
5✔
458

459
// makeFailureItem creates a lnrpc.FailedUpdate object.
460
func makeFailureItem(outPoint wire.OutPoint, updateFailure lnrpc.UpdateFailure,
461
        errStr string) *lnrpc.FailedUpdate {
2✔
462

2✔
463
        outpoint := lnrpc.MarshalOutPoint(&outPoint)
2✔
464

2✔
465
        return &lnrpc.FailedUpdate{
2✔
466
                Outpoint:    outpoint,
2✔
467
                Reason:      updateFailure,
2✔
468
                UpdateError: errStr,
2✔
469
        }
2✔
470
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc